Spaces:
Running
Running
Update app.py
Browse files
app.py
CHANGED
@@ -7,12 +7,15 @@ import tempfile
|
|
7 |
import os
|
8 |
|
9 |
# --- MediaPipe Initialization ---
|
|
|
10 |
try:
|
11 |
mp_face_mesh = mp.solutions.face_mesh
|
|
|
|
|
12 |
face_mesh = mp_face_mesh.FaceMesh(
|
13 |
static_image_mode=True,
|
14 |
max_num_faces=1,
|
15 |
-
refine_landmarks=True,
|
16 |
min_detection_confidence=0.5
|
17 |
)
|
18 |
print("MediaPipe Face Mesh initialized successfully.")
|
@@ -20,7 +23,7 @@ except (ImportError, AttributeError):
|
|
20 |
print("Error: Could not initialize MediaPipe Face Mesh. Is mediapipe installed correctly?")
|
21 |
face_mesh = None
|
22 |
|
23 |
-
# --- Helper Functions
|
24 |
|
25 |
def get_landmarks(img, landmark_step=1):
|
26 |
"""
|
@@ -28,33 +31,54 @@ def get_landmarks(img, landmark_step=1):
|
|
28 |
Includes sub-sampling for performance.
|
29 |
- landmark_step: Step to sample landmarks. 1 = all, 2 = half, etc.
|
30 |
"""
|
31 |
-
if img is None:
|
32 |
-
|
|
|
|
|
|
|
|
|
|
|
33 |
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
|
34 |
try:
|
35 |
results = face_mesh.process(img_rgb)
|
36 |
except Exception as e:
|
37 |
print(f"Error processing image with MediaPipe: {e}")
|
38 |
return None
|
39 |
-
|
|
|
|
|
|
|
40 |
|
41 |
landmarks_mp = results.multi_face_landmarks[0]
|
42 |
h, w, _ = img.shape
|
|
|
|
|
43 |
full_landmarks = np.array([(pt.x * w, pt.y * h) for pt in landmarks_mp.landmark], dtype=np.float32)
|
44 |
|
|
|
45 |
if landmark_step > 1:
|
|
|
46 |
landmarks = full_landmarks[::landmark_step]
|
47 |
else:
|
48 |
landmarks = full_landmarks
|
49 |
|
50 |
-
if not np.all(np.isfinite(landmarks)):
|
51 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
52 |
landmarks = np.vstack((landmarks, corners))
|
|
|
53 |
return landmarks
|
54 |
|
55 |
def calculate_delaunay_triangles(rect, points):
|
56 |
-
"""Calculates Delaunay triangulation for a set of points."""
|
57 |
-
if points is None or len(points) < 3:
|
|
|
58 |
if not np.all(np.isfinite(points)):
|
59 |
points = points[np.all(np.isfinite(points), axis=1)]
|
60 |
if len(points) < 3: return []
|
@@ -65,13 +89,15 @@ def calculate_delaunay_triangles(rect, points):
|
|
65 |
subdiv = cv2.Subdiv2D(rect)
|
66 |
point_map = { (int(p[0]), int(p[1])): i for i, p in enumerate(points) }
|
67 |
inserted_points_map = {}
|
|
|
68 |
for i, p in enumerate(points):
|
69 |
point_tuple = (int(p[0]), int(p[1]))
|
70 |
if point_tuple not in inserted_points_map:
|
71 |
try:
|
72 |
subdiv.insert(point_tuple)
|
73 |
inserted_points_map[point_tuple] = i
|
74 |
-
except cv2.error:
|
|
|
75 |
|
76 |
triangle_list = subdiv.getTriangleList()
|
77 |
delaunay_triangles = []
|
@@ -83,29 +109,41 @@ def calculate_delaunay_triangles(rect, points):
|
|
83 |
delaunay_triangles.append(indices)
|
84 |
return delaunay_triangles
|
85 |
|
|
|
86 |
def warp_triangle(img1, img2, t1, t2):
|
87 |
-
"""Warps a triangle from img1 to img2."""
|
88 |
-
if len(t1) != 3 or len(t2) != 3 or not np.all(np.isfinite(t1)) or not np.all(np.isfinite(t2)):
|
|
|
89 |
try:
|
90 |
r1 = cv2.boundingRect(np.float32([t1]))
|
91 |
r2 = cv2.boundingRect(np.float32([t2]))
|
|
|
92 |
if r1[2] <= 0 or r1[3] <= 0 or r2[2] <= 0 or r2[3] <= 0: return
|
|
|
93 |
t1_rect = [(t1[i][0] - r1[0], t1[i][1] - r1[1]) for i in range(3)]
|
94 |
t2_rect = [(t2[i][0] - r2[0], t2[i][1] - r2[1]) for i in range(3)]
|
|
|
95 |
mask = np.zeros((r2[3], r2[2], 3), dtype=np.float32)
|
96 |
cv2.fillConvexPoly(mask, np.int32(t2_rect), (1.0, 1.0, 1.0), 16, 0)
|
|
|
97 |
img1_rect = img1[r1[1]:r1[1] + r1[3], r1[0]:r1[0] + r1[2]]
|
98 |
if img1_rect.size == 0: return
|
|
|
99 |
size = (r2[2], r2[3])
|
100 |
warp_mat = cv2.getAffineTransform(np.float32(t1_rect), np.float32(t2_rect))
|
101 |
img2_rect = cv2.warpAffine(img1_rect, warp_mat, size, None, flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT_101)
|
|
|
102 |
img2_rect *= mask
|
|
|
103 |
y_start, y_end = r2[1], r2[1] + r2[3]
|
104 |
x_start, x_end = r2[0], r2[0] + r2[2]
|
|
|
105 |
h_img2, w_img2, _ = img2.shape
|
106 |
if y_start >= h_img2 or x_start >= w_img2: return
|
|
|
107 |
img2[y_start:y_end, x_start:x_end] = img2[y_start:y_end, x_start:x_end] * (1.0 - mask) + img2_rect
|
108 |
-
except (cv2.error, IndexError):
|
|
|
109 |
|
110 |
# --- Main Morphing Function (Modified) ---
|
111 |
def morph_faces(img1_orig, img2_orig, alpha, resize_dim, landmark_step):
|
@@ -180,31 +218,45 @@ def morph_faces(img1_orig, img2_orig, alpha, resize_dim, landmark_step):
|
|
180 |
print(f"Frame morph ({w}x{h}, {len(landmarks1)} landmarks) took: {end_time - start_time:.4f}s")
|
181 |
return morphed_img
|
182 |
|
183 |
-
|
184 |
# --- Video Processing Function (Modified) ---
|
185 |
-
def process_video(video_path, target_img, transition_level, resolution, landmark_sampling
|
186 |
"""
|
187 |
-
Callback function that now receives
|
188 |
"""
|
189 |
if video_path is None or target_img is None:
|
|
|
190 |
dummy_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
|
191 |
-
|
|
|
192 |
out.release()
|
193 |
return dummy_path
|
194 |
|
195 |
alpha = (transition_level + 1.0) / 2.0
|
|
|
|
|
196 |
cap = cv2.VideoCapture(video_path)
|
|
|
|
|
|
|
197 |
fps = cap.get(cv2.CAP_PROP_FPS) or 24
|
|
|
198 |
tmp_out = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
|
199 |
-
|
|
|
|
|
200 |
|
|
|
201 |
while True:
|
202 |
ret, frame = cap.read()
|
203 |
-
if not ret:
|
204 |
-
|
205 |
-
|
|
|
|
|
206 |
out.write(morphed)
|
|
|
207 |
|
|
|
208 |
cap.release()
|
209 |
out.release()
|
210 |
return tmp_out.name
|
@@ -219,18 +271,28 @@ with gr.Blocks(css=css) as iface:
|
|
219 |
img_input = gr.Image(type="numpy", label="Target Face Image")
|
220 |
|
221 |
with gr.Row():
|
222 |
-
|
223 |
-
|
224 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
225 |
slider = gr.Slider(-1.0, 1.0, value=0.0, step=0.05, label="Transition Level (-1 = Video, 1 = Image)")
|
226 |
-
|
227 |
-
# --- NEW: Checkbox for triangulation lines ---
|
228 |
-
triangles_checkbox = gr.Checkbox(label="Show Triangulation Lines", value=False)
|
229 |
-
|
230 |
video_output = gr.Video(label="Morphed Video")
|
231 |
|
232 |
-
|
|
|
233 |
|
|
|
234 |
for component in inputs:
|
235 |
component.change(
|
236 |
fn=process_video,
|
|
|
7 |
import os
|
8 |
|
9 |
# --- MediaPipe Initialization ---
|
10 |
+
# Use try-except block for robustness if mediapipe is not installed correctly
|
11 |
try:
|
12 |
mp_face_mesh = mp.solutions.face_mesh
|
13 |
+
# NOTE: refine_landmarks=True gives 478 landmarks. False gives 468.
|
14 |
+
# We will control density by sub-sampling rather than this boolean for more control.
|
15 |
face_mesh = mp_face_mesh.FaceMesh(
|
16 |
static_image_mode=True,
|
17 |
max_num_faces=1,
|
18 |
+
refine_landmarks=True, # Keep this on for the best potential quality
|
19 |
min_detection_confidence=0.5
|
20 |
)
|
21 |
print("MediaPipe Face Mesh initialized successfully.")
|
|
|
23 |
print("Error: Could not initialize MediaPipe Face Mesh. Is mediapipe installed correctly?")
|
24 |
face_mesh = None
|
25 |
|
26 |
+
# --- Helper Functions ---
|
27 |
|
28 |
def get_landmarks(img, landmark_step=1):
|
29 |
"""
|
|
|
31 |
Includes sub-sampling for performance.
|
32 |
- landmark_step: Step to sample landmarks. 1 = all, 2 = half, etc.
|
33 |
"""
|
34 |
+
if img is None:
|
35 |
+
print("Warning: Input image is None in get_landmarks.")
|
36 |
+
return None
|
37 |
+
if face_mesh is None:
|
38 |
+
print("Error: MediaPipe Face Mesh not available.")
|
39 |
+
return None
|
40 |
+
|
41 |
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
|
42 |
try:
|
43 |
results = face_mesh.process(img_rgb)
|
44 |
except Exception as e:
|
45 |
print(f"Error processing image with MediaPipe: {e}")
|
46 |
return None
|
47 |
+
|
48 |
+
if not results.multi_face_landmarks:
|
49 |
+
print("Warning: No face detected.")
|
50 |
+
return None
|
51 |
|
52 |
landmarks_mp = results.multi_face_landmarks[0]
|
53 |
h, w, _ = img.shape
|
54 |
+
|
55 |
+
# Get all landmarks first
|
56 |
full_landmarks = np.array([(pt.x * w, pt.y * h) for pt in landmarks_mp.landmark], dtype=np.float32)
|
57 |
|
58 |
+
# --- NEW: Sub-sample landmarks for speed ---
|
59 |
if landmark_step > 1:
|
60 |
+
# Sample with a step, ensuring correspondence is maintained between faces
|
61 |
landmarks = full_landmarks[::landmark_step]
|
62 |
else:
|
63 |
landmarks = full_landmarks
|
64 |
|
65 |
+
if not np.all(np.isfinite(landmarks)):
|
66 |
+
print("Warning: Invalid landmark coordinates detected (NaN/inf).")
|
67 |
+
return None
|
68 |
+
|
69 |
+
corners = np.array([
|
70 |
+
[0, 0], [w - 1, 0], [0, h - 1], [w - 1, h - 1]
|
71 |
+
], dtype=np.float32)
|
72 |
+
|
73 |
+
# Always include corners for stable warping
|
74 |
landmarks = np.vstack((landmarks, corners))
|
75 |
+
|
76 |
return landmarks
|
77 |
|
78 |
def calculate_delaunay_triangles(rect, points):
|
79 |
+
"""Calculates Delaunay triangulation for a set of points. (No changes needed here)"""
|
80 |
+
if points is None or len(points) < 3:
|
81 |
+
return []
|
82 |
if not np.all(np.isfinite(points)):
|
83 |
points = points[np.all(np.isfinite(points), axis=1)]
|
84 |
if len(points) < 3: return []
|
|
|
89 |
subdiv = cv2.Subdiv2D(rect)
|
90 |
point_map = { (int(p[0]), int(p[1])): i for i, p in enumerate(points) }
|
91 |
inserted_points_map = {}
|
92 |
+
|
93 |
for i, p in enumerate(points):
|
94 |
point_tuple = (int(p[0]), int(p[1]))
|
95 |
if point_tuple not in inserted_points_map:
|
96 |
try:
|
97 |
subdiv.insert(point_tuple)
|
98 |
inserted_points_map[point_tuple] = i
|
99 |
+
except cv2.error:
|
100 |
+
continue
|
101 |
|
102 |
triangle_list = subdiv.getTriangleList()
|
103 |
delaunay_triangles = []
|
|
|
109 |
delaunay_triangles.append(indices)
|
110 |
return delaunay_triangles
|
111 |
|
112 |
+
|
113 |
def warp_triangle(img1, img2, t1, t2):
|
114 |
+
"""Warps a triangle from img1 to img2. (No changes needed here)"""
|
115 |
+
if len(t1) != 3 or len(t2) != 3 or not np.all(np.isfinite(t1)) or not np.all(np.isfinite(t2)):
|
116 |
+
return
|
117 |
try:
|
118 |
r1 = cv2.boundingRect(np.float32([t1]))
|
119 |
r2 = cv2.boundingRect(np.float32([t2]))
|
120 |
+
|
121 |
if r1[2] <= 0 or r1[3] <= 0 or r2[2] <= 0 or r2[3] <= 0: return
|
122 |
+
|
123 |
t1_rect = [(t1[i][0] - r1[0], t1[i][1] - r1[1]) for i in range(3)]
|
124 |
t2_rect = [(t2[i][0] - r2[0], t2[i][1] - r2[1]) for i in range(3)]
|
125 |
+
|
126 |
mask = np.zeros((r2[3], r2[2], 3), dtype=np.float32)
|
127 |
cv2.fillConvexPoly(mask, np.int32(t2_rect), (1.0, 1.0, 1.0), 16, 0)
|
128 |
+
|
129 |
img1_rect = img1[r1[1]:r1[1] + r1[3], r1[0]:r1[0] + r1[2]]
|
130 |
if img1_rect.size == 0: return
|
131 |
+
|
132 |
size = (r2[2], r2[3])
|
133 |
warp_mat = cv2.getAffineTransform(np.float32(t1_rect), np.float32(t2_rect))
|
134 |
img2_rect = cv2.warpAffine(img1_rect, warp_mat, size, None, flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT_101)
|
135 |
+
|
136 |
img2_rect *= mask
|
137 |
+
|
138 |
y_start, y_end = r2[1], r2[1] + r2[3]
|
139 |
x_start, x_end = r2[0], r2[0] + r2[2]
|
140 |
+
|
141 |
h_img2, w_img2, _ = img2.shape
|
142 |
if y_start >= h_img2 or x_start >= w_img2: return
|
143 |
+
|
144 |
img2[y_start:y_end, x_start:x_end] = img2[y_start:y_end, x_start:x_end] * (1.0 - mask) + img2_rect
|
145 |
+
except (cv2.error, IndexError):
|
146 |
+
pass # Ignore degenerate triangles or slicing errors
|
147 |
|
148 |
# --- Main Morphing Function (Modified) ---
|
149 |
def morph_faces(img1_orig, img2_orig, alpha, resize_dim, landmark_step):
|
|
|
218 |
print(f"Frame morph ({w}x{h}, {len(landmarks1)} landmarks) took: {end_time - start_time:.4f}s")
|
219 |
return morphed_img
|
220 |
|
|
|
221 |
# --- Video Processing Function (Modified) ---
|
222 |
+
def process_video(video_path, target_img, transition_level, resolution, landmark_sampling):
|
223 |
"""
|
224 |
+
Callback function that now receives resolution and landmark settings from the UI.
|
225 |
"""
|
226 |
if video_path is None or target_img is None:
|
227 |
+
# Create a dummy video to avoid Gradio errors on empty inputs
|
228 |
dummy_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
|
229 |
+
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
|
230 |
+
out = cv2.VideoWriter(dummy_path, fourcc, 24, (resolution, resolution))
|
231 |
out.release()
|
232 |
return dummy_path
|
233 |
|
234 |
alpha = (transition_level + 1.0) / 2.0
|
235 |
+
alpha = float(np.clip(alpha, 0.0, 1.0))
|
236 |
+
|
237 |
cap = cv2.VideoCapture(video_path)
|
238 |
+
if not cap.isOpened():
|
239 |
+
raise IOError(f"Cannot open video file: {video_path}")
|
240 |
+
|
241 |
fps = cap.get(cv2.CAP_PROP_FPS) or 24
|
242 |
+
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
|
243 |
tmp_out = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
|
244 |
+
|
245 |
+
# --- Use dynamic resolution for the output video ---
|
246 |
+
out = cv2.VideoWriter(tmp_out.name, fourcc, fps, (resolution, resolution))
|
247 |
|
248 |
+
frame_count = 0
|
249 |
while True:
|
250 |
ret, frame = cap.read()
|
251 |
+
if not ret:
|
252 |
+
break
|
253 |
+
|
254 |
+
# Pass the new parameters to the morphing function
|
255 |
+
morphed = morph_faces(frame, target_img, alpha, resolution, landmark_sampling)
|
256 |
out.write(morphed)
|
257 |
+
frame_count += 1
|
258 |
|
259 |
+
print(f"Processed {frame_count} frames.")
|
260 |
cap.release()
|
261 |
out.release()
|
262 |
return tmp_out.name
|
|
|
271 |
img_input = gr.Image(type="numpy", label="Target Face Image")
|
272 |
|
273 |
with gr.Row():
|
274 |
+
# --- NEW: UI controls for performance ---
|
275 |
+
resolution_slider = gr.Dropdown(
|
276 |
+
[256, 384, 512, 768],
|
277 |
+
value=512,
|
278 |
+
label="Processing Resolution",
|
279 |
+
info="Lower resolution means much faster processing."
|
280 |
+
)
|
281 |
+
landmark_slider = gr.Slider(
|
282 |
+
1, 4,
|
283 |
+
value=1,
|
284 |
+
step=1,
|
285 |
+
label="Landmark Sub-sampling",
|
286 |
+
info="1=Max Quality (~478 landmarks), 4=Max Speed (~120 landmarks)"
|
287 |
+
)
|
288 |
+
|
289 |
slider = gr.Slider(-1.0, 1.0, value=0.0, step=0.05, label="Transition Level (-1 = Video, 1 = Image)")
|
|
|
|
|
|
|
|
|
290 |
video_output = gr.Video(label="Morphed Video")
|
291 |
|
292 |
+
# Gather all input components
|
293 |
+
inputs = [video_input, img_input, slider, resolution_slider, landmark_slider]
|
294 |
|
295 |
+
# Trigger processing on any input change
|
296 |
for component in inputs:
|
297 |
component.change(
|
298 |
fn=process_video,
|