# app.py import time import whisper from fastapi import FastAPI, UploadFile, File, HTTPException from fastapi.responses import FileResponse from typing import Optional import os import psutil app = FastAPI() start_time = time.time() # Load model during startup @app.on_event("startup") def load_model(): try: app.state.model = whisper.load_model("large") print("Model loaded successfully") except Exception as e: print(f"Error loading model: {e}") raise def format_time(seconds: float) -> str: """Convert seconds to SRT time format""" milliseconds = int((seconds - int(seconds)) * 1000) hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) seconds = int(seconds % 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d},{milliseconds:03d}" def generate_srt(transcript: dict) -> str: """Generate SRT content from Whisper transcript""" srt_content = [] index = 1 for segment in transcript['segments']: for word in segment.get('words', []): start = word['start'] end = word['end'] start_time = format_time(start) end_time = format_time(end) srt_content.append( f"{index}\n" f"{start_time} --> {end_time}\n" f"{word['word'].strip()}\n\n" ) index += 1 return "".join(srt_content) @app.post("/transcribe") async def transcribe_audio( file: UploadFile = File(..., description="Audio/video file to transcribe"), task_token: Optional[str] = None ): """Endpoint for submitting transcription tasks""" try: # Save uploaded file temporarily temp_file = f"temp_{file.filename}" with open(temp_file, "wb") as buffer: content = await file.read() buffer.write(content) # Transcribe audio result = app.state.model.transcribe( temp_file, word_timestamps=True ) # Generate SRT file srt_content = generate_srt(result) srt_file = f"{temp_file}.srt" with open(srt_file, "w") as f: f.write(srt_content) # Clean up temporary files os.remove(temp_file) return FileResponse( srt_file, media_type='application/x-subrip', filename=f"{file.filename}.srt" ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) finally: if os.path.exists(temp_file): os.remove(temp_file) if os.path.exists(srt_file): os.remove(srt_file) @app.get("/status") async def get_status(): """Get server health status""" process = psutil.Process(os.getpid()) return { "status": "OK", "uptime": round(time.time() - start_time, 2), "memory_usage": f"{process.memory_info().rss / 1024 / 1024:.2f} MB", "model_loaded": hasattr(app.state, "model"), "active_requests": len(process.connections()) } @app.get("/model_status") async def get_model_status(): """Get model information""" if not hasattr(app.state, "model"): return {"model_status": "Not loaded"} return { "model_name": "Whisper large", "device": app.state.model.device, "parameters": f"{sum(p.numel() for p in app.state.model.parameters()):,}" } if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)