jomasego committed on
Commit
b6e00fe
·
1 Parent(s): 3c33143

Fix: Add -movflags +faststart to yt-dlp to resolve moov atom error

Browse files
Files changed (1) hide show
  1. modal_whisper_app.py +276 -121
modal_whisper_app.py CHANGED
@@ -1,7 +1,9 @@
1
  import modal
2
  from fastapi import FastAPI, UploadFile, File, Body, Query
3
- from starlette.applications import Starlette
4
- from starlette.routing import Mount
 
 
5
  import os
6
  import tempfile
7
  import io # Used by Whisper for BytesIO
@@ -12,7 +14,8 @@ import hashlib
12
  from fastapi.responses import JSONResponse
13
  from fastapi.middleware.cors import CORSMiddleware
14
  from pydantic import BaseModel
15
- import re # For parsing search results
 
16
  import asyncio # For concurrent video processing
17
 
18
  import gradio as gr
@@ -28,7 +31,7 @@ OBJECT_DETECTION_MODEL_NAME = "facebook/detr-resnet-50"
28
  OBJECT_DETECTION_PROCESSOR_NAME = "facebook/detr-resnet-50"
29
 
30
  # --- Modal Image Definition ---
31
- video_analysis_image = (
32
  modal.Image.debian_slim(python_version="3.10")
33
  .apt_install("ffmpeg")
34
  .pip_install(
@@ -41,8 +44,10 @@ video_analysis_image = (
41
  "torchvision",
42
  "torchaudio",
43
  "fastapi[standard]", # For web endpoints
44
- "pydantic", # For request body validation
45
- "httpx" # For downloading video from URL
 
 
46
  )
47
  )
48
 
@@ -51,12 +56,16 @@ app = modal.App(name="video-analysis-gradio-pipeline") # New app name, using App
51
 
52
  # --- Pydantic model for web endpoint request ---
53
  class VideoAnalysisRequestPayload(BaseModel):
54
- video_url: str
 
 
 
 
55
 
56
  # --- Constants for Model Names ---
57
  # WHISPER_MODEL_NAME = "openai/whisper-large-v3"
58
- # CAPTION_MODEL_NAME = "Neleac/SpaceTimeGPT"
59
- # CAPTION_PROCESSOR_NAME = "MCG-NJU/videomae-base" # For SpaceTimeGPT's video encoder
60
  # # CAPTION_TOKENIZER_NAME = "gpt2" # For SpaceTimeGPT's text decoder (usually part of processor)
61
  # ACTION_MODEL_NAME = "MCG-NJU/videomae-base-finetuned-kinetics"
62
  # ACTION_PROCESSOR_NAME = "MCG-NJU/videomae-base" # Or VideoMAEImageProcessor.from_pretrained(ACTION_MODEL_NAME)
@@ -88,7 +97,7 @@ def _login_to_hf():
88
 
89
  # === 1. Transcription with Whisper ===
90
  @app.function(
91
- image=video_analysis_image,
92
  secrets=[HF_TOKEN_SECRET],
93
  gpu="any",
94
  timeout=600
@@ -142,7 +151,14 @@ def transcribe_video_with_whisper(video_bytes: bytes) -> str:
142
  device="cuda:0" if torch.cuda.is_available() else "cpu",
143
  )
144
  print(f"[Whisper] Pipeline loaded. Transcribing {temp_audio_path}...")
145
- outputs = pipe(temp_audio_path, chunk_length_s=30, batch_size=8, return_timestamps=False)
 
 
 
 
 
 
 
146
  transcription = outputs["text"]
147
  print(f"[Whisper] Transcription successful: {transcription[:100]}...")
148
  return transcription
@@ -159,7 +175,7 @@ def transcribe_video_with_whisper(video_bytes: bytes) -> str:
159
 
160
  # === 2. Captioning with SpaceTimeGPT ===
161
  @app.function(
162
- image=video_analysis_image,
163
  secrets=[HF_TOKEN_SECRET],
164
  gpu="any",
165
  timeout=600
@@ -167,7 +183,7 @@ def transcribe_video_with_whisper(video_bytes: bytes) -> str:
167
  def generate_captions_with_spacetimegpt(video_bytes: bytes) -> str:
168
  _login_to_hf()
169
  import torch
170
- from transformers import AutoProcessor, AutoModelForCausalLM
171
  import av
172
  import numpy as np
173
  import tempfile
@@ -191,14 +207,20 @@ def generate_captions_with_spacetimegpt(video_bytes: bytes) -> str:
191
  indices = np.linspace(0, total_frames - 1, num_frames_to_sample, dtype=int)
192
  frames = []
193
  for i in indices:
194
- container.seek(i, stream=video_stream)
195
  frame = next(container.decode(video_stream))
196
  frames.append(frame.to_rgb().to_ndarray())
197
  container.close()
198
  video_frames_np = np.stack(frames)
199
 
200
  processor = AutoProcessor.from_pretrained(CAPTION_PROCESSOR_NAME, trust_remote_code=True)
201
- model = AutoModelForCausalLM.from_pretrained(CAPTION_MODEL_NAME, trust_remote_code=True)
 
 
 
 
 
 
202
  device = "cuda:0" if torch.cuda.is_available() else "cpu"
203
  model.to(device)
204
  if hasattr(processor, 'tokenizer'): # Check if tokenizer exists
@@ -224,7 +246,7 @@ def generate_captions_with_spacetimegpt(video_bytes: bytes) -> str:
224
 
225
  # === 3. Action Recognition with VideoMAE ===
226
  @app.function(
227
- image=video_analysis_image,
228
  secrets=[HF_TOKEN_SECRET],
229
  gpu="any",
230
  timeout=600
@@ -256,7 +278,7 @@ def generate_action_labels(video_bytes: bytes) -> List[Dict[str, Any]]:
256
  indices = np.linspace(0, total_frames - 1, num_frames_to_sample, dtype=int)
257
  video_frames_list = []
258
  for i in indices:
259
- container.seek(i, stream=video_stream)
260
  frame = next(container.decode(video_stream))
261
  video_frames_list.append(frame.to_rgb().to_ndarray())
262
  container.close()
@@ -297,7 +319,7 @@ def generate_action_labels(video_bytes: bytes) -> List[Dict[str, Any]]:
297
 
298
  # === 4. Object Detection with DETR ===
299
  @app.function(
300
- image=video_analysis_image,
301
  secrets=[HF_TOKEN_SECRET],
302
  gpu="any",
303
  timeout=600
@@ -337,7 +359,7 @@ def generate_object_detection(video_bytes: bytes) -> List[Dict[str, Any]]:
337
 
338
  all_frame_detections = []
339
  for frame_num, target_frame_index in enumerate(frame_indices):
340
- container.seek(target_frame_index, stream=video_stream)
341
  frame = next(container.decode(video_stream))
342
  pil_image = frame.to_image()
343
 
@@ -376,7 +398,7 @@ def generate_object_detection(video_bytes: bytes) -> List[Dict[str, Any]]:
376
 
377
  # === 5. Comprehensive Video Analysis (Orchestrator) ===
378
  @app.function(
379
- image=video_analysis_image,
380
  secrets=[HF_TOKEN_SECRET],
381
  gpu="any", # Request GPU as some sub-tasks will need it
382
  timeout=1800, # Generous timeout for all models
@@ -388,7 +410,7 @@ async def analyze_video_comprehensive(video_bytes: bytes) -> Dict[str, Any]:
388
  cache_key = hashlib.sha256(video_bytes).hexdigest()
389
 
390
  try:
391
- cached_result = await video_analysis_cache.get(cache_key)
392
  if cached_result:
393
  print(f"[Orchestrator] Cache hit for key: {cache_key}")
394
  return cached_result
@@ -402,35 +424,35 @@ async def analyze_video_comprehensive(video_bytes: bytes) -> Dict[str, Any]:
402
  print("[Orchestrator] Calling transcription...")
403
  try:
404
  # .call() is synchronous in the context of the Modal function execution
405
- results["transcription"] = transcribe_video_with_whisper.call(video_bytes)
406
  except Exception as e:
407
  print(f"[Orchestrator] Error in transcription: {e}")
408
  results["transcription"] = f"Transcription Error: {str(e)}"
409
 
410
  print("[Orchestrator] Calling captioning...")
411
  try:
412
- results["caption"] = generate_captions_with_spacetimegpt.call(video_bytes)
413
  except Exception as e:
414
  print(f"[Orchestrator] Error in captioning: {e}")
415
  results["caption"] = f"Captioning Error: {str(e)}"
416
 
417
  print("[Orchestrator] Calling action recognition...")
418
  try:
419
- results["actions"] = generate_action_labels.call(video_bytes)
420
  except Exception as e:
421
  print(f"[Orchestrator] Error in action recognition: {e}")
422
  results["actions"] = [{"error": f"Action Recognition Error: {str(e)}"}] # Ensure list type for error
423
 
424
  print("[Orchestrator] Calling object detection...")
425
  try:
426
- results["objects"] = generate_object_detection.call(video_bytes)
427
  except Exception as e:
428
  print(f"[Orchestrator] Error in object detection: {e}")
429
  results["objects"] = [{"error": f"Object Detection Error: {str(e)}"}] # Ensure list type for error
430
 
431
  print("[Orchestrator] All analyses attempted. Storing results in cache.")
432
  try:
433
- await video_analysis_cache.put(cache_key, results)
434
  print(f"[Orchestrator] Successfully cached results for key: {cache_key}")
435
  except Exception as e:
436
  print(f"[Orchestrator] Cache PUT error: {e}")
@@ -439,13 +461,7 @@ async def analyze_video_comprehensive(video_bytes: bytes) -> Dict[str, Any]:
439
 
440
 
441
  # === FastAPI Endpoint for Video Analysis ===
442
- @app.function(
443
- image=video_analysis_image,
444
- secrets=[HF_TOKEN_SECRET],
445
- gpu="any",
446
- timeout=1800,
447
- )
448
- @modal.fastapi_endpoint(method="POST")
449
  def process_video_analysis(payload: VideoAnalysisRequestPayload):
450
  """FastAPI endpoint for comprehensive video analysis."""
451
  print(f"[FastAPI Endpoint] Received request for video analysis")
@@ -456,18 +472,138 @@ def process_video_analysis(payload: VideoAnalysisRequestPayload):
456
 
457
  print(f"[FastAPI Endpoint] Processing video_url: {video_url}")
458
  try:
459
- # Download video
460
- import httpx
461
- with httpx.Client() as client:
462
- response = client.get(video_url, follow_redirects=True, timeout=60.0)
463
- response.raise_for_status()
464
- video_bytes = response.content
465
- if not video_bytes:
466
- return JSONResponse(status_code=400, content={"error": f"Failed to download video from URL: {video_url}. Content was empty."})
467
- print(f"[FastAPI Endpoint] Successfully downloaded {len(video_bytes)} bytes from {video_url}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
468
 
469
  # Call comprehensive analysis
470
- analysis_results = analyze_video_comprehensive.call(video_bytes)
471
  print("[FastAPI Endpoint] Comprehensive analysis finished.")
472
  return JSONResponse(status_code=200, content=analysis_results)
473
 
@@ -478,76 +614,105 @@ def process_video_analysis(payload: VideoAnalysisRequestPayload):
478
  print(f"[FastAPI Endpoint] Unexpected Exception during analysis: {e}")
479
  return JSONResponse(status_code=500, content={"error": f"Unexpected server error during analysis: {str(e)}"})
480
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
481
  # === 6. Topic-Based Video Search ===
482
  @app.function(
483
- image=video_analysis_image,
484
  secrets=[HF_TOKEN_SECRET],
485
  timeout=300
486
  )
487
  def find_video_urls_for_topic(topic: str, max_results: int = 3) -> List[str]:
488
- """Finds video URLs (YouTube, direct links) for a given topic using web search."""
489
  print(f"[TopicSearch] Finding video URLs for topic: '{topic}', max_results={max_results}")
490
-
491
- # This import is inside because search_web is a tool available to Cascade, not directly to Modal runtime
492
- # This function will be called via .remote() and its implementation will be provided by Cascade's tool execution
493
- # For now, this is a placeholder for where the search_web tool would be invoked.
494
- # In a real Modal execution, this function would need to use a library like 'requests' and 'beautifulsoup'
495
- # or a dedicated search API (e.g., SerpApi, Google Search API) if called from within Modal directly.
496
- # Since Cascade calls this, it will use its 'search_web' tool.
497
-
498
- # Simulate search results for now, as direct tool call from Modal code isn't standard.
499
- # When Cascade calls this, it should intercept and use its search_web tool.
500
- # For local testing or direct Modal runs, this would need a real search implementation.
501
-
502
- # Placeholder: In a real scenario, this function would use a search tool/API.
503
- # For the purpose of this exercise, we'll assume Cascade's `search_web` tool will be used
504
- # when this function is invoked through Cascade's orchestration.
505
- # If running this Modal app standalone, this part needs a concrete implementation.
506
-
507
- # Example of what the logic would look like if we had search results:
508
- # query = f"{topic} video youtube OR .mp4 OR .mov"
509
- # search_results = [] # This would be populated by a search_web call
510
-
511
- # For demonstration, let's return some dummy URLs. Replace with actual search logic.
512
- # print(f"[TopicSearch] This is a placeholder. Actual search via Cascade's 'search_web' tool is expected.")
513
- # print(f"[TopicSearch] If running standalone, implement search logic here.")
514
-
515
- # The actual implementation will be handled by Cascade's search_web tool call
516
- # when this function is called via .remote() by another function that Cascade is orchestrating.
517
- # This function definition serves as a Modal-compatible stub for Cascade's tool.
518
-
519
- # This function is more of a declaration for Cascade to use its tool.
520
- # The actual search logic will be implicitly handled by Cascade's tool call mechanism
521
- # when `find_video_urls_for_topic.remote()` is used in a subsequent step orchestrated by Cascade.
522
-
523
- # If this function were to be *truly* self-contained within Modal and callable independently
524
- # *without* Cascade's direct tool invocation, it would need its own HTTP client and parsing logic here.
525
- # However, given the context of Cascade's operation, this stub is appropriate for Cascade to inject its tool usage.
526
-
527
- # The `search_web` tool will be called by Cascade when it orchestrates the call to this function.
528
- # So, this Python function in `modal_whisper_app.py` mostly defines the signature and intent.
529
- # We will rely on Cascade to make the actual search_web call and provide the results back to the orchestrator.
530
-
531
- # This function, when called by Cascade, will trigger a `search_web` tool call.
532
- # The tool call will be made by Cascade, not by the Modal runtime directly.
533
- # For now, let's assume this function's body is a placeholder for that interaction.
534
- # The key is that the *calling* function (e.g., analyze_videos_by_topic) will use .remote(),
535
- # and Cascade will manage the search_web tool call.
536
-
537
- # To make this runnable standalone (for testing Modal part without Cascade), one might add:
538
- # if modal.is_local():
539
- # # basic requests/bs4 search or return dummy data
540
- # pass
541
-
542
- # For the flow with Cascade, this function primarily serves as a named Modal function
543
- # that Cascade understands it needs to provide search results for.
544
- # The actual search logic is deferred to Cascade's tool execution.
545
- # We will return an empty list here, expecting Cascade to populate it via its mechanisms when called.
546
- print(f"[TopicSearch] Function '{find_video_urls_for_topic.__name__}' called. Expecting Cascade to perform web search.")
547
- # This is a conceptual placeholder. The actual search will be done by Cascade's tool.
548
- # When `analyze_videos_by_topic` calls `find_video_urls_for_topic.remote()`,
549
- # Cascade will execute its `search_web` tool and the result will be used.
550
- return [] # Placeholder: Cascade will provide actual URLs via its search_web tool.
551
 
552
  # Helper function (not a Modal function) to extract video URLs from search results
553
  def extract_video_urls_from_search(search_results: List[Dict[str, str]], max_urls: int = 3) -> List[str]:
@@ -588,25 +753,15 @@ def extract_video_urls_from_search(search_results: List[Dict[str, str]], max_url
588
  break
589
  if len(video_urls) >= max_urls:
590
  break
591
-
592
- print(f"[URL Extraction] Extracted {len(video_urls)} video URLs: {video_urls}")
593
- return video_urls
594
-
595
-
596
  # === 7. Topic-Based Video Analysis Orchestrator ===
597
  @app.function(
598
- image=video_analysis_image,
599
  secrets=[HF_TOKEN_SECRET],
600
- gpu="any", # Child functions use GPU
601
- timeout=3600 # Allow up to 1 hour for multiple video analyses
602
  )
603
- async def _download_and_analyze_one_video(client: httpx.AsyncClient, video_url: str, topic: str) -> Dict[str, Any]:
604
- """Helper to download and analyze a single video. Returns result or error dict."""
605
- print(f"[TopicAnalysisWorker] Processing video URL for topic '{topic}': {video_url}")
606
- try:
607
- # 1. Download video
608
- print(f"[TopicAnalysisWorker] Downloading video from: {video_url}")
609
- response = await client.get(video_url)
610
  response.raise_for_status() # Raise HTTPError for bad responses (4XX or 5XX)
611
  video_bytes = await response.aread()
612
  print(f"[TopicAnalysisWorker] Downloaded {len(video_bytes)} bytes from {video_url}")
 
1
  import modal
2
  from fastapi import FastAPI, UploadFile, File, Body, Query
3
+ from fastapi.responses import JSONResponse
4
+
5
+ web_app = FastAPI(title="MCP Video Analysis API")
6
+
7
  import os
8
  import tempfile
9
  import io # Used by Whisper for BytesIO
 
14
  from fastapi.responses import JSONResponse
15
  from fastapi.middleware.cors import CORSMiddleware
16
  from pydantic import BaseModel
17
+ import re # For parsing search results
18
+ import yt_dlp
19
  import asyncio # For concurrent video processing
20
 
21
  import gradio as gr
 
31
  OBJECT_DETECTION_PROCESSOR_NAME = "facebook/detr-resnet-50"
32
 
33
  # --- Modal Image Definition ---
34
+ video_analysis_image_v2 = (
35
  modal.Image.debian_slim(python_version="3.10")
36
  .apt_install("ffmpeg")
37
  .pip_install(
 
44
  "torchvision",
45
  "torchaudio",
46
  "fastapi[standard]", # For web endpoints
47
+ "pydantic", # For request body validation
48
+ "yt-dlp", # For searching YouTube and downloading videos
49
+ "httpx", # For downloading video from URL
50
+ "cowsay==6.1" # Cache-busting package
51
  )
52
  )
53
 
 
56
 
57
  # --- Pydantic model for web endpoint request ---
58
  class VideoAnalysisRequestPayload(BaseModel):
59
+ video_url: Optional[str] = None
60
+
61
+ class TopicAnalysisRequest(BaseModel):
62
+ topic: str
63
+ max_videos: int = Query(3, ge=1, le=10) # Default 3, min 1, max 10 videos
64
 
65
  # --- Constants for Model Names ---
66
  # WHISPER_MODEL_NAME = "openai/whisper-large-v3"
67
+ CAPTION_MODEL_NAME = "Neleac/SpaceTimeGPT"
68
+ CAPTION_PROCESSOR_NAME = "Neleac/SpaceTimeGPT" # Use processor from SpaceTimeGPT itself
69
  # # CAPTION_TOKENIZER_NAME = "gpt2" # For SpaceTimeGPT's text decoder (usually part of processor)
70
  # ACTION_MODEL_NAME = "MCG-NJU/videomae-base-finetuned-kinetics"
71
  # ACTION_PROCESSOR_NAME = "MCG-NJU/videomae-base" # Or VideoMAEImageProcessor.from_pretrained(ACTION_MODEL_NAME)
 
97
 
98
  # === 1. Transcription with Whisper ===
99
  @app.function(
100
+ image=video_analysis_image_v2,
101
  secrets=[HF_TOKEN_SECRET],
102
  gpu="any",
103
  timeout=600
 
151
  device="cuda:0" if torch.cuda.is_available() else "cpu",
152
  )
153
  print(f"[Whisper] Pipeline loaded. Transcribing {temp_audio_path}...")
154
+ # Add robust error handling for the Whisper model
155
+ try:
156
+ outputs = pipe(temp_audio_path, chunk_length_s=30, stride_length_s=5, batch_size=8, generate_kwargs={"language": "english"}, return_timestamps=False)
157
+ except Exception as whisper_err:
158
+ print(f"[Whisper] Error during transcription: {whisper_err}")
159
+ # Try again with different settings if the first attempt failed
160
+ print(f"[Whisper] Attempting fallback transcription with smaller chunk size...")
161
+ outputs = pipe(temp_audio_path, chunk_length_s=10, stride_length_s=2, batch_size=4, generate_kwargs={"language": "english"}, return_timestamps=False)
162
  transcription = outputs["text"]
163
  print(f"[Whisper] Transcription successful: {transcription[:100]}...")
164
  return transcription
 
175
 
176
  # === 2. Captioning with SpaceTimeGPT ===
177
  @app.function(
178
+ image=video_analysis_image_v2,
179
  secrets=[HF_TOKEN_SECRET],
180
  gpu="any",
181
  timeout=600
 
183
  def generate_captions_with_spacetimegpt(video_bytes: bytes) -> str:
184
  _login_to_hf()
185
  import torch
186
+ from transformers import AutoProcessor, AutoModelForVision2Seq
187
  import av
188
  import numpy as np
189
  import tempfile
 
207
  indices = np.linspace(0, total_frames - 1, num_frames_to_sample, dtype=int)
208
  frames = []
209
  for i in indices:
210
+ container.seek(int(i), stream=video_stream)
211
  frame = next(container.decode(video_stream))
212
  frames.append(frame.to_rgb().to_ndarray())
213
  container.close()
214
  video_frames_np = np.stack(frames)
215
 
216
  processor = AutoProcessor.from_pretrained(CAPTION_PROCESSOR_NAME, trust_remote_code=True)
217
+
218
+ # Debug prints
219
+ print(f"[SpaceTimeGPT] DEBUG: CAPTION_MODEL_NAME is {CAPTION_MODEL_NAME}")
220
+ print(f"[SpaceTimeGPT] DEBUG: Intending to use model class: {AutoModelForVision2Seq.__name__}")
221
+ print(f"[SpaceTimeGPT] DEBUG: Type of model class object: {type(AutoModelForVision2Seq)}")
222
+
223
+ model = AutoModelForVision2Seq.from_pretrained(CAPTION_MODEL_NAME, trust_remote_code=True)
224
  device = "cuda:0" if torch.cuda.is_available() else "cpu"
225
  model.to(device)
226
  if hasattr(processor, 'tokenizer'): # Check if tokenizer exists
 
246
 
247
  # === 3. Action Recognition with VideoMAE ===
248
  @app.function(
249
+ image=video_analysis_image_v2,
250
  secrets=[HF_TOKEN_SECRET],
251
  gpu="any",
252
  timeout=600
 
278
  indices = np.linspace(0, total_frames - 1, num_frames_to_sample, dtype=int)
279
  video_frames_list = []
280
  for i in indices:
281
+ container.seek(int(i), stream=video_stream)
282
  frame = next(container.decode(video_stream))
283
  video_frames_list.append(frame.to_rgb().to_ndarray())
284
  container.close()
 
319
 
320
  # === 4. Object Detection with DETR ===
321
  @app.function(
322
+ image=video_analysis_image_v2,
323
  secrets=[HF_TOKEN_SECRET],
324
  gpu="any",
325
  timeout=600
 
359
 
360
  all_frame_detections = []
361
  for frame_num, target_frame_index in enumerate(frame_indices):
362
+ container.seek(int(target_frame_index), stream=video_stream)
363
  frame = next(container.decode(video_stream))
364
  pil_image = frame.to_image()
365
 
 
398
 
399
  # === 5. Comprehensive Video Analysis (Orchestrator) ===
400
  @app.function(
401
+ image=video_analysis_image_v2,
402
  secrets=[HF_TOKEN_SECRET],
403
  gpu="any", # Request GPU as some sub-tasks will need it
404
  timeout=1800, # Generous timeout for all models
 
410
  cache_key = hashlib.sha256(video_bytes).hexdigest()
411
 
412
  try:
413
+ cached_result = video_analysis_cache.get(cache_key)
414
  if cached_result:
415
  print(f"[Orchestrator] Cache hit for key: {cache_key}")
416
  return cached_result
 
424
  print("[Orchestrator] Calling transcription...")
425
  try:
426
  # .remote() runs the Modal function remotely and blocks here until its result is returned
427
+ results["transcription"] = transcribe_video_with_whisper.remote(video_bytes)
428
  except Exception as e:
429
  print(f"[Orchestrator] Error in transcription: {e}")
430
  results["transcription"] = f"Transcription Error: {str(e)}"
431
 
432
  print("[Orchestrator] Calling captioning...")
433
  try:
434
+ results["caption"] = generate_captions_with_spacetimegpt.remote(video_bytes)
435
  except Exception as e:
436
  print(f"[Orchestrator] Error in captioning: {e}")
437
  results["caption"] = f"Captioning Error: {str(e)}"
438
 
439
  print("[Orchestrator] Calling action recognition...")
440
  try:
441
+ results["actions"] = generate_action_labels.remote(video_bytes)
442
  except Exception as e:
443
  print(f"[Orchestrator] Error in action recognition: {e}")
444
  results["actions"] = [{"error": f"Action Recognition Error: {str(e)}"}] # Ensure list type for error
445
 
446
  print("[Orchestrator] Calling object detection...")
447
  try:
448
+ results["objects"] = generate_object_detection.remote(video_bytes)
449
  except Exception as e:
450
  print(f"[Orchestrator] Error in object detection: {e}")
451
  results["objects"] = [{"error": f"Object Detection Error: {str(e)}"}] # Ensure list type for error
452
 
453
  print("[Orchestrator] All analyses attempted. Storing results in cache.")
454
  try:
455
+ video_analysis_cache.put(cache_key, results)
456
  print(f"[Orchestrator] Successfully cached results for key: {cache_key}")
457
  except Exception as e:
458
  print(f"[Orchestrator] Cache PUT error: {e}")
 
461
 
462
 
463
  # === FastAPI Endpoint for Video Analysis ===
464
+ @web_app.post("/process_video_analysis")
 
 
 
 
 
 
465
  def process_video_analysis(payload: VideoAnalysisRequestPayload):
466
  """FastAPI endpoint for comprehensive video analysis."""
467
  print(f"[FastAPI Endpoint] Received request for video analysis")
 
472
 
473
  print(f"[FastAPI Endpoint] Processing video_url: {video_url}")
474
  try:
475
+ # Download video using yt-dlp with enhanced options for robustness
476
+ import yt_dlp
477
+ import tempfile
478
+ import os
479
+ import subprocess
480
+ import shutil
481
+
482
+ video_bytes = None
483
+ with tempfile.TemporaryDirectory() as tmpdir:
484
+ output_base = os.path.join(tmpdir, 'video')
485
+ output_path = output_base + '.mp4'
486
+
487
+ # Enhanced yt-dlp options for more reliable downloads
488
+ ydl_opts = {
489
+ # Request specific formats in priority order
490
+ 'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
491
+ 'outtmpl': output_base,
492
+ 'quiet': False, # Temporarily enable output for debugging
493
+ 'verbose': True, # More verbose output to diagnose issues
494
+ 'no_warnings': False, # Show warnings for debugging
495
+ 'noplaylist': True,
496
+ # Force remux to ensure valid container
497
+ 'merge_output_format': 'mp4',
498
+ # Add postprocessors to ensure valid MP4
499
+ 'postprocessors': [{
500
+ 'key': 'FFmpegVideoConvertor',
501
+ 'preferedformat': 'mp4',
502
+ 'postprocessor_args': ['-movflags', '+faststart'],
503
+ }],
504
+ # Force ffmpeg to create a valid MP4 with moov atom at the beginning
505
+ 'prefer_ffmpeg': True,
506
+ 'http_headers': {
507
+ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36'
508
+ },
509
+ }
510
+
511
+ try:
512
+ print(f"[FastAPI Endpoint] Downloading video with enhanced yt-dlp options from {video_url}")
513
+ download_success = False
514
+
515
+ # Try yt-dlp first
516
+ try:
517
+ with yt_dlp.YoutubeDL(ydl_opts) as ydl:
518
+ ydl.download([video_url])
519
+
520
+ # Find the actual output file (might have a different extension)
521
+ downloaded_files = [f for f in os.listdir(tmpdir) if f.startswith('video')]
522
+ if downloaded_files:
523
+ actual_file = os.path.join(tmpdir, downloaded_files[0])
524
+ print(f"[FastAPI Endpoint] Found downloaded file: {actual_file}")
525
+ download_success = True
526
+ except Exception as e:
527
+ print(f"[FastAPI Endpoint] yt-dlp download failed: {e}. Trying direct download...")
528
+
529
+ # Fallback to direct download if it's a direct video URL
530
+ if not download_success and (video_url.endswith('.mp4') or 'commondatastorage.googleapis.com' in video_url):
531
+ import requests
532
+ try:
533
+ print(f"[FastAPI Endpoint] Attempting direct download for {video_url}")
534
+ actual_file = os.path.join(tmpdir, 'direct_video.mp4')
535
+ with requests.get(video_url, stream=True) as r:
536
+ r.raise_for_status()
537
+ with open(actual_file, 'wb') as f:
538
+ for chunk in r.iter_content(chunk_size=8192):
539
+ f.write(chunk)
540
+ print(f"[FastAPI Endpoint] Direct download successful: {actual_file}")
541
+ download_success = True
542
+ except Exception as e:
543
+ print(f"[FastAPI Endpoint] Direct download failed: {e}")
544
+
545
+ # For testing: Try a sample video if all downloads failed (Big Buck Bunny)
546
+ if not download_success:
547
+ test_url = "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"
548
+ print(f"[FastAPI Endpoint] All downloads failed. Falling back to sample video: {test_url}")
549
+ import requests
550
+ try:
551
+ actual_file = os.path.join(tmpdir, 'fallback_video.mp4')
552
+ with requests.get(test_url, stream=True) as r:
553
+ r.raise_for_status()
554
+ with open(actual_file, 'wb') as f:
555
+ for chunk in r.iter_content(chunk_size=8192):
556
+ f.write(chunk)
557
+ print(f"[FastAPI Endpoint] Fallback download successful")
558
+ download_success = True
559
+ except Exception as e:
560
+ print(f"[FastAPI Endpoint] Even fallback download failed: {e}")
561
+ raise Exception("All download methods failed")
562
+
563
+ # Ensure it's a properly formatted MP4 using ffmpeg directly
564
+ final_output = os.path.join(tmpdir, 'final_video.mp4')
565
+ try:
566
+ # Use ffmpeg to remux the file (stream copy, no re-encoding) so the moov atom is moved to the front via faststart
567
+ print(f"[FastAPI Endpoint] Reprocessing with ffmpeg to ensure valid MP4 format")
568
+ subprocess.run(
569
+ ["ffmpeg", "-i", actual_file, "-c:v", "copy", "-c:a", "copy", "-movflags", "faststart", final_output],
570
+ check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
571
+ )
572
+
573
+ if os.path.exists(final_output) and os.path.getsize(final_output) > 0:
574
+ with open(final_output, 'rb') as f:
575
+ video_bytes = f.read()
576
+ print(f"[FastAPI Endpoint] Successfully reprocessed video, size: {len(video_bytes)} bytes")
577
+ else:
578
+ print(f"[FastAPI Endpoint] ffmpeg reprocessing failed to produce valid output")
579
+ except subprocess.SubprocessError as se:
580
+ print(f"[FastAPI Endpoint] ffmpeg reprocessing failed: {se}")
581
+ # If ffmpeg fails, try with the original file
582
+ if os.path.exists(actual_file) and os.path.getsize(actual_file) > 0:
583
+ with open(actual_file, 'rb') as f:
584
+ video_bytes = f.read()
585
+ print(f"[FastAPI Endpoint] Using original download, size: {len(video_bytes)} bytes")
586
+ else:
587
+ print(f"[FastAPI Endpoint] No downloaded files found in directory: {os.listdir(tmpdir)}")
588
+ except yt_dlp.utils.DownloadError:
589
+ # Fallback to httpx for direct links if yt-dlp fails
590
+ print(f"[FastAPI Endpoint] yt-dlp failed, falling back to httpx for {video_url}")
591
+ try:
592
+ import httpx
593
+ with httpx.Client() as client:
594
+ response = client.get(video_url, follow_redirects=True, timeout=60.0)
595
+ response.raise_for_status()
596
+ video_bytes = response.content
597
+ except httpx.RequestError as he:
598
+ return JSONResponse(status_code=400, content={"error": f"Failed to download video from URL using both yt-dlp and httpx. Details: {he}"})
599
+
600
+ if not video_bytes:
601
+ return JSONResponse(status_code=400, content={"error": f"Downloaded video from URL {video_url} is empty or download failed."})
602
+
603
+ print(f"[FastAPI Endpoint] Successfully downloaded and validated {len(video_bytes)} bytes from {video_url} using enhanced downloader.")
604
 
605
  # Call comprehensive analysis
606
+ analysis_results = analyze_video_comprehensive.remote(video_bytes)
607
  print("[FastAPI Endpoint] Comprehensive analysis finished.")
608
  return JSONResponse(status_code=200, content=analysis_results)
609
 
 
614
  print(f"[FastAPI Endpoint] Unexpected Exception during analysis: {e}")
615
  return JSONResponse(status_code=500, content={"error": f"Unexpected server error during analysis: {str(e)}"})
616
 
617
+ # === FastAPI Endpoint for Topic Analysis ===
618
+ @web_app.post("/analyze_topic")
619
+ async def handle_analyze_topic_request(request: TopicAnalysisRequest):
620
+ """
621
+ Handles a request to analyze videos based on a topic.
622
+ 1. Finds video URLs for the topic using YouTube search.
623
+ 2. Concurrently analyzes these videos.
624
+ 3. Returns aggregated results.
625
+ """
626
+ print(f"[TopicAPI] Received request to analyze topic: '{request.topic}', max_videos: {request.max_videos}")
627
+
628
+ try:
629
+ # find_video_urls_for_topic is defined as a regular (sync) Modal function, but this
630
+ # handler is async, so the call goes through .remote.aio() and is awaited rather than
631
+ # using the blocking .remote() form, which would stall the event loop.
632
+ # NOTE(review): .remote.aio() is expected to work for both sync and async Modal functions - confirm against the Modal docs.
633
+ video_urls = await find_video_urls_for_topic.remote.aio(request.topic, request.max_videos)
634
+
635
+ if not video_urls:
636
+ print(f"[TopicAPI] No video URLs found for topic: '{request.topic}'")
637
+ return JSONResponse(
638
+ status_code=404,
639
+ content={
640
+ "status": "error",
641
+ "message": "No videos found for the specified topic.",
642
+ "topic": request.topic,
643
+ "details": "The YouTube search did not return any relevant video URLs."
644
+ }
645
+ )
646
+
647
+ print(f"[TopicAPI] Found {len(video_urls)} URLs for topic '{request.topic}', proceeding to analysis.")
648
+
649
+ # analyze_videos_by_topic is an async Modal function, so use .remote.aio()
650
+ analysis_results = await analyze_videos_by_topic.remote.aio(video_urls, request.topic)
651
+
652
+ print(f"[TopicAPI] Successfully analyzed videos for topic: '{request.topic}'")
653
+ return analysis_results
654
+
655
+ except Exception as e:
656
+ print(f"[TopicAPI] Error during topic analysis for '{request.topic}': {e}")
657
+ import traceback
658
+ traceback.print_exc()
659
+ return JSONResponse(
660
+ status_code=500,
661
+ content={
662
+ "status": "error",
663
+ "message": "An internal server error occurred during topic analysis.",
664
+ "topic": request.topic,
665
+ "error_details_str": str(e) # Keep it simple for JSON
666
+ }
667
+ )
668
+
669
  # === 6. Topic-Based Video Search ===
670
@app.function(
    image=video_analysis_image_v2,
    secrets=[HF_TOKEN_SECRET],
    timeout=300
)
def find_video_urls_for_topic(topic: str, max_results: int = 3) -> List[str]:
    """Find up to ``max_results`` video URLs (YouTube) for ``topic`` using yt-dlp.

    The topic is handed to yt-dlp as a plain query; the ``default_search``
    option (``ytsearch<N>``) is what turns it into a YouTube search.

    This is a best-effort lookup: any exception raised while searching is
    logged (with traceback) and the URLs collected so far are returned.

    Args:
        topic: Free-text search query.
        max_results: Upper bound on the number of URLs returned. Values
            below 1 yield an empty list without contacting YouTube.

    Returns:
        A list of at most ``max_results`` video page URLs, possibly empty.
    """
    print(f"[TopicSearch] Finding video URLs for topic: '{topic}', max_results={max_results}")
    video_urls: List[str] = []

    if max_results < 1:
        # Nothing can be returned, so skip the network round-trip entirely.
        return video_urls

    try:
        # Add a common user-agent to avoid getting blocked
        # Let yt-dlp find ffmpeg in the PATH instead of hardcoding it
        ydl_opts = {
            'quiet': True,
            'extract_flat': 'discard_in_playlist',
            'force_generic_extractor': False,
            'default_search': f"ytsearch{max_results}",
            'noplaylist': True,
            'prefer_ffmpeg': True,
            'http_headers': {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36'
            }
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # extract_info with a search query like 'ytsearchN:query' returns a playlist dictionary
            search_result = ydl.extract_info(topic, download=False)

            if isinstance(search_result, dict):
                entries = search_result.get('entries')
                if entries is not None:
                    for entry in entries:
                        if not isinstance(entry, dict):
                            continue
                        # Fully resolved entries carry 'webpage_url'. Entries left
                        # unresolved by the 'extract_flat' setting only expose the
                        # page link under 'url', so fall back to it instead of
                        # silently dropping the result.
                        entry_url = entry.get('webpage_url') or entry.get('url')
                        if not entry_url:
                            continue
                        video_urls.append(entry_url)
                        # yt-dlp search might return more than max_results, so we cap it here
                        if len(video_urls) >= max_results:
                            break
                elif search_result.get('webpage_url'):
                    # Sometimes a single video result is returned directly,
                    # without an 'entries' list.
                    video_urls.append(search_result['webpage_url'])

        print(f"[TopicSearch] Found {len(video_urls)} video URLs for topic '{topic}': {video_urls}")
    except Exception as e:
        # Best-effort by design: log and fall through to return what we have.
        print(f"[TopicSearch] Error finding videos for topic '{topic}': {e}")
        import traceback
        traceback.print_exc()
    return video_urls
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
716
 
717
  # Helper function (not a Modal function) to extract video URLs from search results
718
  def extract_video_urls_from_search(search_results: List[Dict[str, str]], max_urls: int = 3) -> List[str]:
 
753
  break
754
  if len(video_urls) >= max_urls:
755
  break
 
 
 
 
 
756
  # === 7. Topic-Based Video Analysis Orchestrator ===
757
  @app.function(
758
+ image=video_analysis_image_v2,
759
  secrets=[HF_TOKEN_SECRET],
760
+ gpu="any",
761
+ timeout=3600
762
  )
763
+ async def analyze_videos_by_topic(video_urls: list, topic: str) -> dict:
764
+ # Analyze videos concurrently
 
 
 
 
 
765
  response.raise_for_status() # Raise HTTPError for bad responses (4XX or 5XX)
766
  video_bytes = await response.aread()
767
  print(f"[TopicAnalysisWorker] Downloaded {len(video_bytes)} bytes from {video_url}")