# DeepMosaics / app.py
# Nekochu's picture
# Add author credits
# 94f48ff verified
"""
DeepMosaics - Add/remove mosaics from images/videos using AI.
https://github.com/HypoX64/DeepMosaics
"""
import os
import numpy as np
import cv2
import onnxruntime as ort
# Directory holding the ONNX model files; resolved relative to this script.
ONNX_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "onnx_models")
# Lower-case extensions (with leading dot) that are routed to the video pipeline.
VIDEO_EXTS = ['.mp4', '.avi', '.mov', '.mkv', '.webm', '.gif']
# Cache of loaded inference sessions keyed by model name; filled lazily by get_session().
sessions = {}
def get_session(name):
    """Return the ONNX Runtime session for model ``name``, creating it on first use.

    The model is read from ``<ONNX_DIR>/<name>.onnx`` and always runs on the
    CPU execution provider. Created sessions are kept in ``sessions`` so each
    model file is loaded at most once.

    Raises:
        FileNotFoundError: if the model file does not exist.
    """
    try:
        return sessions[name]
    except KeyError:
        pass  # Not loaded yet: fall through and create it.

    model_path = os.path.join(ONNX_DIR, f"{name}.onnx")
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model not found: {model_path}")

    session = ort.InferenceSession(model_path, providers=['CPUExecutionProvider'])
    sessions[name] = session
    return session
# ============ Segmentation ============
def run_segment(img, model, size=360):
    """Run segmentation model ``model`` on ``img`` and return a uint8 mask.

    The image is resized to ``size`` x ``size`` (aspect ratio is NOT kept),
    scaled to [0, 1], laid out as a 1 x C x H x W float32 tensor and fed to the
    session under the input name ``'input'``. The first model output is
    squeezed, multiplied by 255 and clipped into uint8.

    NOTE(review): the x255 scaling presumes the model emits values in [0, 1]
    -- confirm against the exported model.
    """
    session = get_session(model)
    scaled = cv2.resize(img, (size, size))
    normalized = scaled.astype(np.float32) / 255.0
    batch = normalized.transpose(2, 0, 1)[np.newaxis]  # HWC -> 1xCxHxW
    prediction = session.run(None, {'input': batch})[0]
    return (prediction.squeeze() * 255).clip(0, 255).astype(np.uint8)
def get_all_regions(img, model, threshold=127, ex_mul=1.5, all_areas=False):
    """Detect mosaic regions in ``img`` using segmentation model ``model``.

    Args:
        img: image array; only ``img.shape[:2]`` (height, width) is read here,
            the pixels are passed on to ``run_segment``.
        model: model name forwarded to ``run_segment``.
        threshold: binarisation threshold applied to the raw mask; the second
            pass after blurring uses ``threshold / 5``.
        ex_mul: expansion factor applied to each detected square.
        all_areas: when False (default) only the largest contour is kept and
            the returned mask is redrawn to contain just that contour.

    Returns:
        ``(regions, mask)`` where ``regions`` is a list of
        ``(cx, cy, halfsize)`` squares in image pixel coordinates and ``mask``
        is the processed binary mask at segmentation resolution.
    """
    h, w = img.shape[:2]
    mask_raw = run_segment(img, model)

    # Repo-style mask processing: binarise, grow via box blur, binarise again.
    ex_mun = max(1, int(min(h, w) / 20))
    mask = cv2.threshold(mask_raw, threshold, 255, cv2.THRESH_BINARY)[1]
    mask = cv2.blur(mask, (ex_mun, ex_mun))
    mask = cv2.threshold(mask, int(threshold / 5), 255, cv2.THRESH_BINARY)[1]

    # Find most likely ROI (largest contour) - like repo's find_mostlikely_ROI
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not all_areas and contours:
        largest = max(contours, key=cv2.contourArea)
        mask = np.zeros_like(mask)
        cv2.fillPoly(mask, [largest], 255)
        contours = [largest]

    # The mask comes from a stretched (non aspect-preserving) resize of the
    # whole frame, so each axis needs its own mask->image factor. A single
    # min(h, w) / 360 factor misplaces regions on non-square images.
    mask_h, mask_w = mask.shape[:2]
    scale_x = w / mask_w
    scale_y = h / mask_h
    max_halfsize = min(h, w) // 2 - 1

    regions = []
    for contour in contours:
        if cv2.contourArea(contour) < 50:
            continue  # Ignore specks.
        x, y, bw, bh = cv2.boundingRect(contour)
        # Centre of the bounding box, mapped to image coordinates.
        cx = int((x + bw // 2) * scale_x)
        cy = int((y + bh // 2) * scale_y)
        # Square side = larger box side in image pixels, then Ex_mul expansion.
        side = max(bw * scale_x, bh * scale_y)
        halfsize = int(side * ex_mul / 2)
        # Clamp to image bounds
        halfsize = max(15, min(halfsize, max_halfsize))
        cx = max(halfsize, min(cx, w - halfsize))
        cy = max(halfsize, min(cy, h - halfsize))
        regions.append((cx, cy, halfsize))
    return regions, mask
def get_region(img, model):
    """Return the single largest detected region as ``(cx, cy, halfsize)``.

    Falls back to ``(0, 0, 0)`` when nothing is detected.
    """
    # add_youknow has weaker detection, so it gets a lower threshold.
    threshold = 127
    if model == "add_youknow":
        threshold = 20

    regions, _ = get_all_regions(img, model, threshold=threshold)
    if not regions:
        return (0, 0, 0)
    return max(regions, key=lambda region: region[2])
# ============ Cleaning ============
def run_clean(crop, model, size):
    """Run single-image restoration model ``model`` on a BGR ``crop``.

    The crop is resized to ``size`` x ``size``, flipped BGR -> RGB, mapped from
    [0, 255] to [-1, 1] and fed as a 1 x C x H x W tensor under the input name
    ``'input'``. The first output is mapped back to uint8 and returned in BGR
    channel order at ``size`` x ``size`` resolution.
    """
    session = get_session(model)

    rgb = cv2.resize(crop, (size, size))[:, :, ::-1]  # model expects RGB
    normalized = rgb.astype(np.float32) / 255.0 * 2 - 1
    batch = normalized.transpose(2, 0, 1)[np.newaxis]

    restored = session.run(None, {'input': batch})[0].squeeze()
    restored = restored.transpose(1, 2, 0)  # CHW -> HWC
    restored = ((restored + 1) / 2 * 255).clip(0, 255).astype(np.uint8)
    return restored[:, :, ::-1]  # back to BGR
def run_clean_video(crops, prev_frame):
    """Run the ``clean_youknow_video`` model on a stack of BGR crops.

    Each crop is resized to 256 x 256, flipped to RGB and normalised to
    [-1, 1]; the crops are stacked along a new frame axis to form the
    ``'input'`` tensor (callers in this file pass five crops, giving
    1 x 3 x 5 x 256 x 256). ``prev_frame`` is the previous output in BGR, or
    ``None`` to feed an all-zero ``'prev_frame'`` tensor.

    Returns the first model output as a uint8 BGR image.
    """
    session = get_session("clean_youknow_video")
    size = 256

    def to_chw(bgr):
        # Resize, BGR -> RGB, [0, 255] -> [-1, 1], HWC -> CHW.
        rgb = cv2.resize(bgr, (size, size))[:, :, ::-1]
        normalized = rgb.astype(np.float32) / 255.0 * 2 - 1
        return np.transpose(normalized, (2, 0, 1))

    stream = np.stack([to_chw(crop) for crop in crops], axis=1)[np.newaxis]

    if prev_frame is not None:
        prev = to_chw(prev_frame)[np.newaxis]
    else:
        prev = np.zeros((1, 3, size, size), dtype=np.float32)

    restored = session.run(None, {'input': stream, 'prev_frame': prev})[0].squeeze()
    restored = np.transpose(restored, (1, 2, 0))
    restored = ((restored + 1) / 2 * 255).clip(0, 255).astype(np.uint8)
    return restored[:, :, ::-1]  # RGB to BGR
def blend(img, fake, x, y, size, seg_mask=None):
    """Paste ``fake`` into ``img`` over the square centred at ``(x, y)``.

    The square has half-side ``size``. ``fake`` is resized to fit it. When
    ``seg_mask`` is given it is stretched to the image size and its crop is
    used as the per-pixel blend weight; otherwise a full box is used. The
    weight is box-blurred before blending.

    ``img`` is modified in place and also returned. If the square does not lie
    fully inside the image, ``img`` is returned untouched.
    """
    h, w = img.shape[:2]
    side = size * 2
    fake = cv2.resize(fake, (side, side), interpolation=cv2.INTER_CUBIC)

    top, bottom = y - size, y + size
    left, right = x - size, x + size
    inside = top >= 0 and left >= 0 and bottom <= h and right <= w
    if not inside:
        return img

    if seg_mask is None:
        # No segmentation available: weight the whole box equally.
        weight = np.ones((side, side), dtype=np.uint8) * 255
    else:
        # Stretch the mask to the image size, then cut out the target square.
        weight = cv2.resize(seg_mask, (w, h))[top:bottom, left:right]

    # Feathering (eclosion like repo)
    kernel = int(size / 10) + 2
    weight = cv2.blur(weight, (kernel, kernel))
    weight = weight.astype(np.float32) / 255.0
    weight = np.stack([weight, weight, weight], axis=-1)

    original = img[top:bottom, left:right].astype(np.float32)
    mixed = original * (1 - weight) + fake.astype(np.float32) * weight
    img[top:bottom, left:right] = np.clip(mixed, 0, 255).astype(np.uint8)
    return img
def addmosaic_base(img, mask, n, model='squa_avg', feather=0):
    """Return a copy of ``img`` with ``n`` x ``n`` average-colour blocks where ``mask`` is set.

    Args:
        img: H x W x C image; it is not modified.
        mask: 2-D mask; resized to the image size when its height or width
            differs. A block is mosaicked when the mask value at the block
            centre is > 0.
        n: block size in pixels (values below 1 are raised to 1, then
            truncated to int).
        model: accepted for compatibility; only the square-average style is
            implemented and this argument is not read.
        feather: ``0`` blends the mosaic back with a blur of size ``n``, a
            positive value uses that blur size, a negative value disables
            feathering.

    Returns:
        The mosaicked uint8 image. Rows/columns beyond the last full block are
        left as in ``img``.
    """
    n = int(max(1, n))
    h, w = img.shape[:2]
    # Compare both dimensions: a mask that matches only in height still needs
    # resizing, otherwise the indexing and blending below use the wrong shape.
    if mask.shape[:2] != (h, w):
        mask = cv2.resize(mask, (w, h))

    img_mosaic = img.copy()
    half = n // 2

    # squa_avg: fill each block with its average colour
    for i in range(h // n):
        top = i * n
        probe_row = min(top + half, h - 1)
        for j in range(w // n):
            left = j * n
            if mask[probe_row, min(left + half, w - 1)] > 0:
                block = img[top:top + n, left:left + n, :]
                if block.size > 0:
                    img_mosaic[top:top + n, left:left + n, :] = block.mean(axis=(0, 1))

    # Feathering for smooth edges
    if feather >= 0:
        blur_size = n if feather == 0 else feather
        mask_blur = cv2.blur(mask.astype(np.float32), (blur_size, blur_size)) / 255.0
        for channel in range(3):
            img_mosaic[:, :, channel] = (
                img[:, :, channel] * (1 - mask_blur)
                + img_mosaic[:, :, channel] * mask_blur
            )
        img_mosaic = img_mosaic.astype(np.uint8)
    return img_mosaic
def get_mosaic_autosize(img, mask):
    """Pick a mosaic block size from how much of ``mask`` is set (repo-style).

    The mask is resized to a square whose side is the image's shorter side,
    pixels above 127 are counted, and the count is normalised to a 512-pixel
    reference side. A piecewise-linear rule maps that area to a block size,
    which is scaled back by the same ratio. The result is never below 3.
    """
    h, w = img.shape[:2]
    side = min(h, w)
    alpha = side / 512  # ratio to the 512-pixel reference size

    square_mask = cv2.resize(mask, (side, side))
    # Mask area expressed at the reference size.
    area = np.sum(square_mask > 127) / (alpha * alpha)

    if area > 50000:
        mosaic_size = alpha * ((area - 50000) / 50000 + 12)
    elif area > 20000:
        mosaic_size = alpha * ((area - 20000) / 30000 + 8)
    elif area > 5000:
        mosaic_size = alpha * ((area - 5000) / 20000 + 7)
    elif area >= 0:
        mosaic_size = alpha * (area / 5000 + 6)
    else:
        # Only reachable if area is not an ordinary non-negative number.
        mosaic_size = 7
    return max(3, mosaic_size)
def add_mosaic_mask(img, model, threshold=20):
    """Mosaic every area of ``img`` that segmentation model ``model`` marks.

    The raw mask is binarised at ``threshold``, stretched to the image size,
    used to choose a block size automatically, and then applied with the
    square-average mosaic and default feathering.
    """
    height, width = img.shape[:2]
    raw_mask = run_segment(img, model)
    _, binary = cv2.threshold(raw_mask, threshold, 255, cv2.THRESH_BINARY)
    full_mask = cv2.resize(binary, (width, height))
    block_size = get_mosaic_autosize(img, full_mask)
    return addmosaic_base(img, full_mask, block_size, model='squa_avg', feather=0)
def pixelate(img, x, y, size, block=7):
    """Pixelate the square of half-side ``size`` centred at ``(x, y)`` in place.

    The square is shrunk by a factor of ``block`` and blown back up with
    nearest-neighbour interpolation. ``img`` is returned; it is left untouched
    when the square is empty or not fully inside the image.
    """
    top, bottom = y - size, y + size
    left, right = x - size, x + size
    fits = top >= 0 and left >= 0 and bottom <= img.shape[0] and right <= img.shape[1]
    if not fits:
        return img

    region = img[top:bottom, left:right]
    region_h, region_w = region.shape[:2]
    if region_h <= 0 or region_w <= 0:
        return img

    coarse_size = (max(1, region_w // block), max(1, region_h // block))
    coarse = cv2.resize(region, coarse_size, interpolation=cv2.INTER_LINEAR)
    img[top:bottom, left:right] = cv2.resize(
        coarse, (region_w, region_h), interpolation=cv2.INTER_NEAREST
    )
    return img
# ============ Processing ============
def process_image(img_bgr, action, mode="face"):
    """Add or remove mosaics on one BGR image and return the result.

    ``action == "add"``: in face mode the largest ``add_face`` region is
    pixelated (if its half-size is at least 10); in any other mode the
    ``add_youknow`` mask is mosaicked. Any other ``action`` removes mosaics:
    regions found by ``mosaic_position`` are restored with ``clean_face_HD``
    (face mode) or the video model fed five copies of the crop (other modes)
    and blended back using the segmentation mask.

    ``img_bgr`` itself is not modified.
    """
    result = img_bgr.copy()
    face_mode = mode == "face"

    if action == "add":
        if not face_mode:
            # Body mode: use mask-based mosaic (like repo)
            return add_mosaic_mask(img_bgr, "add_youknow")
        x, y, size = get_region(img_bgr, "add_face")
        if size >= 10:
            result = pixelate(result, x, y, size)
        return result

    # Face mode uses larger expansion for better coverage
    ex_mul = 2.0 if face_mode else 1.5
    regions, seg_mask = get_all_regions(img_bgr, "mosaic_position", ex_mul=ex_mul)
    for x, y, size in regions:
        if size < 10:
            continue
        crop = result[y - size:y + size, x - size:x + size]
        if crop.size == 0:
            continue
        if face_mode:
            fake = run_clean(crop, "clean_face_HD", 512)
        else:
            # Use video model for body/general (better quality than img model)
            fake = run_clean_video([crop] * 5, None)
        result = blend(result, fake, x, y, size, seg_mask)
    return result
def process_video(video_path, action, mode="face"):
    """Process a video file and return the path of the written result.

    For ``action == "remove"`` with ``mode == "body"`` all frames are read
    into memory, regions are detected per frame, and each region is restored
    with the video model using crops from frames ``i-2 .. i+2`` (indices
    clamped to the clip). Every other combination is handled frame by frame
    through ``process_image``.

    Args:
        video_path: path of the input video; falsy values return ``None``.
        action: forwarded to ``process_image`` (``"add"`` / ``"remove"``).
        mode: ``"face"`` or ``"body"``.

    Returns:
        Path of a temporary ``.mp4`` file (mp4v codec) that the caller owns,
        or ``None`` when the input is missing or cannot be opened.
    """
    import tempfile

    if not video_path:
        return None
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        return None

    out = None
    try:
        fps = cap.get(cv2.CAP_PROP_FPS) or 30
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Only the name is needed: close the handle right away instead of
        # leaking it, and let the writer open the file itself.
        with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as tmp:
            out_path = tmp.name
        out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))

        # For body/general video removal, use video model with 5-frame input
        if action == "remove" and mode == "body":
            frames, regions = [], []
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frames.append(frame)
                regs, _ = get_all_regions(frame, "mosaic_position")
                regions.append(regs)

            prev_output = None
            last_index = len(frames) - 1
            for i, frame in enumerate(frames):
                result = frame.copy()
                for x, y, size in regions[i]:
                    if size < 10:
                        continue
                    # Get 5 crops centered on frame i
                    crops = []
                    for j in range(i - 2, i + 3):
                        idx = max(0, min(j, last_index))
                        # Use the neighbour's own first region when it has one,
                        # otherwise reuse the current region's square.
                        rx, ry, rs = regions[idx][0] if regions[idx] else (x, y, size)
                        crop = frames[idx][ry - rs:ry + rs, rx - rs:rx + rs]
                        if crop.size == 0:
                            crop = np.zeros((size * 2, size * 2, 3), dtype=np.uint8)
                        crops.append(crop)
                    fake = run_clean_video(crops, prev_output)
                    prev_output = fake
                    result = blend(result, fake, x, y, size)
                out.write(result)
        else:
            # Frame-by-frame for face or add
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                out.write(process_image(frame, action, mode))
    finally:
        # Release both handles even when a frame fails to process.
        cap.release()
        if out is not None:
            out.release()
    return out_path
# ============ Gradio ============
def is_video(file_path):
    """Return True when ``file_path`` has one of the extensions in ``VIDEO_EXTS``.

    The comparison is case-insensitive; empty or ``None`` paths give False.
    """
    if file_path:
        _, extension = os.path.splitext(str(file_path))
        return extension.lower() in VIDEO_EXTS
    return False
def to_bgr(pil_img):
    """Convert a PIL image (or an RGB-ordered array) to a BGR numpy array.

    Grayscale input is expanded to three channels and an alpha channel is
    dropped. PIL images in any other mode (palette, LA, CMYK, 1-bit, ...) are
    first converted to RGB, because their raw arrays would otherwise be
    misread as grayscale/RGBA or fail in the colour conversion.

    Returns:
        An H x W x 3 array in BGR channel order.
    """
    # Objects without a ``mode`` attribute (e.g. numpy arrays) pass through.
    mode = getattr(pil_img, "mode", None)
    if mode is not None and mode not in ("L", "RGB", "RGBA"):
        pil_img = pil_img.convert("RGB")

    img = np.array(pil_img)
    if img.ndim == 2:  # Grayscale
        img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    elif img.shape[2] == 4:  # RGBA
        img = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR)
    else:  # RGB
        # Materialise the channel flip so callers get a contiguous array
        # rather than a negative-stride view.
        img = np.ascontiguousarray(img[:, :, ::-1])
    return img
def add_mosaic_img(file, target):
    """Add a face mosaic to an image and return it in RGB channel order.

    ``target`` is accepted for interface symmetry but not read; face mode is
    always used. Returns ``None`` when ``file`` is ``None``.
    """
    if file is not None:
        bgr = to_bgr(file)
        mosaicked = process_image(bgr, "add", "face")
        return mosaicked[:, :, ::-1]
    return None
def remove_mosaic_img(file, target):
    """Remove mosaics from an image and return it in RGB channel order.

    Body mode is selected when ``target`` contains "Body" or "General";
    otherwise face mode is used. Returns ``None`` when ``file`` is ``None``.
    """
    if file is None:
        return None
    wants_body = any(tag in target for tag in ("Body", "General"))
    mode = "body" if wants_body else "face"
    bgr = to_bgr(file)
    restored = process_image(bgr, "remove", mode)
    return restored[:, :, ::-1]
def add_mosaic_vid(file, target):
    """Add face mosaics to a video; returns the output path or ``None``.

    ``target`` is accepted for interface symmetry but not read.
    """
    if file is not None:
        return process_video(file, "add", "face")
    return None
def remove_mosaic_vid(file, target):
    """Remove mosaics from a video; returns the output path or ``None``.

    Body mode is selected when ``target`` contains "Body" or "General";
    otherwise face mode is used.
    """
    if file is None:
        return None
    wants_body = any(tag in target for tag in ("Body", "General"))
    return process_video(file, "remove", "body" if wants_body else "face")
if __name__ == "__main__":
    # Entry point: with >= 3 arguments run as a CLI, with none start the
    # Gradio UI, otherwise print usage.
    import sys

    if len(sys.argv) >= 4:
        # CLI: app.py <action> <input> <output> [mode]
        from PIL import Image
        import shutil

        action, inp, out = sys.argv[1], sys.argv[2], sys.argv[3]
        mode = sys.argv[4] if len(sys.argv) > 4 else "face"
        ext = os.path.splitext(inp)[1].lower()
        if ext in VIDEO_EXTS:
            result_path = process_video(inp, action, mode)
            if result_path:
                shutil.move(result_path, out)
                print(f"Saved: {out}")
        else:
            # Go through to_bgr so grayscale/RGBA inputs work like in the UI
            # (slicing [:, :, :3] directly fails on 2-D grayscale arrays).
            with Image.open(inp) as img:
                img_bgr = to_bgr(img)
            result = process_image(img_bgr, action, mode)
            Image.fromarray(result[:, :, ::-1]).save(out)
            print(f"Saved: {out}")
    elif len(sys.argv) == 1:
        import gradio as gr
        from PIL import Image as PILImage

        def _as_path(file):
            """Return a filesystem path for a value delivered by ``gr.File``.

            Strings and path-like objects are used as-is; file-like wrappers
            are resolved through their ``name`` attribute.
            """
            if isinstance(file, (str, os.PathLike)):
                return os.fspath(file)
            return getattr(file, "name", file)

        def remove_mosaic_for_example(input_img, target):
            """Process for examples - returns output image"""
            if input_img is None:
                return None
            mode = "body" if "Body" in target or "General" in target else "face"
            img = to_bgr(input_img)
            result = process_image(img, "remove", mode)
            return PILImage.fromarray(result[:, :, ::-1])

        def add_mosaic_for_example(input_img, target):
            """Process for examples - returns output image"""
            if input_img is None:
                return None
            img = to_bgr(input_img)
            result = process_image(img, "add", "face")
            return PILImage.fromarray(result[:, :, ::-1])

        css = ".compact { max-width: 900px; margin: auto; }"

        def process_any(file, target, action):
            """Process image or video - auto-detect by extension.

            Returns updates for (image output, video output); exactly one of
            them is made visible.
            """
            if file is None:
                return gr.update(visible=True, value=None), gr.update(visible=False, value=None)
            path = _as_path(file)
            ext = os.path.splitext(path)[1].lower()
            mode = "body" if "Body" in target or "General" in target else "face"
            if ext in VIDEO_EXTS:
                # Video/GIF - show video output, hide image
                result = process_video(path, action, mode)
                return gr.update(visible=False, value=None), gr.update(visible=True, value=result)
            else:
                # Image - show image output, hide video
                img = to_bgr(PILImage.open(path))
                result = process_image(img, action, mode)
                return (
                    gr.update(visible=True, value=PILImage.fromarray(result[:, :, ::-1])),
                    gr.update(visible=False, value=None),
                )

        def update_preview(file):
            """Update preview based on file type"""
            if file is None:
                return gr.update(visible=True, value=None), gr.update(visible=False, value=None)
            path = _as_path(file)
            ext = os.path.splitext(path)[1].lower()
            if ext in VIDEO_EXTS:
                return gr.update(visible=False, value=None), gr.update(visible=True, value=path)
            else:
                return gr.update(visible=True, value=path), gr.update(visible=False, value=None)

        with gr.Blocks(title="DeepMosaics") as demo:
            with gr.Column(elem_classes="compact"):
                gr.Markdown("## [DeepMosaics](https://github.com/HypoX64/DeepMosaics)")
                target = gr.Radio(["Face", "Body/NSFW"], value="Face", label="Target", scale=0)
                with gr.Row():
                    # Input with preview
                    with gr.Column():
                        input_file = gr.File(
                            label="Input (Image or Video)",
                            file_types=[".jpg", ".jpeg", ".png", ".webp", ".bmp", ".gif", ".mp4", ".avi", ".mov", ".mkv", ".webm"]
                        )
                        preview_img = gr.Image(label="Preview", height=250, visible=True, interactive=False)
                        preview_vid = gr.Video(label="Preview", height=250, visible=False, interactive=False)
                    # Output
                    with gr.Column():
                        output_img = gr.Image(label="Output", height=300, visible=True)
                        output_vid = gr.Video(label="Output", height=300, visible=False)
                with gr.Row():
                    btn_add = gr.Button("Add Mosaic")
                    btn_remove = gr.Button("Remove Mosaic", variant="primary")

                # Examples with cached outputs
                def example_remove(filepath, target):
                    mode = "body" if "Body" in target or "General" in target else "face"
                    img = to_bgr(PILImage.open(filepath))
                    result = process_image(img, "remove", mode)
                    return PILImage.fromarray(result[:, :, ::-1])

                gr.Examples(
                    examples=[
                        ["examples/mosaic.jpg", "Face"],
                        ["examples/face_clean.jpg", "Face"],
                        ["examples/youknow_mosaic.png", "Body/NSFW"],
                    ],
                    inputs=[input_file, target],
                    outputs=output_img,
                    fn=example_remove,
                    cache_examples=True,
                    cache_mode="lazy",
                )
                # Update preview when file uploaded
                input_file.change(fn=update_preview, inputs=[input_file], outputs=[preview_img, preview_vid])
                btn_add.click(
                    fn=lambda f, t: process_any(f, t, "add"),
                    inputs=[input_file, target],
                    outputs=[output_img, output_vid]
                )
                btn_remove.click(
                    fn=lambda f, t: process_any(f, t, "remove"),
                    inputs=[input_file, target],
                    outputs=[output_img, output_vid]
                )
        demo.launch(css=css)
    else:
        print("Usage:")
        print(" python app.py # Gradio UI")
        print(" python app.py add input.jpg out.jpg # Add mosaic")
        print(" python app.py remove input.jpg out.jpg body # Remove body mosaic")