Spaces:
Sleeping
Sleeping
| """ | |
| DeepMosaics - Add/remove mosaics from images/videos using AI. | |
| https://github.com/HypoX64/DeepMosaics | |
| """ | |
| import os | |
| import numpy as np | |
| import cv2 | |
| import onnxruntime as ort | |
# Directory that holds the exported ONNX models; it sits next to this file.
_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
ONNX_DIR = os.path.join(_MODULE_DIR, "onnx_models")

# File extensions (lower-case, with leading dot) routed to the video pipeline.
VIDEO_EXTS = [
    '.mp4',
    '.avi',
    '.mov',
    '.mkv',
    '.webm',
    '.gif',
]

# Cache of loaded inference sessions, keyed by model name (see get_session).
sessions = dict()
def get_session(name):
    """Return the ONNX Runtime session for model ``name``, loading it once.

    The model file is looked up as ``<ONNX_DIR>/<name>.onnx``. Loaded sessions
    are kept in the module-level ``sessions`` dict so each model is only
    loaded the first time it is requested.

    Raises:
        FileNotFoundError: if the model file does not exist.
    """
    try:
        return sessions[name]
    except KeyError:
        pass  # not loaded yet; fall through and load it

    path = os.path.join(ONNX_DIR, f"{name}.onnx")
    if not os.path.exists(path):
        raise FileNotFoundError(f"Model not found: {path}")

    # Sessions are created with the CPU execution provider only.
    session = ort.InferenceSession(path, providers=['CPUExecutionProvider'])
    sessions[name] = session
    return session
| # ============ Segmentation ============ | |
def run_segment(img, model, size=360):
    """Run segmentation model ``model`` on ``img`` and return a uint8 mask.

    The image is resized to ``size`` x ``size`` (aspect ratio is NOT
    preserved), scaled to [0, 1], reordered to channel-first with a leading
    batch axis and fed to the session under the input name ``'input'``.
    The first output is squeezed, multiplied by 255, clipped to [0, 255]
    and cast to ``uint8``.
    """
    session = get_session(model)

    scaled = cv2.resize(img, (size, size))
    normalized = scaled.astype(np.float32) / 255.0
    batch = np.expand_dims(np.transpose(normalized, (2, 0, 1)), axis=0)

    outputs = session.run(None, {'input': batch})
    prediction = outputs[0].squeeze()
    return np.clip(prediction * 255, 0, 255).astype(np.uint8)
def get_all_regions(img, model, threshold=127, ex_mul=1.5, all_areas=False):
    """Detect mosaic regions in ``img`` (repo-style detection).

    Args:
        img: image array; only ``img.shape[:2]`` (height, width) is read here.
        model: name of the segmentation model passed to ``run_segment``.
        threshold: binarisation threshold applied to the raw mask; the second
            threshold after blurring is ``int(threshold / 5)``.
        ex_mul: expansion factor applied to each region's size.
        all_areas: when False (default) only the largest contour is kept and
            the returned mask is redrawn to contain just that contour.

    Returns:
        ``(regions, mask)`` where ``regions`` is a list of
        ``(cx, cy, halfsize)`` tuples in image pixel coordinates (square
        regions described by centre and half side length) and ``mask`` is the
        processed binary mask at the segmentation output resolution.
    """
    h, w = img.shape[:2]
    mask_raw = run_segment(img, model)

    # Repo-style mask processing: binarise, grow by blurring, binarise again.
    ex_mun = max(1, int(min(h, w) / 20))
    mask = cv2.threshold(mask_raw, threshold, 255, cv2.THRESH_BINARY)[1]
    mask = cv2.blur(mask, (ex_mun, ex_mun))
    mask = cv2.threshold(mask, int(threshold / 5), 255, cv2.THRESH_BINARY)[1]

    # Find most likely ROI (largest contour) - like repo's find_mostlikely_ROI
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not all_areas and contours:
        largest = max(contours, key=cv2.contourArea)
        mask = np.zeros_like(mask)
        cv2.fillPoly(mask, [largest], 255)
        contours = [largest]

    # run_segment squashes the image to a square, so the mask's x and y axes
    # have different scales for non-square images. Scale each axis on its own
    # (the previous single factor min(h, w) / 360 misplaced regions along the
    # longer axis and disagreed with blend(), which resizes the mask to (w, h)).
    # For square images this is numerically identical to the old behaviour.
    # NOTE(review): assumes the mask is 2-D (H, W) after squeeze() — confirm.
    mask_h, mask_w = mask.shape[:2]
    scale_x = w / mask_w
    scale_y = h / mask_h
    max_half = min(h, w) // 2 - 1

    regions = []
    for c in contours:
        if cv2.contourArea(c) < 50:
            continue  # ignore tiny specks
        x, y, bw, bh = cv2.boundingRect(c)

        # Centre in image coordinates.
        cx = int((x + bw // 2) * scale_x)
        cy = int((y + bh // 2) * scale_y)

        # Half side of the square covering the box, with Ex_mul expansion.
        halfsize = int(max(bw * scale_x, bh * scale_y) * ex_mul / 2)

        # Clamp size, then keep the whole square inside the image.
        halfsize = max(15, min(halfsize, max_half))
        cx = max(halfsize, min(cx, w - halfsize))
        cy = max(halfsize, min(cy, h - halfsize))
        regions.append((cx, cy, halfsize))
    return regions, mask
def get_region(img, model):
    """Return the single largest detected region as ``(cx, cy, halfsize)``.

    Falls back to ``(0, 0, 0)`` when ``get_all_regions`` finds nothing.
    """
    # add_youknow has weaker detection, so it gets a lower threshold.
    if model == "add_youknow":
        threshold = 20
    else:
        threshold = 127

    found, _mask = get_all_regions(img, model, threshold=threshold)
    if not found:
        return (0, 0, 0)
    return max(found, key=lambda region: region[2])
| # ============ Cleaning ============ | |
def run_clean(crop, model, size):
    """Run the image cleaning model ``model`` on a BGR ``crop``.

    The crop is resized to ``size`` x ``size``, flipped from BGR to RGB,
    mapped from [0, 255] to [-1, 1] and fed channel-first with a batch axis
    under the input name ``'input'``. The first output is mapped back to
    [0, 255] ``uint8`` and returned in BGR channel order.
    """
    session = get_session(model)

    rgb = cv2.resize(crop, (size, size))[:, :, ::-1]  # BGR to RGB (model expects RGB)
    signed = rgb.astype(np.float32) / 255.0 * 2 - 1
    batch = np.transpose(signed, (2, 0, 1))[np.newaxis]

    prediction = session.run(None, {'input': batch})[0].squeeze()
    hwc = np.transpose(prediction, (1, 2, 0))
    restored = ((hwc + 1) / 2 * 255).clip(0, 255).astype(np.uint8)
    return restored[:, :, ::-1]  # RGB to BGR
def run_clean_video(crops, prev_frame):
    """Run the video cleaning model on a window of BGR crops.

    Args:
        crops: sequence of BGR crops; callers in this file pass 5 frames.
        prev_frame: previously generated BGR output, or ``None`` to feed an
            all-zero previous frame.

    Returns:
        The cleaned frame as a 256x256 BGR ``uint8`` array.
    """
    session = get_session("clean_youknow_video")
    size = 256

    def to_chw(bgr):
        # Resize, flip BGR to RGB, map [0, 255] -> [-1, 1], go channel-first.
        rgb = cv2.resize(bgr, (size, size))[:, :, ::-1]
        signed = rgb.astype(np.float32) / 255.0 * 2 - 1
        return np.transpose(signed, (2, 0, 1))

    # Frames are stacked on axis 1: [1, 3, 5, 256, 256] for a 5-frame window.
    stream = np.stack([to_chw(crop) for crop in crops], axis=1)[np.newaxis]

    if prev_frame is not None:
        prev = to_chw(prev_frame)[np.newaxis]
    else:
        prev = np.zeros((1, 3, size, size), dtype=np.float32)

    prediction = session.run(None, {'input': stream, 'prev_frame': prev})[0].squeeze()
    hwc = np.transpose(prediction, (1, 2, 0))
    restored = ((hwc + 1) / 2 * 255).clip(0, 255).astype(np.uint8)
    return restored[:, :, ::-1]  # RGB to BGR
def blend(img, fake, x, y, size, seg_mask=None):
    """Blend ``fake`` into ``img`` over the square centred at ``(x, y)``.

    The square has half side ``size``. ``img`` is modified in place and also
    returned. If the square does not fit completely inside ``img`` the image
    is returned unchanged.

    Args:
        img: destination BGR image (mutated).
        fake: generated patch; resized to ``2*size`` x ``2*size``.
        x, y, size: centre and half side of the target square.
        seg_mask: optional segmentation mask; when given it is resized to the
            image size and cropped to the square, otherwise a full box mask
            is used.
    """
    h, w = img.shape[:2]
    side = size * 2
    fake = cv2.resize(fake, (side, side), interpolation=cv2.INTER_CUBIC)

    top, bottom = y - size, y + size
    left, right = x - size, x + size
    fits = top >= 0 and left >= 0 and bottom <= h and right <= w
    if not fits:
        return img

    if seg_mask is None:
        # No segmentation mask: blend the whole box.
        alpha = np.full((side, side), 255, dtype=np.uint8)
    else:
        # Bring the mask to image resolution, then cut out the square.
        alpha = cv2.resize(seg_mask, (w, h))[top:bottom, left:right]

    # Feathering (eclosion like repo): soften the mask edge with a box blur.
    kernel = int(size / 10) + 2
    alpha = cv2.blur(alpha, (kernel, kernel)).astype(np.float32) / 255.0
    alpha = np.stack([alpha] * 3, axis=-1)

    background = img[top:bottom, left:right].astype(np.float32)
    mixed = background * (1 - alpha) + fake.astype(np.float32) * alpha
    img[top:bottom, left:right] = np.clip(mixed, 0, 255).astype(np.uint8)
    return img
def addmosaic_base(img, mask, n, model='squa_avg', feather=0):
    """Repo-style mosaic adding (squa_avg with optional feathering).

    Args:
        img: 3-channel image array; not modified.
        mask: single-channel mask; blocks whose centre pixel is > 0 are
            mosaicked. It is resized to the image size when it differs.
        n: mosaic block size in pixels; coerced to an int >= 1.
        model: accepted for signature compatibility; only the ``squa_avg``
            behaviour (block average) is implemented and the value is unused.
        feather: ``< 0`` disables feathering, ``0`` blurs the mask with an
            ``n`` x ``n`` kernel, any other value is used as the kernel size.

    Returns:
        A new ``uint8`` image with the mosaic applied. Only whole ``n`` x ``n``
        blocks are processed; leftover rows/columns at the bottom/right edge
        keep their original pixels.
    """
    n = int(max(1, n))
    h, w = img.shape[:2]
    # Compare both dimensions: checking the height alone let a mask with the
    # right height but the wrong width through unresized.
    if tuple(mask.shape[:2]) != (h, w):
        mask = cv2.resize(mask, (w, h))

    img_mosaic = img.copy()
    half = n // 2

    # squa_avg: fill each selected block with its average colour.
    for i in range(h // n):
        top = i * n
        probe_y = min(top + half, h - 1)
        for j in range(w // n):
            left = j * n
            if mask[probe_y, min(left + half, w - 1)] > 0:
                block = img[top:top + n, left:left + n, :]
                if block.size > 0:
                    img_mosaic[top:top + n, left:left + n, :] = block.mean(axis=(0, 1))

    # Feathering for smooth edges: mix original and mosaic by the blurred mask.
    if feather >= 0:
        blur_size = n if feather == 0 else feather
        mask_blur = cv2.blur(mask.astype(np.float32), (blur_size, blur_size)) / 255.0
        for ch in range(3):
            img_mosaic[:, :, ch] = (img[:, :, ch] * (1 - mask_blur)
                                    + img_mosaic[:, :, ch] * mask_blur)
        img_mosaic = img_mosaic.astype(np.uint8)
    return img_mosaic
def get_mosaic_autosize(img, mask):
    """Pick a mosaic block size from the masked area (repo-style).

    The mask is resized to a square whose side is the shorter image side, the
    number of pixels above 127 is normalised to a 512-pixel reference, and a
    piecewise-linear rule maps that area to a block size. The result is never
    smaller than 3 and may be a float.
    """
    h, w = img.shape[:2]
    side = min(h, w)
    alpha = side / 512  # scale relative to the 512-pixel reference size

    square_mask = cv2.resize(mask, (side, side))
    # Mask area, normalised to the reference resolution.
    area = np.sum(square_mask > 127) / (alpha * alpha)

    if area > 50000:
        mosaic_size = alpha * ((area - 50000) / 50000 + 12)
    elif area > 20000:
        mosaic_size = alpha * ((area - 20000) / 30000 + 8)
    elif area > 5000:
        mosaic_size = alpha * ((area - 5000) / 20000 + 7)
    elif area >= 0:
        mosaic_size = alpha * (area / 5000 + 6)
    else:
        # Only reached when no comparison holds (e.g. a NaN area).
        mosaic_size = 7
    return max(3, mosaic_size)
def add_mosaic_mask(img, model, threshold=20):
    """Add a mosaic over the area segmented by ``model`` (body/general mode).

    The raw segmentation is binarised at ``threshold``, resized to the image
    size, used to pick an automatic block size, and then handed to
    ``addmosaic_base`` with default feathering.
    """
    height, width = img.shape[:2]

    raw = run_segment(img, model)
    _, binary = cv2.threshold(raw, threshold, 255, cv2.THRESH_BINARY)
    mask = cv2.resize(binary, (width, height))

    block = get_mosaic_autosize(img, mask)
    return addmosaic_base(img, mask, block, model='squa_avg', feather=0)
def pixelate(img, x, y, size, block=7):
    """Pixelate the square of half side ``size`` centred at ``(x, y)``.

    ``img`` is modified in place and returned. Nothing happens when the
    square reaches outside the image or is empty. Pixelation is done by
    shrinking the square by a factor of ``block`` and scaling it back up
    with nearest-neighbour interpolation.
    """
    top, bottom = y - size, y + size
    left, right = x - size, x + size
    fits = (top >= 0 and left >= 0
            and bottom <= img.shape[0] and right <= img.shape[1])
    if not fits:
        return img

    patch = img[top:bottom, left:right]
    patch_h, patch_w = patch.shape[:2]
    if patch_h <= 0 or patch_w <= 0:
        return img

    coarse_size = (max(1, patch_w // block), max(1, patch_h // block))
    coarse = cv2.resize(patch, coarse_size, interpolation=cv2.INTER_LINEAR)
    img[top:bottom, left:right] = cv2.resize(
        coarse, (patch_w, patch_h), interpolation=cv2.INTER_NEAREST
    )
    return img
| # ============ Processing ============ | |
def process_image(img_bgr, action, mode="face"):
    """Add or remove a mosaic on a single BGR image.

    Args:
        img_bgr: input image in BGR order; it is copied, not modified.
        action: ``"add"`` adds a mosaic; any other value removes one.
        mode: ``"face"`` or anything else for body/general handling.

    Returns:
        The processed BGR image.
    """
    result = img_bgr.copy()

    if action == "add":
        if mode != "face":
            # Body mode: use mask-based mosaic (like repo)
            return add_mosaic_mask(img_bgr, "add_youknow")
        x, y, size = get_region(img_bgr, "add_face")
        if size >= 10:
            result = pixelate(result, x, y, size)
        return result

    # Removal. Face mode uses larger expansion for better coverage.
    face_mode = mode == "face"
    ex_mul = 2.0 if face_mode else 1.5
    regions, seg_mask = get_all_regions(img_bgr, "mosaic_position", ex_mul=ex_mul)

    for x, y, size in regions:
        if size < 10:
            continue
        crop = result[y - size:y + size, x - size:x + size]
        if crop.size == 0:
            continue
        if face_mode:
            fake = run_clean(crop, "clean_face_HD", 512)
        else:
            # Use video model for body/general (better quality than img model);
            # the single crop is repeated to fill the 5-frame window.
            fake = run_clean_video([crop] * 5, None)
        result = blend(result, fake, x, y, size, seg_mask)
    return result
def process_video(video_path, action, mode="face"):
    """Add or remove mosaics on every frame of a video.

    Args:
        video_path: path to the input video; falsy values return ``None``.
        action: ``"add"`` or ``"remove"`` (forwarded to ``process_image`` for
            the frame-by-frame path).
        mode: ``"face"`` or ``"body"``. ``"remove"`` + ``"body"`` uses the
            5-frame video model; everything else is processed per frame.

    Returns:
        Path of a newly created temporary ``.mp4`` file (the caller owns it),
        or ``None`` when the input is missing or cannot be opened.
    """
    import tempfile

    if not video_path:
        return None

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        return None

    out = None
    try:
        fps = cap.get(cv2.CAP_PROP_FPS) or 30  # fall back when FPS is reported as 0
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Reserve an output path and close the handle right away so the
        # writer is the only one holding the file.
        with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as tmp:
            out_path = tmp.name
        out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))

        # For body/general video removal, use video model with 5-frame input
        if action == "remove" and mode == "body":
            # First pass: read every frame and detect its regions.
            # NOTE: all frames are held in memory for the temporal window.
            frames, regions = [], []
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frames.append(frame)
                regs, _ = get_all_regions(frame, "mosaic_position")
                regions.append(regs)

            # Second pass: clean each frame using its neighbours.
            prev_output = None
            last = len(frames) - 1
            for i, frame in enumerate(frames):
                result = frame.copy()
                for x, y, size in regions[i]:
                    if size < 10:
                        continue
                    # Get 5 crops centered on frame i (indices clamped at the ends)
                    crops = []
                    for j in range(i - 2, i + 3):
                        idx = max(0, min(j, last))
                        # Use the neighbour's own first region when it has one.
                        rx, ry, rs = regions[idx][0] if regions[idx] else (x, y, size)
                        crop = frames[idx][ry - rs:ry + rs, rx - rs:rx + rs]
                        if crop.size == 0:
                            crop = np.zeros((size * 2, size * 2, 3), dtype=np.uint8)
                        crops.append(crop)
                    fake = run_clean_video(crops, prev_output)
                    prev_output = fake
                    result = blend(result, fake, x, y, size)
                out.write(result)
        else:
            # Frame-by-frame for face or add
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                out.write(process_image(frame, action, mode))
    finally:
        # Always release the capture and writer, even if processing failed.
        cap.release()
        if out is not None:
            out.release()
    return out_path
| # ============ Gradio ============ | |
def is_video(file_path):
    """Return True when ``file_path`` has an extension listed in ``VIDEO_EXTS``.

    The comparison is case-insensitive; falsy paths yield ``False``.
    """
    if not file_path:
        return False
    _, extension = os.path.splitext(str(file_path))
    return extension.lower() in VIDEO_EXTS
def to_bgr(pil_img):
    """Convert a PIL image (or array-like) to a 3-channel BGR numpy array.

    Objects exposing a ``mode`` attribute other than ``L``, ``RGB`` or
    ``RGBA`` (for example palette, LA, CMYK or 1-bit images) are first
    converted to RGB via ``.convert("RGB")``. Without this, a palette image
    would be read as grayscale index values and a 2-channel image would pass
    through as a 2-channel array.

    Args:
        pil_img: a PIL image, or anything ``np.array`` accepts.

    Returns:
        A BGR array with shape ``(H, W, 3)``.
    """
    mode = getattr(pil_img, "mode", None)
    if mode is not None and mode not in ("L", "RGB", "RGBA"):
        pil_img = pil_img.convert("RGB")

    img = np.array(pil_img)
    if img.ndim == 2:  # Grayscale
        img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    elif img.shape[2] == 4:  # RGBA
        img = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR)
    else:  # RGB
        # Return a contiguous array rather than a negative-stride view.
        img = np.ascontiguousarray(img[:, :, ::-1])
    return img
def add_mosaic_img(file, target):
    """Add a face mosaic to a PIL image and return the result as an RGB array.

    ``target`` is accepted but not used: face mode is always applied.
    Returns ``None`` when ``file`` is ``None``.
    """
    if file is None:
        return None
    mosaicked = process_image(to_bgr(file), "add", "face")
    return mosaicked[:, :, ::-1]  # BGR back to RGB
def remove_mosaic_img(file, target):
    """Remove a mosaic from a PIL image and return the result as an RGB array.

    ``target`` selects body mode when it contains ``"Body"`` or ``"General"``,
    face mode otherwise. Returns ``None`` when ``file`` is ``None``.
    """
    if file is None:
        return None
    wants_body = "Body" in target or "General" in target
    mode = "body" if wants_body else "face"
    restored = process_image(to_bgr(file), "remove", mode)
    return restored[:, :, ::-1]  # BGR back to RGB
def add_mosaic_vid(file, target):
    """Add a face mosaic to a video and return the output path.

    ``target`` is accepted but not used: face mode is always applied.
    Returns ``None`` when ``file`` is ``None``.
    """
    return None if file is None else process_video(file, "add", "face")
def remove_mosaic_vid(file, target):
    """Remove mosaics from a video and return the output path.

    ``target`` selects body mode when it contains ``"Body"`` or ``"General"``,
    face mode otherwise. Returns ``None`` when ``file`` is ``None``.
    """
    if file is None:
        return None
    wants_body = "Body" in target or "General" in target
    return process_video(file, "remove", "body" if wants_body else "face")
if __name__ == "__main__":
    import sys

    if len(sys.argv) >= 4:
        # ---- Command-line mode: <action> <input> <output> [mode] ----
        from PIL import Image
        import shutil

        action, inp, out = sys.argv[1], sys.argv[2], sys.argv[3]
        mode = sys.argv[4] if len(sys.argv) > 4 else "face"
        ext = os.path.splitext(inp)[1].lower()
        if ext in VIDEO_EXTS:
            result_path = process_video(inp, action, mode)
            if result_path:
                shutil.move(result_path, out)
                print(f"Saved: {out}")
        else:
            # Use to_bgr so grayscale and RGBA inputs are handled the same way
            # as in the UI (slicing [:, :, :3] fails on 2-D grayscale arrays).
            with Image.open(inp) as img:
                img_bgr = to_bgr(img)
            result = process_image(img_bgr, action, mode)
            Image.fromarray(result[:, :, ::-1]).save(out)
            print(f"Saved: {out}")

    elif len(sys.argv) == 1:
        # ---- Gradio UI mode ----
        import gradio as gr
        from PIL import Image as PILImage

        def remove_mosaic_for_example(input_img, target):
            """Process for examples - returns output image"""
            if input_img is None:
                return None
            mode = "body" if "Body" in target or "General" in target else "face"
            img = to_bgr(input_img)
            result = process_image(img, "remove", mode)
            return PILImage.fromarray(result[:, :, ::-1])

        def add_mosaic_for_example(input_img, target):
            """Process for examples - returns output image"""
            if input_img is None:
                return None
            img = to_bgr(input_img)
            result = process_image(img, "add", "face")
            return PILImage.fromarray(result[:, :, ::-1])

        css = ".compact { max-width: 900px; margin: auto; }"

        def process_any(file, target, action):
            """Process image or video - auto-detect by extension.

            Returns a pair of updates for (image output, video output); exactly
            one of them is made visible.
            """
            if file is None:
                return gr.update(visible=True, value=None), gr.update(visible=False, value=None)
            # Accept either a plain path or a file object exposing ``.name``.
            path = file if isinstance(file, str) else getattr(file, "name", file)
            ext = os.path.splitext(path)[1].lower()
            mode = "body" if "Body" in target or "General" in target else "face"
            if ext in VIDEO_EXTS:
                # Video/GIF - show video output, hide image
                result = process_video(path, action, mode)
                return gr.update(visible=False, value=None), gr.update(visible=True, value=result)
            else:
                # Image - show image output, hide video
                with PILImage.open(path) as pil_img:
                    img = to_bgr(pil_img)
                result = process_image(img, action, mode)
                return gr.update(visible=True, value=PILImage.fromarray(result[:, :, ::-1])), gr.update(visible=False, value=None)

        def update_preview(file):
            """Update preview based on file type.

            Returns a pair of updates for (image preview, video preview).
            """
            if file is None:
                return gr.update(visible=True, value=None), gr.update(visible=False, value=None)
            # Accept either a plain path or a file object exposing ``.name``.
            path = file if isinstance(file, str) else getattr(file, "name", file)
            ext = os.path.splitext(path)[1].lower()
            if ext in VIDEO_EXTS:
                return gr.update(visible=False, value=None), gr.update(visible=True, value=path)
            else:
                return gr.update(visible=True, value=path), gr.update(visible=False, value=None)

        with gr.Blocks(title="DeepMosaics") as demo:
            with gr.Column(elem_classes="compact"):
                gr.Markdown("## [DeepMosaics](https://github.com/HypoX64/DeepMosaics)")
                target = gr.Radio(["Face", "Body/NSFW"], value="Face", label="Target", scale=0)
                with gr.Row():
                    # Input with preview
                    with gr.Column():
                        input_file = gr.File(
                            label="Input (Image or Video)",
                            file_types=[".jpg", ".jpeg", ".png", ".webp", ".bmp", ".gif", ".mp4", ".avi", ".mov", ".mkv", ".webm"]
                        )
                        preview_img = gr.Image(label="Preview", height=250, visible=True, interactive=False)
                        preview_vid = gr.Video(label="Preview", height=250, visible=False, interactive=False)
                    # Output
                    with gr.Column():
                        output_img = gr.Image(label="Output", height=300, visible=True)
                        output_vid = gr.Video(label="Output", height=300, visible=False)
                with gr.Row():
                    btn_add = gr.Button("Add Mosaic")
                    btn_remove = gr.Button("Remove Mosaic", variant="primary")

                # Examples with cached outputs
                def example_remove(filepath, target):
                    """Remove the mosaic from an example image file."""
                    mode = "body" if "Body" in target or "General" in target else "face"
                    with PILImage.open(filepath) as pil_img:
                        img = to_bgr(pil_img)
                    result = process_image(img, "remove", mode)
                    return PILImage.fromarray(result[:, :, ::-1])

                gr.Examples(
                    examples=[
                        ["examples/mosaic.jpg", "Face"],
                        ["examples/face_clean.jpg", "Face"],
                        ["examples/youknow_mosaic.png", "Body/NSFW"],
                    ],
                    inputs=[input_file, target],
                    outputs=output_img,
                    fn=example_remove,
                    cache_examples=True,
                    cache_mode="lazy",
                )

            # Update preview when file uploaded
            input_file.change(fn=update_preview, inputs=[input_file], outputs=[preview_img, preview_vid])
            btn_add.click(
                fn=lambda f, t: process_any(f, t, "add"),
                inputs=[input_file, target],
                outputs=[output_img, output_vid]
            )
            btn_remove.click(
                fn=lambda f, t: process_any(f, t, "remove"),
                inputs=[input_file, target],
                outputs=[output_img, output_vid]
            )

        demo.launch(css=css)

    else:
        print("Usage:")
        print(" python app.py # Gradio UI")
        print(" python app.py add input.jpg out.jpg # Add mosaic")
        print(" python app.py remove input.jpg out.jpg body # Remove body mosaic")