"""
DeepMosaics - Add/remove mosaics from images/videos using AI.
https://github.com/HypoX64/DeepMosaics
"""
import os
import tempfile

import numpy as np
import cv2
import onnxruntime as ort

ONNX_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "onnx_models")
VIDEO_EXTS = ['.mp4', '.avi', '.mov', '.mkv', '.webm', '.gif']

# Cache of loaded ONNX sessions, keyed by model name (file stem in ONNX_DIR).
sessions = {}


def get_session(name):
    """Return the cached ONNX inference session for model ``name``.

    The model is loaded from ``ONNX_DIR/<name>.onnx`` on first use with the
    CPU execution provider and kept in ``sessions`` afterwards.

    Raises:
        FileNotFoundError: if the model file does not exist.
    """
    if name not in sessions:
        path = os.path.join(ONNX_DIR, f"{name}.onnx")
        if not os.path.exists(path):
            raise FileNotFoundError(f"Model not found: {path}")
        sessions[name] = ort.InferenceSession(path, providers=['CPUExecutionProvider'])
    return sessions[name]


# ============ Segmentation ============

def run_segment(img, model, size=360):
    """Run segmentation model ``model`` on ``img`` and return a uint8 mask.

    The image is stretched to ``size`` x ``size`` (aspect ratio is NOT
    preserved), scaled to 0..1 and fed to the model as a 1xCxHxW tensor.
    The squeezed output is scaled by 255 and clipped to 0..255.
    """
    sess = get_session(model)
    resized = cv2.resize(img, (size, size)).astype(np.float32) / 255.0
    tensor = np.transpose(resized, (2, 0, 1))[np.newaxis]
    out = sess.run(None, {'input': tensor})[0].squeeze()
    return (out * 255).clip(0, 255).astype(np.uint8)


def get_all_regions(img, model, threshold=127, ex_mul=1.5, all_areas=False):
    """Detect mosaic regions with repo-style mask processing.

    Args:
        img: image to analyse (only its height/width and pixels are used).
        model: name of the segmentation model passed to ``run_segment``.
        threshold: binarisation threshold applied to the raw mask.
        ex_mul: expansion factor applied to each detected region's size.
        all_areas: when False only the largest contour is kept and the
            returned mask is redrawn to contain just that contour.

    Returns:
        ``(regions, mask)`` where ``regions`` is a list of
        ``(cx, cy, halfsize)`` squares in image coordinates and ``mask`` is
        the processed binary mask in the segmentation model's resolution.
    """
    h, w = img.shape[:2]
    mask_raw = run_segment(img, model)

    # Repo-style mask processing: binarise, blur, binarise again at a lower
    # threshold so the mask grows slightly around the detection.
    ex_mun = max(1, int(min(h, w) / 20))
    mask = cv2.threshold(mask_raw, threshold, 255, cv2.THRESH_BINARY)[1]
    mask = cv2.blur(mask, (ex_mun, ex_mun))
    mask = cv2.threshold(mask, int(threshold / 5), 255, cv2.THRESH_BINARY)[1]

    # [-2] selects the contour list on both OpenCV 3.x (3 return values)
    # and OpenCV 4.x (2 return values).
    contours = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]

    if not all_areas and contours:
        # Keep only the most likely ROI (largest contour), like the repo's
        # find_mostlikely_ROI.
        largest = max(contours, key=cv2.contourArea)
        mask = np.zeros_like(mask)
        cv2.fillPoly(mask, [largest], 255)
        contours = [largest]

    # run_segment stretches the image to a square, so mask coordinates map
    # back to the image with a separate factor per axis. For square images
    # both factors are equal.
    mask_h, mask_w = mask.shape[:2]
    scale_x = w / mask_w
    scale_y = h / mask_h

    regions = []
    for c in contours:
        if cv2.contourArea(c) < 50:
            continue
        x, y, bw, bh = cv2.boundingRect(c)
        cx = int((x + bw // 2) * scale_x)
        cy = int((y + bh // 2) * scale_y)
        # Square side in image pixels, then apply the ex_mul expansion.
        side = max(bw * scale_x, bh * scale_y)
        halfsize = int(side * ex_mul / 2)
        # Clamp to image bounds
        halfsize = max(15, min(halfsize, min(h, w) // 2 - 1))
        cx = max(halfsize, min(cx, w - halfsize))
        cy = max(halfsize, min(cy, h - halfsize))
        regions.append((cx, cy, halfsize))
    return regions, mask


def get_region(img, model):
    """Return the largest detected ``(cx, cy, halfsize)`` or ``(0, 0, 0)``."""
    # add_youknow has weaker detection, use lower threshold
    threshold = 20 if model == "add_youknow" else 127
    regions, _ = get_all_regions(img, model, threshold=threshold)
    return max(regions, key=lambda r: r[2]) if regions else (0, 0, 0)


# ============ Cleaning ============

def _to_model_rgb(bgr, size):
    """Resize a BGR image to size x size and return a CHW float32 RGB array.

    Pixel values are mapped from 0..255 to -1..1.
    """
    rgb = cv2.resize(bgr, (size, size))[:, :, ::-1]  # BGR to RGB (model expects RGB)
    rgb = rgb.astype(np.float32) / 255.0 * 2 - 1
    return np.transpose(rgb, (2, 0, 1))


def _from_model_rgb(out):
    """Convert a CHW RGB model output (-1..1) to an HWC uint8 BGR image."""
    out = np.transpose(out, (1, 2, 0))
    out = ((out + 1) / 2 * 255).clip(0, 255).astype(np.uint8)
    return out[:, :, ::-1]  # RGB to BGR


def run_clean(crop, model, size):
    """Run image cleaning model ``model`` on a BGR ``crop``.

    The crop is resized to ``size`` x ``size``; the result is a BGR uint8
    image in the model's output resolution.
    """
    sess = get_session(model)
    tensor = _to_model_rgb(crop, size)[np.newaxis]
    out = sess.run(None, {'input': tensor})[0].squeeze()
    return _from_model_rgb(out)


def run_clean_video(crops, prev_frame):
    """Run video model (5-frame input for temporal consistency).

    Args:
        crops: sequence of BGR crops, stacked along a frame axis.
        prev_frame: previous BGR output frame, or None to feed zeros.

    Returns:
        The cleaned frame as a BGR uint8 image.
    """
    sess = get_session("clean_youknow_video")
    size = 256
    frames = [_to_model_rgb(crop, size) for crop in crops]
    stream = np.stack(frames, axis=1)[np.newaxis]  # [1, 3, 5, 256, 256] for 5 crops
    if prev_frame is None:
        prev = np.zeros((1, 3, size, size), dtype=np.float32)
    else:
        prev = _to_model_rgb(prev_frame, size)[np.newaxis]
    out = sess.run(None, {'input': stream, 'prev_frame': prev})[0].squeeze()
    return _from_model_rgb(out)


def blend(img, fake, x, y, size, seg_mask=None):
    """Blend fake into img using segmentation mask (repo-style).

    ``fake`` is resized to the ``2*size`` square centred on ``(x, y)`` and
    alpha-blended into ``img`` in place. If the square does not fit inside
    the image, ``img`` is returned unchanged.

    Args:
        img: destination image, modified in place.
        fake: replacement patch.
        x, y: centre of the target square in image coordinates.
        size: half the side length of the target square.
        seg_mask: optional mask; it is stretched to the image size and
            cropped to the square. Without it a full box mask is used.

    Returns:
        ``img`` (the same array that was passed in).
    """
    h, w = img.shape[:2]
    fake = cv2.resize(fake, (size * 2, size * 2), interpolation=cv2.INTER_CUBIC)
    y1, y2, x1, x2 = y - size, y + size, x - size, x + size
    if y1 < 0 or x1 < 0 or y2 > h or x2 > w:
        return img

    # Use segmentation mask if provided, else use box mask
    if seg_mask is not None:
        # Resize mask to original image size and crop
        mask_full = cv2.resize(seg_mask, (w, h))
        mask_crop = mask_full[y1:y2, x1:x2]
    else:
        mask_crop = np.ones((size * 2, size * 2), dtype=np.uint8) * 255

    # Feathering (eclosion like repo)
    eclosion_num = int(size / 10) + 2
    mask_crop = cv2.blur(mask_crop, (eclosion_num, eclosion_num))
    mask_crop = mask_crop.astype(np.float32) / 255.0
    mask_crop = np.stack([mask_crop] * 3, axis=-1)

    crop = img[y1:y2, x1:x2].astype(np.float32)
    mixed = crop * (1 - mask_crop) + fake.astype(np.float32) * mask_crop
    img[y1:y2, x1:x2] = np.clip(mixed, 0, 255).astype(np.uint8)
    return img


def addmosaic_base(img, mask, n, model='squa_avg', feather=0):
    """Repo-style mosaic adding (squa_avg with feather).

    Args:
        img: source image; it is not modified.
        mask: mask selecting where to apply the mosaic; resized to the
            image size when its height or width differs.
        n: mosaic block size in pixels (coerced to an int >= 1).
        model: accepted for interface compatibility; only the squa_avg
            behaviour is implemented and the value is not read.
        feather: negative disables feathering, 0 uses ``n`` as the blur
            kernel size, a positive value is used as the kernel size.

    Returns:
        A new image with the mosaic applied.
    """
    n = int(max(1, n))
    h, w = img.shape[:2]
    # Compare both dimensions: a mask that matches only in height would
    # otherwise be used with the wrong width.
    if mask.shape[:2] != (h, w):
        mask = cv2.resize(mask, (w, h))
    img_mosaic = img.copy()

    h_step = h // n
    w_step = w // n
    pix_mid = n // 2

    # squa_avg: fill each block with average color when the block's centre
    # pixel is inside the mask.
    for i in range(h_step):
        for j in range(w_step):
            if mask[min(i * n + pix_mid, h - 1), min(j * n + pix_mid, w - 1)] > 0:
                block = img[i * n:(i + 1) * n, j * n:(j + 1) * n, :]
                img_mosaic[i * n:(i + 1) * n, j * n:(j + 1) * n, :] = block.mean(axis=(0, 1))

    # Feathering for smooth edges
    if feather >= 0:
        blur_size = n if feather == 0 else feather
        mask_blur = cv2.blur(mask.astype(np.float32), (blur_size, blur_size)) / 255.0
        alpha = mask_blur[:, :, np.newaxis]
        img_mosaic = (img * (1 - alpha) + img_mosaic * alpha).astype(np.uint8)
    return img_mosaic


def get_mosaic_autosize(img, mask):
    """Calculate mosaic size based on mask area (repo-style).

    The mask is resized to a square whose side is the image's shorter
    side, and the count of pixels above 127 is normalised to a 512-pixel
    reference before being mapped to a block size. The result is >= 3.
    """
    h, w = img.shape[:2]
    size = min(h, w)
    mask_resized = cv2.resize(mask, (size, size))
    alpha = size / 512

    # Calculate mask area
    area = np.sum(mask_resized > 127)
    area = area / (alpha * alpha)

    if area > 50000:
        mosaic_size = alpha * ((area - 50000) / 50000 + 12)
    elif 20000 < area <= 50000:
        mosaic_size = alpha * ((area - 20000) / 30000 + 8)
    elif 5000 < area <= 20000:
        mosaic_size = alpha * ((area - 5000) / 20000 + 7)
    elif 0 <= area <= 5000:
        mosaic_size = alpha * (area / 5000 + 6)
    else:
        mosaic_size = 7
    return max(3, mosaic_size)


def add_mosaic_mask(img, model, threshold=20):
    """Add mosaic using mask (repo-style for body/general mode)."""
    h, w = img.shape[:2]
    mask = run_segment(img, model)
    mask = cv2.threshold(mask, threshold, 255, cv2.THRESH_BINARY)[1]
    mask = cv2.resize(mask, (w, h))
    mosaic_size = get_mosaic_autosize(img, mask)
    return addmosaic_base(img, mask, mosaic_size, model='squa_avg', feather=0)


def pixelate(img, x, y, size, block=7):
    """Pixelate the ``2*size`` square centred on ``(x, y)`` in place.

    The square is downscaled by ``block`` and scaled back with nearest
    neighbour interpolation. If the square does not fit inside the image,
    ``img`` is returned unchanged.
    """
    y1, y2, x1, x2 = y - size, y + size, x - size, x + size
    if y1 < 0 or x1 < 0 or y2 > img.shape[0] or x2 > img.shape[1]:
        return img
    region = img[y1:y2, x1:x2]
    rh, rw = region.shape[:2]
    if rh <= 0 or rw <= 0:
        return img
    small = cv2.resize(region, (max(1, rw // block), max(1, rh // block)),
                       interpolation=cv2.INTER_LINEAR)
    img[y1:y2, x1:x2] = cv2.resize(small, (rw, rh), interpolation=cv2.INTER_NEAREST)
    return img


# ============ Processing ============

def _target_to_mode(target):
    """Map a UI target label to the processing mode ("body" or "face")."""
    return "body" if "Body" in target or "General" in target else "face"


def process_image(img_bgr, action, mode="face"):
    """Add or remove a mosaic on a single BGR image.

    Args:
        img_bgr: input image in BGR order; it is not modified.
        action: "add" adds a mosaic; any other value removes one.
        mode: "face" or anything else for the body/general path.

    Returns:
        The processed BGR image.
    """
    result = img_bgr.copy()
    if action == "add":
        if mode == "face":
            x, y, size = get_region(img_bgr, "add_face")
            if size >= 10:
                result = pixelate(result, x, y, size)
        else:
            # Body mode: use mask-based mosaic (like repo)
            result = add_mosaic_mask(img_bgr, "add_youknow")
    else:
        # Face mode uses larger expansion for better coverage
        ex_mul = 2.0 if mode == "face" else 1.5
        regions, seg_mask = get_all_regions(img_bgr, "mosaic_position", ex_mul=ex_mul)
        for x, y, size in regions:
            if size < 10:
                continue
            crop = result[y - size:y + size, x - size:x + size]
            if crop.size == 0:
                continue
            if mode == "face":
                fake = run_clean(crop, "clean_face_HD", 512)
            else:
                # Use video model for body/general (better quality than img model)
                crops = [crop] * 5
                fake = run_clean_video(crops, None)
            result = blend(result, fake, x, y, size, seg_mask)
    return result


def process_video(video_path, action, mode="face"):
    """Add or remove a mosaic on every frame of a video.

    Args:
        video_path: path of the input video.
        action: "add" or "remove" (see ``process_image``).
        mode: "face" or "body"; "remove" + "body" uses the 5-frame video
            model, every other combination runs ``process_image`` per frame.

    Returns:
        Path of a temporary ``.mp4`` file with the result, or None when
        ``video_path`` is empty, the input cannot be opened, or the output
        writer cannot be opened.
    """
    if not video_path:
        return None

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        return None

    out = None
    try:
        fps = cap.get(cv2.CAP_PROP_FPS) or 30
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Only the name is needed; close the handle so the writer can open
        # the file and no descriptor is leaked.
        with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as tmp:
            out_path = tmp.name
        out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
        if not out.isOpened():
            os.remove(out_path)
            return None

        # For body/general video removal, use video model with 5-frame input
        if action == "remove" and mode == "body":
            frames, regions = [], []
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frames.append(frame)
                regs, _ = get_all_regions(frame, "mosaic_position")
                regions.append(regs)

            prev_output = None
            last = len(frames) - 1
            for i, frame in enumerate(frames):
                result = frame.copy()
                for x, y, size in regions[i]:
                    if size < 10:
                        continue
                    # Get 5 crops centered on frame i; indices past either
                    # end of the video are clamped to the first/last frame.
                    crops = []
                    for j in range(i - 2, i + 3):
                        idx = max(0, min(j, last))
                        # Neighbouring frames use their own first region,
                        # falling back to the current one if none was found.
                        rx, ry, rs = regions[idx][0] if regions[idx] else (x, y, size)
                        crop = frames[idx][ry - rs:ry + rs, rx - rs:rx + rs]
                        if crop.size == 0:
                            crop = np.zeros((size * 2, size * 2, 3), dtype=np.uint8)
                        crops.append(crop)
                    fake = run_clean_video(crops, prev_output)
                    prev_output = fake
                    result = blend(result, fake, x, y, size)
                out.write(result)
        else:
            # Frame-by-frame for face or add
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                out.write(process_image(frame, action, mode))
    finally:
        # Release handles even when a model call raises.
        cap.release()
        if out is not None:
            out.release()
    return out_path


# ============ Gradio ============

def is_video(file_path):
    """Return True if ``file_path`` has one of the extensions in VIDEO_EXTS."""
    if not file_path:
        return False
    ext = os.path.splitext(str(file_path))[1].lower()
    return ext in VIDEO_EXTS


def to_bgr(pil_img):
    """Convert PIL image to a contiguous BGR array, handling grayscale.

    Palette, 1-bit, gray+alpha, CMYK and YCbCr images are converted to RGB
    first, because their raw arrays are not RGB pixel data.
    """
    if getattr(pil_img, "mode", None) in ("1", "P", "PA", "LA", "CMYK", "YCbCr"):
        pil_img = pil_img.convert("RGB")
    img = np.array(pil_img)
    if img.ndim == 2:  # Grayscale
        img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    elif img.shape[2] == 4:  # RGBA
        img = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR)
    else:  # RGB
        img = img[:, :, ::-1]
    # The channel flip above is a negatively-strided view; hand OpenCV a
    # plain contiguous array instead.
    return np.ascontiguousarray(img)


def add_mosaic_img(file, target):
    """Add a face mosaic to a PIL image; returns an RGB array or None."""
    if file is None:
        return None
    img = to_bgr(file)
    return process_image(img, "add", "face")[:, :, ::-1]


def remove_mosaic_img(file, target):
    """Remove a mosaic from a PIL image; returns an RGB array or None."""
    if file is None:
        return None
    mode = _target_to_mode(target)
    img = to_bgr(file)
    return process_image(img, "remove", mode)[:, :, ::-1]


def add_mosaic_vid(file, target):
    """Add a face mosaic to a video; returns the output path or None."""
    if file is None:
        return None
    return process_video(file, "add", "face")


def remove_mosaic_vid(file, target):
    """Remove a mosaic from a video; returns the output path or None."""
    if file is None:
        return None
    mode = _target_to_mode(target)
    return process_video(file, "remove", mode)


if __name__ == "__main__":
    import sys

    if len(sys.argv) >= 4:
        from PIL import Image
        import shutil

        action, inp, out = sys.argv[1], sys.argv[2], sys.argv[3]
        mode = sys.argv[4] if len(sys.argv) > 4 else "face"
        ext = os.path.splitext(inp)[1].lower()
        if ext in VIDEO_EXTS:
            result_path = process_video(inp, action, mode)
            if result_path:
                shutil.move(result_path, out)
                print(f"Saved: {out}")
            else:
                print(f"Failed to process video: {inp}", file=sys.stderr)
                sys.exit(1)
        else:
            # to_bgr handles grayscale, palette and RGBA inputs.
            img_bgr = to_bgr(Image.open(inp))
            result = process_image(img_bgr, action, mode)
            Image.fromarray(result[:, :, ::-1]).save(out)
            print(f"Saved: {out}")
    elif len(sys.argv) == 1:
        import gradio as gr
        from PIL import Image as PILImage

        def file_path_of(file):
            """Return the filesystem path for a path string or a file wrapper."""
            return file if isinstance(file, str) else getattr(file, "name", file)

        def remove_mosaic_for_example(input_img, target):
            """Process for examples - returns output image"""
            if input_img is None:
                return None
            mode = _target_to_mode(target)
            img = to_bgr(input_img)
            result = process_image(img, "remove", mode)
            return PILImage.fromarray(result[:, :, ::-1])

        def add_mosaic_for_example(input_img, target):
            """Process for examples - returns output image"""
            if input_img is None:
                return None
            img = to_bgr(input_img)
            result = process_image(img, "add", "face")
            return PILImage.fromarray(result[:, :, ::-1])

        css = ".compact { max-width: 900px; margin: auto; }"

        def process_any(file, target, action):
            """Process image or video - auto-detect by extension"""
            if file is None:
                return gr.update(visible=True, value=None), gr.update(visible=False, value=None)
            path = file_path_of(file)
            ext = os.path.splitext(path)[1].lower()
            mode = _target_to_mode(target)
            if ext in VIDEO_EXTS:
                # Video/GIF - show video output, hide image
                result = process_video(path, action, mode)
                return gr.update(visible=False, value=None), gr.update(visible=True, value=result)
            # Image - show image output, hide video
            img = to_bgr(PILImage.open(path))
            result = process_image(img, action, mode)
            return (
                gr.update(visible=True, value=PILImage.fromarray(result[:, :, ::-1])),
                gr.update(visible=False, value=None),
            )

        def update_preview(file):
            """Update preview based on file type"""
            if file is None:
                return gr.update(visible=True, value=None), gr.update(visible=False, value=None)
            path = file_path_of(file)
            ext = os.path.splitext(path)[1].lower()
            if ext in VIDEO_EXTS:
                return gr.update(visible=False, value=None), gr.update(visible=True, value=path)
            return gr.update(visible=True, value=path), gr.update(visible=False, value=None)

        with gr.Blocks(title="DeepMosaics") as demo:
            with gr.Column(elem_classes="compact"):
                gr.Markdown("## [DeepMosaics](https://github.com/HypoX64/DeepMosaics)")
                target = gr.Radio(["Face", "Body/NSFW"], value="Face", label="Target", scale=0)

                with gr.Row():
                    # Input with preview
                    with gr.Column():
                        input_file = gr.File(
                            label="Input (Image or Video)",
                            file_types=[".jpg", ".jpeg", ".png", ".webp", ".bmp", ".gif",
                                        ".mp4", ".avi", ".mov", ".mkv", ".webm"]
                        )
                        preview_img = gr.Image(label="Preview", height=250, visible=True,
                                               interactive=False)
                        preview_vid = gr.Video(label="Preview", height=250, visible=False,
                                               interactive=False)
                    # Output
                    with gr.Column():
                        output_img = gr.Image(label="Output", height=300, visible=True)
                        output_vid = gr.Video(label="Output", height=300, visible=False)

                with gr.Row():
                    btn_add = gr.Button("Add Mosaic")
                    btn_remove = gr.Button("Remove Mosaic", variant="primary")

                # Examples with cached outputs
                def example_remove(filepath, target):
                    """Remove the mosaic from an example image file."""
                    mode = _target_to_mode(target)
                    img = to_bgr(PILImage.open(filepath))
                    result = process_image(img, "remove", mode)
                    return PILImage.fromarray(result[:, :, ::-1])

                gr.Examples(
                    examples=[
                        ["examples/mosaic.jpg", "Face"],
                        ["examples/face_clean.jpg", "Face"],
                        ["examples/youknow_mosaic.png", "Body/NSFW"],
                    ],
                    inputs=[input_file, target],
                    outputs=output_img,
                    fn=example_remove,
                    cache_examples=True,
                    cache_mode="lazy",
                )

                # Update preview when file uploaded
                input_file.change(fn=update_preview, inputs=[input_file],
                                  outputs=[preview_img, preview_vid])

                btn_add.click(
                    fn=lambda f, t: process_any(f, t, "add"),
                    inputs=[input_file, target],
                    outputs=[output_img, output_vid]
                )
                btn_remove.click(
                    fn=lambda f, t: process_any(f, t, "remove"),
                    inputs=[input_file, target],
                    outputs=[output_img, output_vid]
                )

        demo.launch(css=css)
    else:
        print("Usage:")
        print(" python app.py # Gradio UI")
        print(" python app.py add input.jpg out.jpg # Add mosaic")
        print(" python app.py remove input.jpg out.jpg body # Remove body mosaic")