Spaces:
Sleeping
Sleeping
# main.py | |
from fastapi import FastAPI,status,Response,Request,Depends,HTTPException,UploadFile, File | |
from fastapi.responses import StreamingResponse,FileResponse | |
from models import load_text_model,generate_text,load_audio_model,generate_audio,load_image_model, generate_image | |
from schemas import VoicePresets | |
from utils import audio_array_to_buffer,img_to_bytes | |
from contextlib import asynccontextmanager | |
from typing import AsyncIterator,Callable,Awaitable,Annotated | |
from uuid import uuid4 | |
import time | |
from datetime import datetime, timezone | |
import csv | |
from dependencies import get_urls_content | |
from schemas import TextModelResponse,TextModelRequest | |
import shutil, uuid | |
from upload import save_file | |
models = {} | |
async def lifespan(_: FastAPI) -> AsyncIterator[None]: | |
# models["text2image"] = load_image_model() | |
# models["text"]=load_text_model() | |
yield | |
models.clear() | |
app = FastAPI(lifespan=lifespan) | |
csv_header = [ | |
"Request ID", "Datetime", "Endpoint Triggered", "Client IP Address", | |
"Response Time", "Status Code", "Successful" | |
] | |
async def monitor_service( | |
req: Request, call_next: Callable[[Request], Awaitable[Response]] | |
) -> Response: | |
request_id = uuid4().hex | |
request_datetime = datetime.now(timezone.utc).isoformat() | |
start_time = time.perf_counter() | |
response: Response = await call_next(req) | |
response_time = round(time.perf_counter() - start_time, 4) | |
response.headers["X-Response-Time"] = str(response_time) | |
response.headers["X-API-Request-ID"] = request_id | |
with open("usage.csv", "a", newline="") as file: | |
writer = csv.writer(file) | |
if file.tell() == 0: | |
writer.writerow(csv_header) | |
writer.writerow( | |
[ | |
request_id, | |
request_datetime, | |
req.url, | |
req.client.host, | |
response_time, | |
response.status_code, | |
response.status_code < 400, | |
] | |
) | |
return response | |
# app = FastAPI() | |
def root_controller(): | |
return {"status": "healthy"} | |
async def serve_language_model_controller(request: Request, | |
body: TextModelRequest , | |
urls_content: str = Depends(get_urls_content)) -> TextModelResponse: | |
prompt = body.prompt + " " + urls_content | |
output = generate_text(models["text"], prompt, body.temperature) | |
return TextModelResponse(content=output, ip=request.client.host) | |
def get_logs(): | |
# return FileResponse("usage.csv", media_type='text/csv', filename="usage.csv") | |
temp_file = f"temp_{uuid.uuid4().hex}.csv" | |
shutil.copyfile("usage.csv", temp_file) | |
# Return file and ensure FastAPI deletes it after sending | |
return FileResponse( | |
temp_file, | |
media_type="text/csv", | |
filename="logs.csv", | |
headers={"Content-Disposition": "attachment; filename=logs.csv"} | |
) | |
def serve_text_to_audio_model_controller( | |
prompt: str, | |
preset: VoicePresets = "v2/en_speaker_1", | |
): | |
processor, model = load_audio_model() | |
output, sample_rate = generate_audio(processor, model, prompt, preset) | |
return StreamingResponse( | |
audio_array_to_buffer(output, sample_rate), media_type="audio/wav" | |
) | |
def serve_text_to_image_model_controller(prompt: str): | |
# pipe = load_image_model() | |
# output = generate_image(pipe, prompt) | |
output = generate_image(models["text2image"], prompt) | |
return Response(content=img_to_bytes(output), media_type="image/png") | |
async def file_upload_controller( | |
file: Annotated[UploadFile, File(description="Uploaded PDF documents")] | |
): | |
if file.content_type != "application/pdf": | |
raise HTTPException( | |
detail=f"Only uploading PDF documents are supported", | |
status_code=status.HTTP_400_BAD_REQUEST, | |
) | |
try: | |
await save_file(file) | |
except Exception as e: | |
raise HTTPException( | |
detail=f"An error occurred while saving file - Error: {e}", | |
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
) | |
return {"filename": file.filename, "message": "File uploaded successfully"} | |