Spaces:
Runtime error
Runtime error
# multimodal_module.py
import os | |
import pickle | |
import subprocess | |
import tempfile | |
import shutil | |
import asyncio | |
import logging | |
from datetime import datetime | |
from typing import Dict, List, Optional, Any, Union | |
import uuid | |
import numpy as np | |
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("MultiModalModule")

# Space-specific environment configuration.
# These are set ahead of the ML imports that follow so the libraries see them.
os.environ["HF_HUB_DISABLE_TELEMETRY"] = "1"
# Accelerated transfers need the optional `hf_transfer` package; with the flag
# forced on and the package absent, Hub downloads raise instead of falling
# back, so only opt in when the package can actually be imported.
import importlib.util
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = (
    "1" if importlib.util.find_spec("hf_transfer") is not None else "0"
)
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# Core ML Imports | |
import torch | |
from transformers import ( | |
pipeline, | |
AutoModelForSeq2SeqLM, | |
AutoTokenizer, | |
Wav2Vec2Processor, | |
Wav2Vec2ForSequenceClassification, | |
AutoModelForCausalLM | |
) | |
from diffusers import ( | |
StableDiffusionPipeline, | |
StableDiffusionInpaintPipeline | |
) | |
from huggingface_hub import hf_hub_download, snapshot_download | |
# Audio Processing | |
import librosa | |
import soundfile as sf | |
from gtts import gTTS | |
import speech_recognition as sr | |
import webrtcvad | |
# Image/Video Processing | |
from PIL import Image | |
import imageio | |
import imageio_ffmpeg | |
import moviepy.editor as mp | |
import cv2 | |
# Document Processing | |
import fitz # PyMuPDF | |
from langdetect import detect, DetectorFactory | |
DetectorFactory.seed = 0 | |
# Configuration
USE_SAFETY_CHECKER = False   # when False, text-to-image runs without a safety checker
MAX_HISTORY_LENGTH = 100     # messages kept per user in the chat history
TEMP_DIR = "tmp"             # scratch directory for uploads and generated media
MODEL_CACHE_DIR = "model_cache"  # cache_dir handed to the model loaders
class MultiModalChatModule:
    """Multimodal chat backend: voice, text, image, video, document and code.

    Every model is loaded lazily by its ``_load_*`` method the first time a
    feature needs it; a failed load is logged and leaves the corresponding
    attribute as ``None``. Per-user conversation history is kept in memory
    and persisted to ``chat_history_file`` with pickle after every update.

    Uploaded-file arguments (``voice_file``, ``image_file``, ...) only need an
    awaitable ``download_to_drive(path)`` method.
    """

    # ISO 639-1 codes mapped to the language codes NLLB-200 expects. Only a
    # small, common subset; unmapped languages are passed through untranslated.
    _NLLB_CODES = {
        "en": "eng_Latn", "fr": "fra_Latn", "es": "spa_Latn",
        "de": "deu_Latn", "it": "ita_Latn", "pt": "por_Latn",
        "nl": "nld_Latn", "pl": "pol_Latn", "tr": "tur_Latn",
        "ru": "rus_Cyrl", "uk": "ukr_Cyrl", "ar": "arb_Arab",
        "hi": "hin_Deva", "zh": "zho_Hans", "ja": "jpn_Jpan",
        "ko": "kor_Hang", "vi": "vie_Latn", "id": "ind_Latn",
    }

    # Short labels used by the SUPERB emotion checkpoints -> readable names.
    _EMOTION_ALIASES = {
        "neu": "neutral", "hap": "happy", "ang": "angry", "sad": "sad",
    }

    def __init__(self, chat_history_file: str = "chat_histories.pkl"):
        """Create working directories, pick the device and load saved history.

        Args:
            chat_history_file: Path of the pickle file holding chat histories.
        """
        os.makedirs(TEMP_DIR, exist_ok=True)
        os.makedirs(MODEL_CACHE_DIR, exist_ok=True)

        # Device configuration: half precision only makes sense on GPU.
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.torch_dtype = torch.float16 if self.device == "cuda" else torch.float32
        logger.info(f"Initialized on {self.device.upper()} with dtype {self.torch_dtype}")

        # Model registry (Hugging Face Hub repository ids).
        self.model_names = {
            "voice_emotion_processor": "facebook/hubert-large-ls960-ft",
            "voice_emotion_model": "superb/hubert-base-superb-er",
            "translation_model": "facebook/nllb-200-distilled-600M",
            "chatbot_tokenizer": "facebook/blenderbot-400M-distill",
            "chatbot_model": "facebook/blenderbot-400M-distill",
            "image_captioner": "Salesforce/blip-image-captioning-base",
            "sd_inpaint": "runwayml/stable-diffusion-inpainting",
            "sd_text2img": "runwayml/stable-diffusion-v1-5",
            "code_model": "bigcode/starcoder",
        }

        # Model placeholders, filled in by the lazy loaders.
        self._voice_processor = None
        self._voice_emotion_model = None
        self._translator = None
        self._chat_tokenizer = None
        self._chat_model = None
        self._image_captioner = None
        self._sd_pipe = None
        self._sd_inpaint = None
        self._code_tokenizer = None
        self._code_model = None

        # Helpers
        self._sr_recognizer = sr.Recognizer()
        self.vad = webrtcvad.Vad(3)
        self.chat_history_file = chat_history_file
        self.user_chat_histories = self._load_chat_histories()

        # Load tracking: one flag per lazily loaded model group.
        self._loaded = {
            "voice": False,
            "translation": False,
            "chat": False,
            "image_caption": False,
            "sd": False,
            "code": False,
        }

    # ----------------------
    # Core Utilities
    # ----------------------
    def _tmp_path(self, suffix: str = "") -> str:
        """Return a fresh, unique path under ``TEMP_DIR`` ending in *suffix*.

        Nothing is created at the returned path; only its parent directory is
        guaranteed to exist.
        """
        path = os.path.join(TEMP_DIR, f"{uuid.uuid4().hex}{suffix}")
        os.makedirs(os.path.dirname(path), exist_ok=True)
        return path

    def _cleanup(self, *paths: Optional[str]) -> None:
        """Remove the given files/directories, ignoring empty or missing paths.

        Failures are logged as warnings and never raised.
        """
        for path in paths:
            if not path:
                continue
            try:
                if os.path.isfile(path):
                    os.remove(path)
                elif os.path.isdir(path):
                    shutil.rmtree(path)
            except Exception as e:
                logger.warning(f"Cleanup failed for {path}: {e}")

    @classmethod
    def _nllb_code(cls, lang: Optional[str]) -> Optional[str]:
        """Map a language tag such as ``"fr"`` or ``"en-US"`` to an NLLB code.

        Returns ``None`` for empty input or a language without a mapping.
        """
        if not lang:
            return None
        primary = str(lang).replace("_", "-").split("-")[0].lower()
        return cls._NLLB_CODES.get(primary)

    @staticmethod
    def _guess_extension(file_obj: Any) -> str:
        """Return the lower-cased extension of an uploaded file, or ``""``.

        Looks at ``file_name`` and then ``file_path`` on *file_obj*.
        NOTE(review): these attribute names are an assumption about the
        caller's upload objects - confirm against the bot framework in use.
        """
        name = getattr(file_obj, "file_name", None) or getattr(file_obj, "file_path", None)
        if not name:
            return ""
        return os.path.splitext(str(name))[1].lower()

    @staticmethod
    def _history_text(content: Any) -> str:
        """Flatten a stored history entry (string or result dict) to text."""
        if isinstance(content, str):
            return content
        if isinstance(content, dict) and content.get("text"):
            return str(content["text"])
        return str(content)

    def _load_chat_histories(self) -> Dict[int, List[dict]]:
        """Load chat histories from ``chat_history_file``.

        Returns an empty dict when the file is missing, unreadable or does not
        contain a dict.
        """
        # A missing file is the normal first-run state, not an error.
        if not os.path.exists(self.chat_history_file):
            return {}
        try:
            # SECURITY: pickle.load executes arbitrary code from the file.
            # Only ever point chat_history_file at a file this process wrote.
            with open(self.chat_history_file, "rb") as f:
                histories = pickle.load(f)
        except Exception as e:
            logger.warning(f"Failed loading chat history: {e}")
            return {}
        if not isinstance(histories, dict):
            logger.warning("Failed loading chat history: unexpected file content")
            return {}
        return histories

    def _save_chat_histories(self) -> None:
        """Persist chat histories to ``chat_history_file``.

        The data is written to a sibling ``.tmp`` file and then moved into
        place, so an interrupted write cannot truncate the existing history.
        Failures are logged and not raised.
        """
        tmp_file = f"{self.chat_history_file}.tmp"
        try:
            with open(tmp_file, "wb") as f:
                pickle.dump(self.user_chat_histories, f)
            os.replace(tmp_file, self.chat_history_file)
        except Exception as e:
            logger.error(f"Failed saving chat history: {e}")
            self._cleanup(tmp_file)

    def _update_history(self, user_id: int, role: str, content: Any, lang: str = "en") -> None:
        """Append one message to *user_id*'s history, trim it and save.

        Only the newest ``MAX_HISTORY_LENGTH`` messages are kept.
        """
        history = self.user_chat_histories.setdefault(user_id, [])
        history.append({
            "timestamp": datetime.now().isoformat(),
            "role": role,
            "content": content,
            "language": lang
        })
        self.user_chat_histories[user_id] = history[-MAX_HISTORY_LENGTH:]
        self._save_chat_histories()

    def _translate(self, text: str, src_lang: str, tgt_lang: str) -> str:
        """Translate *text* between two languages, best effort.

        Returns *text* unchanged when the translator is not loaded, either
        language has no NLLB mapping, both languages are the same, or the
        translation call fails.
        """
        src = self._nllb_code(src_lang)
        tgt = self._nllb_code(tgt_lang)
        if not text or self._translator is None or src is None or tgt is None or src == tgt:
            return text
        try:
            # NLLB needs explicit source and target language codes.
            return self._translator(text, src_lang=src, tgt_lang=tgt)[0]["translation_text"]
        except Exception as e:
            logger.warning(f"Translation failed ({src_lang}->{tgt_lang}): {e}")
            return text

    def _contains_speech(self, samples: Any, sample_rate: int, frame_ms: int = 30) -> bool:
        """Return True if any *frame_ms* frame of *samples* is classified as speech.

        *samples* is a mono float waveform in [-1, 1]. The VAD only accepts
        10, 20 or 30 ms frames of 16-bit PCM, so the signal is converted and
        fed to it frame by frame; a trailing partial frame is ignored.
        """
        pcm = (np.clip(samples, -1.0, 1.0) * 32767).astype(np.int16).tobytes()
        frame_bytes = int(sample_rate * frame_ms / 1000) * 2  # 2 bytes per sample
        if frame_bytes <= 0:
            return False
        for start in range(0, len(pcm) - frame_bytes + 1, frame_bytes):
            if self.vad.is_speech(pcm[start:start + frame_bytes], sample_rate):
                return True
        return False

    # ----------------------
    # Model Loading
    # ----------------------
    def _load_voice_models(self) -> None:
        """Load the voice-emotion processor and classifier (once)."""
        if self._loaded["voice"]:
            return
        try:
            # The checkpoint is a HuBERT model; the Auto class picks the
            # matching architecture so the pretrained weights are really used
            # (a Wav2Vec2 class would leave them randomly initialised).
            from transformers import AutoModelForAudioClassification

            logger.info("Loading voice models...")
            # NOTE(review): the processor comes from a different checkpoint
            # than the classifier - confirm their preprocessing matches.
            self._voice_processor = Wav2Vec2Processor.from_pretrained(
                self.model_names["voice_emotion_processor"],
                cache_dir=MODEL_CACHE_DIR
            )
            self._voice_emotion_model = AutoModelForAudioClassification.from_pretrained(
                self.model_names["voice_emotion_model"],
                cache_dir=MODEL_CACHE_DIR
            ).to(self.device)
            self._loaded["voice"] = True
            logger.info("Voice models loaded successfully")
        except Exception as e:
            logger.error(f"Failed loading voice models: {e}")

    def _load_translation(self) -> None:
        """Load the translation pipeline (once)."""
        if self._loaded["translation"]:
            return
        try:
            logger.info("Loading translation model...")
            device = 0 if self.device == "cuda" else -1
            self._translator = pipeline(
                "translation",
                model=self.model_names["translation_model"],
                device=device,
                # cache_dir belongs to from_pretrained, so it is routed
                # through model_kwargs rather than passed to the pipeline.
                model_kwargs={"cache_dir": MODEL_CACHE_DIR}
            )
            self._loaded["translation"] = True
            logger.info("Translation model loaded successfully")
        except Exception as e:
            logger.error(f"Failed loading translation model: {e}")

    def _load_chatbot(self) -> None:
        """Load the chatbot tokenizer and model (once)."""
        if self._loaded["chat"]:
            return
        try:
            logger.info("Loading chatbot models...")
            self._chat_tokenizer = AutoTokenizer.from_pretrained(
                self.model_names["chatbot_tokenizer"],
                cache_dir=MODEL_CACHE_DIR
            )
            # When the prompt is too long, drop the oldest context instead of
            # the newest user message.
            self._chat_tokenizer.truncation_side = "left"
            self._chat_model = AutoModelForSeq2SeqLM.from_pretrained(
                self.model_names["chatbot_model"],
                cache_dir=MODEL_CACHE_DIR
            ).to(self.device)
            self._loaded["chat"] = True
            logger.info("Chatbot models loaded successfully")
        except Exception as e:
            logger.error(f"Failed loading chatbot models: {e}")

    def _load_image_captioner(self) -> None:
        """Load the image captioning pipeline (once).

        Returns nothing; callers must check ``self._image_captioner``.
        """
        if self._loaded["image_caption"]:
            return
        try:
            logger.info("Loading image captioner...")
            device = 0 if self.device == "cuda" else -1
            self._image_captioner = pipeline(
                "image-to-text",
                model=self.model_names["image_captioner"],
                device=device,
                model_kwargs={"cache_dir": MODEL_CACHE_DIR}
            )
            self._loaded["image_caption"] = True
            logger.info("Image captioner loaded successfully")
        except Exception as e:
            logger.error(f"Failed loading image captioner: {e}")

    def _load_sd(self) -> None:
        """Load the Stable Diffusion text-to-image and inpainting pipelines (once).

        On failure both pipeline attributes are reset to ``None``.
        """
        if self._loaded["sd"]:
            return
        try:
            logger.info("Loading Stable Diffusion models...")
            text2img_kwargs = {
                "torch_dtype": self.torch_dtype,
                "cache_dir": MODEL_CACHE_DIR,
            }
            # Only strip the safety checker when the flag asks for it; with
            # the flag on, the pipeline keeps its default checker.
            if not USE_SAFETY_CHECKER:
                text2img_kwargs["safety_checker"] = None
            self._sd_pipe = StableDiffusionPipeline.from_pretrained(
                self.model_names["sd_text2img"],
                **text2img_kwargs
            ).to(self.device)
            # Inpainting
            self._sd_inpaint = StableDiffusionInpaintPipeline.from_pretrained(
                self.model_names["sd_inpaint"],
                torch_dtype=self.torch_dtype,
                cache_dir=MODEL_CACHE_DIR
            ).to(self.device)
            self._loaded["sd"] = True
            logger.info("Stable Diffusion models loaded successfully")
        except Exception as e:
            logger.error(f"Failed loading Stable Diffusion models: {e}")
            self._sd_pipe = None
            self._sd_inpaint = None

    def _load_code_model(self) -> None:
        """Load the code generation tokenizer and model (once).

        On failure both attributes are reset to ``None``.
        """
        if self._loaded["code"]:
            return
        try:
            logger.info("Loading code model...")
            self._code_tokenizer = AutoTokenizer.from_pretrained(
                self.model_names["code_model"],
                cache_dir=MODEL_CACHE_DIR
            )
            self._code_model = AutoModelForCausalLM.from_pretrained(
                self.model_names["code_model"],
                cache_dir=MODEL_CACHE_DIR
            ).to(self.device)
            self._loaded["code"] = True
            logger.info("Code model loaded successfully")
        except Exception as e:
            logger.error(f"Failed loading code model: {e}")
            self._code_tokenizer = None
            self._code_model = None

    # ----------------------
    # Audio Processing
    # ----------------------
    async def analyze_voice_emotion(self, audio_path: str) -> str:
        """Classify the emotion of the audio file at *audio_path*.

        Returns:
            The predicted label (taken from the model's own ``id2label``
            table, with SUPERB short names expanded), ``"unknown"`` when the
            models are unavailable or the label is missing, or ``"error"``
            when analysis fails.
        """
        self._load_voice_models()
        if not self._voice_processor or not self._voice_emotion_model:
            return "unknown"
        try:
            speech, sample_rate = librosa.load(audio_path, sr=16000)
            inputs = self._voice_processor(
                speech,
                sampling_rate=sample_rate,
                return_tensors="pt",
                padding=True
            ).to(self.device)
            with torch.no_grad():
                logits = self._voice_emotion_model(**inputs).logits
            predicted = int(torch.argmax(logits, dim=-1)[0].item())
            # Use the labels the checkpoint was trained with rather than a
            # hand-written table that may not match its classes.
            id2label = getattr(self._voice_emotion_model.config, "id2label", None) or {}
            label = id2label.get(predicted, id2label.get(str(predicted)))
            if label is None:
                return "unknown"
            return self._EMOTION_ALIASES.get(str(label).lower(), str(label))
        except Exception as e:
            logger.error(f"Voice emotion analysis failed: {e}")
            return "error"

    async def process_voice_message(self, voice_file, user_id: int) -> Dict[str, Any]:
        """Transcribe a voice message and analyse its emotion.

        The upload is converted to 16 kHz mono WAV with ffmpeg, checked for
        speech, transcribed (English) and classified for emotion.

        Returns:
            ``{"text", "language", "emotion", "is_speech"}`` on success, or
            ``{"error": message}`` on failure. The result is also stored in
            the user's history.
        """
        ogg_path = self._tmp_path(".ogg")
        wav_path = self._tmp_path(".wav")
        try:
            await voice_file.download_to_drive(ogg_path)

            # Convert to 16 kHz mono WAV
            ffmpeg_path = imageio_ffmpeg.get_ffmpeg_exe()
            cmd = [
                ffmpeg_path, "-y", "-i", ogg_path,
                "-ar", "16000", "-ac", "1", wav_path
            ]
            subprocess.run(cmd, check=True, capture_output=True)

            # The sample rate gets its own name: `sr` is the
            # speech_recognition module and must stay usable below.
            speech, sample_rate = librosa.load(wav_path, sr=16000)
            is_speech = self._contains_speech(speech, sample_rate)

            # Transcription
            text = ""
            lang = "en"
            if is_speech:
                with sr.AudioFile(wav_path) as source:
                    audio = self._sr_recognizer.record(source)
                try:
                    text = self._sr_recognizer.recognize_google(audio, language="en-US")
                except sr.UnknownValueError:
                    # Speech was detected but could not be understood.
                    pass
                except Exception as e:
                    logger.warning(f"Speech recognition failed: {e}")

            emotion = await self.analyze_voice_emotion(wav_path) if is_speech else "no_speech"

            result = {
                "text": text,
                "language": lang,
                "emotion": emotion,
                "is_speech": is_speech
            }
            self._update_history(user_id, "user", result, lang)
            return result
        except Exception as e:
            logger.error(f"Voice message processing failed: {e}")
            return {"error": str(e)}
        finally:
            self._cleanup(ogg_path, wav_path)

    async def generate_voice_reply(self, text: str, user_id: int, fmt: str = "ogg") -> str:
        """Synthesize *text* to speech and return the path of the audio file.

        Args:
            text: Text to speak (English voice).
            user_id: User whose history records the reply.
            fmt: ``"ogg"`` (Opus) or ``"wav"`` are transcoded with ffmpeg;
                any other value receives the raw MP3 data under that suffix.

        Raises:
            RuntimeError: If synthesis or conversion fails.
        """
        mp3_path = self._tmp_path(".mp3")
        out_path = self._tmp_path(f".{fmt}")
        succeeded = False
        try:
            tts = gTTS(text=text, lang='en')
            tts.save(mp3_path)

            ffmpeg_path = imageio_ffmpeg.get_ffmpeg_exe()
            if fmt == "ogg":
                subprocess.run([
                    ffmpeg_path, "-y", "-i", mp3_path,
                    "-c:a", "libopus", out_path
                ], check=True, capture_output=True)
            elif fmt == "wav":
                subprocess.run([
                    ffmpeg_path, "-y", "-i", mp3_path, out_path
                ], check=True, capture_output=True)
            else:
                shutil.move(mp3_path, out_path)

            self._update_history(user_id, "assistant", f"[Voice reply: {fmt}]")
            succeeded = True
            return out_path
        except Exception as e:
            logger.error(f"Voice reply generation failed: {e}")
            raise RuntimeError(f"TTS failed: {e}") from e
        finally:
            # The intermediate MP3 is never needed afterwards; the output
            # file is only kept when it is actually returned to the caller.
            self._cleanup(mp3_path)
            if not succeeded:
                self._cleanup(out_path)

    # ----------------------
    # Text Processing
    # ----------------------
    async def generate_response(self, text: str, user_id: int, lang: str = "en") -> str:
        """Generate a chatbot reply to *text* using recent conversation context.

        Non-English context and input are translated to English for the chat
        model when possible, and the reply is translated back to *lang*.
        Both the user message and the reply are stored in the history.

        Returns:
            The reply, or a fixed apology when generation is unavailable.
        """
        self._load_chatbot()
        self._load_translation()

        # Snapshot the earlier turns before recording the new message so the
        # current input is not repeated inside its own context.
        previous = list(self.user_chat_histories.get(user_id, []))[-5:]
        self._update_history(user_id, "user", text, lang)

        context = []
        for msg in previous:
            content = self._history_text(msg.get("content"))
            msg_lang = msg.get("language", "en")
            if msg_lang != "en":
                content = self._translate(content, msg_lang, "en")
            context.append(f"{msg.get('role', 'user')}: {content}")

        query = self._translate(text, lang, "en") if lang != "en" else text
        input_text = f"Context:\n{' '.join(context)}\nUser: {query}"

        response = "I couldn't generate a response. Please try again."
        if self._chat_model is None or self._chat_tokenizer is None:
            logger.error("Response generation failed: chat model unavailable")
        else:
            try:
                # Keep the prompt within the model's input window.
                max_len = getattr(self._chat_model.config, "max_position_embeddings", None) or 128
                inputs = self._chat_tokenizer(
                    input_text,
                    return_tensors="pt",
                    truncation=True,
                    max_length=max_len
                ).to(self.device)
                outputs = self._chat_model.generate(
                    **inputs,
                    max_new_tokens=200,
                    do_sample=True,
                    temperature=0.7
                )
                response = self._chat_tokenizer.decode(outputs[0], skip_special_tokens=True)
            except Exception as e:
                logger.error(f"Response generation failed: {e}")

        if lang != "en":
            response = self._translate(response, "en", lang)

        self._update_history(user_id, "assistant", response, lang)
        return response

    # ----------------------
    # Image Processing
    # ----------------------
    async def process_image_message(self, image_file, user_id: int) -> str:
        """Caption an uploaded image.

        Returns:
            The caption, or an ``"Error processing image: ..."`` message on
            failure. On success the exchange is stored in the history.
        """
        img_path = self._tmp_path(".jpg")
        try:
            await image_file.download_to_drive(img_path)
            with Image.open(img_path) as raw:
                image = raw.convert("RGB")

            self._load_image_captioner()
            if self._image_captioner is None:
                raise RuntimeError("Image captioner unavailable")
            caption = self._image_captioner(image)[0]["generated_text"]

            self._update_history(user_id, "user", "[Image]", "en")
            self._update_history(user_id, "assistant", f"Image description: {caption}", "en")
            return caption
        except Exception as e:
            logger.error(f"Image processing failed: {e}")
            return f"Error processing image: {str(e)}"
        finally:
            self._cleanup(img_path)

    async def generate_image_from_text(self, prompt: str, user_id: int,
                                       width: int = 512, height: int = 512,
                                       steps: int = 30) -> str:
        """Generate an image from *prompt* and return the path of the PNG.

        Raises:
            RuntimeError: If the pipeline is unavailable or generation fails.
        """
        self._load_sd()
        if not self._sd_pipe:
            raise RuntimeError("Image generation unavailable")
        out_path = self._tmp_path(".png")
        try:
            result = self._sd_pipe(
                prompt,
                num_inference_steps=steps,
                height=height,
                width=width
            )
            result.images[0].save(out_path)

            self._update_history(user_id, "user", f"[Image request: {prompt}]", "en")
            self._update_history(user_id, "assistant", "[Generated image]", "en")
            return out_path
        except Exception as e:
            logger.error(f"Image generation failed: {e}")
            self._cleanup(out_path)  # do not leave a partial file behind
            raise RuntimeError(f"Image generation failed: {e}") from e

    async def edit_image_inpaint(self, image_file, mask_file=None,
                                 prompt: str = "", user_id: int = 0) -> str:
        """Edit an image by inpainting and return the path of the result.

        Without *mask_file* an all-white mask is used, i.e. the whole image
        is open to repainting. A mask of a different size is resized to the
        image.

        Raises:
            RuntimeError: If the pipeline is unavailable or inpainting fails.
        """
        self._load_sd()
        if not self._sd_inpaint:
            raise RuntimeError("Image editing unavailable")
        img_path = self._tmp_path(".png")
        mask_path = self._tmp_path("_mask.png") if mask_file else None
        out_path = self._tmp_path("_edited.png")
        try:
            await image_file.download_to_drive(img_path)
            if mask_file:
                await mask_file.download_to_drive(mask_path)

            with Image.open(img_path) as raw:
                init_image = raw.convert("RGB")
            if mask_path:
                with Image.open(mask_path) as raw_mask:
                    mask_image = raw_mask.convert("L")
                if mask_image.size != init_image.size:
                    mask_image = mask_image.resize(init_image.size)
            else:
                mask_image = Image.new("L", init_image.size, 255)

            result = self._sd_inpaint(
                prompt=prompt if prompt else " ",
                image=init_image,
                mask_image=mask_image,
                guidance_scale=7.5,
                num_inference_steps=30
            )
            result.images[0].save(out_path)

            self._update_history(user_id, "user", "[Image edit request]", "en")
            self._update_history(user_id, "assistant", "[Edited image]", "en")
            return out_path
        except Exception as e:
            logger.error(f"Image editing failed: {e}")
            self._cleanup(out_path)
            raise RuntimeError(f"Inpainting failed: {e}") from e
        finally:
            self._cleanup(img_path, mask_path)

    # ----------------------
    # Video Processing
    # ----------------------
    def _extract_frames(self, vid_path: str, max_frames: int, frame_paths: List[str]) -> None:
        """Save up to *max_frames* evenly spaced frames of the video as JPEGs.

        Paths are appended to *frame_paths* as they are written, so the
        caller can clean them up even if extraction fails part-way.
        """
        if max_frames <= 0:
            return
        reader = imageio.get_reader(vid_path)
        try:
            total_frames = reader.count_frames()
            step = max(1, total_frames // max_frames)
            for index in range(0, total_frames, step):
                try:
                    frame = reader.get_data(index)
                    frame_path = self._tmp_path(f"_frame{index}.jpg")
                    Image.fromarray(frame).save(frame_path)
                    frame_paths.append(frame_path)
                except Exception:
                    # Skip frames that cannot be decoded or saved.
                    continue
                if len(frame_paths) >= max_frames:
                    break
        finally:
            reader.close()

    async def process_video(self, video_file, user_id: int, max_frames: int = 4) -> Dict[str, Any]:
        """Transcribe a video's audio track and caption a few of its frames.

        Transcription and captioning are best effort: a missing audio track
        or an unavailable captioner yields an empty transcription / caption
        list rather than an error.

        Returns:
            ``{"duration", "fps", "transcription", "captions"}`` on success,
            or ``{"error": message}`` on failure.
        """
        vid_path = self._tmp_path(".mp4")
        audio_path = self._tmp_path(".wav")
        frame_paths: List[str] = []
        clip = None
        try:
            await video_file.download_to_drive(vid_path)

            clip = mp.VideoFileClip(vid_path)
            duration = clip.duration
            fps = clip.fps

            # Transcribe audio (videos may legitimately have no audio track)
            transcribed = ""
            if clip.audio is not None:
                try:
                    clip.audio.write_audiofile(audio_path, logger=None)
                    with sr.AudioFile(audio_path) as source:
                        audio = self._sr_recognizer.record(source)
                    transcribed = self._sr_recognizer.recognize_google(audio)
                except Exception as e:
                    logger.warning(f"Audio transcription failed: {e}")

            # Extract frames and caption them
            captions = []
            try:
                self._extract_frames(vid_path, max_frames, frame_paths)
                if frame_paths:
                    # The loader returns nothing; check the attribute it sets.
                    self._load_image_captioner()
                if frame_paths and self._image_captioner is not None:
                    for frame_path in frame_paths:
                        try:
                            with Image.open(frame_path) as frame_img:
                                caption = self._image_captioner(
                                    frame_img.convert("RGB")
                                )[0]["generated_text"]
                            captions.append(caption)
                        except Exception:
                            captions.append("")
            except Exception as e:
                logger.warning(f"Frame extraction failed: {e}")

            result = {
                "duration": duration,
                "fps": fps,
                "transcription": transcribed,
                "captions": captions
            }
            self._update_history(user_id, "user", "[Video upload]", "en")
            self._update_history(user_id, "assistant", result, "en")
            return result
        except Exception as e:
            logger.error(f"Video processing failed: {e}")
            return {"error": str(e)}
        finally:
            if clip is not None:
                try:
                    clip.close()
                except Exception as e:
                    logger.warning(f"Closing video clip failed: {e}")
            self._cleanup(vid_path, audio_path, *frame_paths)

    # ----------------------
    # File Processing
    # ----------------------
    async def process_file(self, file_obj, user_id: int) -> Dict[str, Any]:
        """Extract text from an uploaded document (PDF, DOCX, TXT, CSV).

        The file type is taken from the upload's name, because the temporary
        copy would otherwise carry no extension at all.

        Returns:
            ``{"summary", "length", "type"}`` where ``summary`` is the first
            500 characters of the extracted text, or ``{"error": message}``.
        """
        ext = self._guess_extension(file_obj)
        fpath = self._tmp_path(ext)
        try:
            await file_obj.download_to_drive(fpath)

            text = ""
            if ext == ".pdf":
                try:
                    with fitz.open(fpath) as doc:
                        text = "\n".join([page.get_text() for page in doc])
                except Exception as e:
                    text = f"[PDF error: {e}]"
            elif ext in (".txt", ".csv"):
                try:
                    with open(fpath, "r", encoding="utf-8", errors="ignore") as f:
                        text = f.read()
                except Exception as e:
                    text = f"[Text error: {e}]"
            elif ext == ".docx":
                try:
                    import docx  # optional dependency, only needed for DOCX
                    doc = docx.Document(fpath)
                    text = "\n".join([p.text for p in doc.paragraphs])
                except Exception as e:
                    text = f"[DOCX error: {e}]"
            else:
                text = "[Unsupported file type]"

            summary = text[:500] + ("..." if len(text) > 500 else "")

            result = {
                "summary": summary,
                "length": len(text),
                "type": ext
            }
            self._update_history(user_id, "user", f"[File upload: {result['type']}]", "en")
            self._update_history(user_id, "assistant", result, "en")
            return result
        except Exception as e:
            logger.error(f"File processing failed: {e}")
            return {"error": str(e)}
        finally:
            self._cleanup(fpath)

    # ----------------------
    # Code Processing
    # ----------------------
    async def code_complete(self, prompt: str, max_tokens: int = 512,
                            temperature: float = 0.2) -> str:
        """Complete *prompt* with the code model and return the decoded text.

        A non-positive *temperature* switches to greedy decoding, since
        sampling requires a strictly positive temperature.

        Raises:
            RuntimeError: If the model is unavailable or generation fails.
        """
        self._load_code_model()
        if not self._code_model or not self._code_tokenizer:
            raise RuntimeError("Code model not available")
        try:
            inputs = self._code_tokenizer(prompt, return_tensors="pt").to(self.device)
            generation_args = {"max_new_tokens": max_tokens}
            if temperature and temperature > 0:
                generation_args.update(do_sample=True, temperature=temperature)
            else:
                generation_args["do_sample"] = False
            outputs = self._code_model.generate(**inputs, **generation_args)
            return self._code_tokenizer.decode(outputs[0], skip_special_tokens=True)
        except Exception as e:
            logger.error(f"Code completion failed: {e}")
            raise RuntimeError(f"Code generation error: {e}") from e

    async def execute_python_code(self, code: str, timeout: int = 5) -> Dict[str, str]:
        """Run *code* with ``python3`` in a child process and capture its output.

        WARNING: this is NOT a sandbox. The script runs with the privileges
        of this process; never pass untrusted code.

        Returns:
            ``{"stdout", "stderr"}`` on completion, ``{"error": ...}`` when
            the run times out after *timeout* seconds or cannot be started.
        """
        temp_dir = self._tmp_path()
        script_path = os.path.join(temp_dir, "script.py")
        try:
            os.makedirs(temp_dir, exist_ok=True)
            with open(script_path, "w") as f:
                f.write(code)

            proc = await asyncio.create_subprocess_exec(
                "python3", script_path,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            try:
                stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
                return {
                    "stdout": stdout.decode("utf-8", errors="ignore"),
                    "stderr": stderr.decode("utf-8", errors="ignore")
                }
            except asyncio.TimeoutError:
                proc.kill()
                # Reap the child so it does not linger as a zombie and its
                # script directory can be removed safely.
                await proc.wait()
                return {"error": "Execution timed out"}
        except Exception as e:
            logger.error(f"Code execution failed: {e}")
            return {"error": str(e)}
        finally:
            self._cleanup(temp_dir)