Spaces:
Runtime error
Runtime error
File size: 3,531 Bytes
2077444 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# app.py
import time
import whisper
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import FileResponse
from typing import Optional
import os
import psutil
app = FastAPI()
start_time = time.time()
# Load model during startup
@app.on_event("startup")
def load_model():
try:
app.state.model = whisper.load_model("large")
print("Model loaded successfully")
except Exception as e:
print(f"Error loading model: {e}")
raise
def format_time(seconds: float) -> str:
"""Convert seconds to SRT time format"""
milliseconds = int((seconds - int(seconds)) * 1000)
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
seconds = int(seconds % 60)
return f"{hours:02d}:{minutes:02d}:{seconds:02d},{milliseconds:03d}"
def generate_srt(transcript: dict) -> str:
"""Generate SRT content from Whisper transcript"""
srt_content = []
index = 1
for segment in transcript['segments']:
for word in segment.get('words', []):
start = word['start']
end = word['end']
start_time = format_time(start)
end_time = format_time(end)
srt_content.append(
f"{index}\n"
f"{start_time} --> {end_time}\n"
f"{word['word'].strip()}\n\n"
)
index += 1
return "".join(srt_content)
@app.post("/transcribe")
async def transcribe_audio(
file: UploadFile = File(..., description="Audio/video file to transcribe"),
task_token: Optional[str] = None
):
"""Endpoint for submitting transcription tasks"""
try:
# Save uploaded file temporarily
temp_file = f"temp_{file.filename}"
with open(temp_file, "wb") as buffer:
content = await file.read()
buffer.write(content)
# Transcribe audio
result = app.state.model.transcribe(
temp_file,
word_timestamps=True
)
# Generate SRT file
srt_content = generate_srt(result)
srt_file = f"{temp_file}.srt"
with open(srt_file, "w") as f:
f.write(srt_content)
# Clean up temporary files
os.remove(temp_file)
return FileResponse(
srt_file,
media_type='application/x-subrip',
filename=f"{file.filename}.srt"
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
finally:
if os.path.exists(temp_file):
os.remove(temp_file)
if os.path.exists(srt_file):
os.remove(srt_file)
@app.get("/status")
async def get_status():
"""Get server health status"""
process = psutil.Process(os.getpid())
return {
"status": "OK",
"uptime": round(time.time() - start_time, 2),
"memory_usage": f"{process.memory_info().rss / 1024 / 1024:.2f} MB",
"model_loaded": hasattr(app.state, "model"),
"active_requests": len(process.connections())
}
@app.get("/model_status")
async def get_model_status():
"""Get model information"""
if not hasattr(app.state, "model"):
return {"model_status": "Not loaded"}
return {
"model_name": "Whisper large",
"device": app.state.model.device,
"parameters": f"{sum(p.numel() for p in app.state.model.parameters()):,}"
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000) |