jomasego committed on
Commit
ee0209c
·
1 Parent(s): b279e29

Fix: Correct IndentationError and concurrency pattern in modal_whisper_app.py

Browse files
Files changed (1):
  1. modal_whisper_app.py +51 -45
modal_whisper_app.py CHANGED
@@ -758,38 +758,53 @@ def extract_video_urls_from_search(search_results: List[Dict[str, str]], max_url
758
  gpu="any",
759
  timeout=3600
760
  )
761
- async def analyze_videos_by_topic(video_urls: list, topic: str) -> dict:
762
- # Analyze videos concurrently
763
- response.raise_for_status() # Raise HTTPError for bad responses (4XX or 5XX)
764
- video_bytes = await response.aread()
765
- print(f"[TopicAnalysisWorker] Downloaded {len(video_bytes)} bytes from {video_url}")
 
 
 
 
 
 
 
 
 
 
 
 
 
766
 
767
- if not video_bytes:
768
- raise ValueError("Downloaded video content is empty.")
769
 
770
- # 2. Analyze video
771
- analysis_result = await analyze_video_comprehensive.coro(video_bytes)
772
-
773
- # Check if the analysis itself returned an error structure
774
- if isinstance(analysis_result, dict) and any(key + "_error" in analysis_result for key in ["transcription", "caption", "actions", "objects"]):
775
- print(f"[TopicAnalysisWorker] Comprehensive analysis for {video_url} reported errors: {analysis_result}")
776
- return {"url": video_url, "error_type": "analysis_error", "error_details": analysis_result}
777
- else:
778
- return {"url": video_url, "analysis": analysis_result}
779
 
780
  except httpx.HTTPStatusError as e:
781
- print(f"[TopicAnalysisWorker] HTTP error downloading {video_url}: {e}")
782
- return {"url": video_url, "error_type": "download_error", "error_details": f"HTTP {e.response.status_code}: {e.response.text[:200]}"}
783
  except httpx.RequestError as e:
784
- print(f"[TopicAnalysisWorker] Request error downloading {video_url}: {e}")
785
- return {"url": video_url, "error_type": "download_error", "error_details": f"Failed to download: {str(e)}"}
786
  except Exception as e:
787
- print(f"[TopicAnalysisWorker] Error processing video {video_url}: {e}")
788
  import traceback
789
- # Consider logging traceback.format_exc() instead of printing if running in a less verbose environment
790
- # traceback.print_exc() # This might be too verbose for regular Modal logs
791
- return {"url": video_url, "error_type": "processing_error", "error_details": str(e), "traceback": traceback.format_exc()[:1000]}
792
 
 
 
 
 
 
 
793
  async def analyze_videos_by_topic(video_urls: List[str], topic: str) -> Dict[str, Any]:
794
  """Analyzes a list of videos (by URL) concurrently and aggregates results for a topic."""
795
  print(f"[TopicAnalysis] Starting concurrent analysis for topic: '{topic}' with {len(video_urls)} video(s).")
@@ -804,29 +819,20 @@ async def analyze_videos_by_topic(video_urls: List[str], topic: str) -> Dict[str
804
  results_aggregator["errors"].append({"topic_error": "No video URLs provided or found for the topic."})
805
  return results_aggregator
806
 
807
- async with httpx.AsyncClient(timeout=300.0) as client: # 5 min timeout for individual downloads
808
- tasks = [_download_and_analyze_one_video(client, url, topic) for url in video_urls]
809
-
810
- # return_exceptions=True allows us to get results for successful tasks even if others fail
811
- individual_results = await asyncio.gather(*tasks, return_exceptions=True)
812
-
813
- for res_or_exc in individual_results:
814
- if isinstance(res_or_exc, Exception):
815
- # This handles exceptions not caught within _download_and_analyze_one_video itself (should be rare)
816
- # Or if return_exceptions=True was used and _download_and_analyze_one_video raised an unhandled one.
817
- print(f"[TopicAnalysis] An unexpected exception occurred during asyncio.gather: {res_or_exc}")
818
- results_aggregator["errors"].append({"url": "unknown_url_due_to_gather_exception", "processing_error": str(res_or_exc)})
819
- elif isinstance(res_or_exc, dict):
820
- if "error_type" in res_or_exc:
821
- results_aggregator["errors"].append(res_or_exc) # Append the error dict directly
822
- elif "analysis" in res_or_exc:
823
- results_aggregator["analyzed_videos"].append(res_or_exc)
824
  else:
825
- print(f"[TopicAnalysis] Received an unexpected dictionary structure: {res_or_exc}")
826
- results_aggregator["errors"].append({"url": res_or_exc.get("url", "unknown"), "processing_error": "Unknown result structure"})
827
  else:
828
- print(f"[TopicAnalysis] Received an unexpected result type from asyncio.gather: {type(res_or_exc)}")
829
- results_aggregator["errors"].append({"url": "unknown", "processing_error": f"Unexpected result type: {type(res_or_exc)}"})
 
830
 
831
  print(f"[TopicAnalysis] Finished concurrent analysis for topic '{topic}'.")
832
  return results_aggregator
 
758
  gpu="any",
759
  timeout=3600
760
  )
761
+ @app.function(
762
+ image=video_analysis_image_v2,
763
+ secrets=[HF_TOKEN_SECRET],
764
+ timeout=1800,
765
+ )
766
+ async def _analyze_video_worker(video_url: str) -> dict:
767
+ """
768
+ Worker function to download a video from a URL and run comprehensive analysis.
769
+ This is designed to be called concurrently.
770
+ """
771
+ print(f"[Worker] Starting analysis for {video_url}")
772
+ try:
773
+ async with httpx.AsyncClient() as client:
774
+ print(f"[Worker] Downloading video from {video_url}")
775
+ response = await client.get(video_url, follow_redirects=True, timeout=60.0)
776
+ response.raise_for_status()
777
+ video_bytes = await response.aread()
778
+ print(f"[Worker] Downloaded {len(video_bytes)} bytes from {video_url}")
779
 
780
+ if not video_bytes:
781
+ raise ValueError("Downloaded video content is empty.")
782
 
783
+ analysis_result = await analyze_video_comprehensive.coro(video_bytes)
784
+
785
+ if isinstance(analysis_result, dict) and any("error" in str(v).lower() for v in analysis_result.values()):
786
+ print(f"[Worker] Comprehensive analysis for {video_url} reported errors: {analysis_result}")
787
+ return {"url": video_url, "status": "error", "error_type": "analysis_error", "details": analysis_result}
788
+ else:
789
+ return {"url": video_url, "status": "success", "analysis": analysis_result}
 
 
790
 
791
  except httpx.HTTPStatusError as e:
792
+ print(f"[Worker] HTTP error downloading {video_url}: {e}")
793
+ return {"url": video_url, "status": "error", "error_type": "download_error", "details": f"HTTP {e.response.status_code}"}
794
  except httpx.RequestError as e:
795
+ print(f"[Worker] Request error downloading {video_url}: {e}")
796
+ return {"url": video_url, "status": "error", "error_type": "download_error", "details": f"Failed to download: {str(e)}"}
797
  except Exception as e:
798
+ print(f"[Worker] Error processing video {video_url}: {e}")
799
  import traceback
800
+ return {"url": video_url, "status": "error", "error_type": "processing_error", "details": str(e), "traceback": traceback.format_exc()[:1000]}
 
 
801
 
802
+ @app.function(
803
+ image=video_analysis_image_v2,
804
+ secrets=[HF_TOKEN_SECRET],
805
+ timeout=3600,
806
+ gpu="any",
807
+ )
808
  async def analyze_videos_by_topic(video_urls: List[str], topic: str) -> Dict[str, Any]:
809
  """Analyzes a list of videos (by URL) concurrently and aggregates results for a topic."""
810
  print(f"[TopicAnalysis] Starting concurrent analysis for topic: '{topic}' with {len(video_urls)} video(s).")
 
819
  results_aggregator["errors"].append({"topic_error": "No video URLs provided or found for the topic."})
820
  return results_aggregator
821
 
822
+ # Use .map to run the worker function concurrently on all video URLs
823
+ # The list() call forces the generator to execute and retrieve all results.
824
+ individual_results = list(_analyze_video_worker.map(video_urls))
825
+
826
+ for result in individual_results:
827
+ if isinstance(result, dict):
828
+ if result.get("status") == "error":
829
+ results_aggregator["errors"].append(result)
 
 
 
 
 
 
 
 
 
830
  else:
831
+ results_aggregator["analyzed_videos"].append(result)
 
832
  else:
833
+ # This case handles unexpected return types from the worker, like exceptions
834
+ print(f"[TopicAnalysis] Received an unexpected result type from worker: {type(result)}")
835
+ results_aggregator["errors"].append({"url": "unknown", "error_type": "unexpected_result", "details": str(result)})
836
 
837
  print(f"[TopicAnalysis] Finished concurrent analysis for topic '{topic}'.")
838
  return results_aggregator