TTS_API_Image / app.py
khurrameycon's picture
updated with nebius and mistral model
3f982d9 verified
# from fastapi import FastAPI, Response
# from fastapi.responses import FileResponse
# from kokoro import KPipeline
# import soundfile as sf
# import os
# import numpy as np
# import torch
# from huggingface_hub import InferenceClient
# def llm_chat_response(text):
# HF_TOKEN = os.getenv("HF_TOKEN")
# client = InferenceClient(api_key=HF_TOKEN)
# messages = [
# {
# "role": "user",
# "content": [
# {
# "type": "text",
# "text": text + str('describe in one line only')
# } #,
# # {
# # "type": "image_url",
# # "image_url": {
# # "url": "https://cdn.britannica.com/61/93061-050-99147DCE/Statue-of-Liberty-Island-New-York-Bay.jpg"
# # }
# # }
# ]
# }
# ]
# response_from_llama = client.chat.completions.create(
# model="meta-llama/Llama-3.2-11B-Vision-Instruct",
# messages=messages,
# max_tokens=500)
# return response_from_llama.choices[0].message['content']
# app = FastAPI()
# # Initialize pipeline once at startup
# pipeline = KPipeline(lang_code='a')
# @app.post("/generate")
# async def generate_audio(text: str, voice: str = "af_heart", speed: float = 1.0):
# text_reply = llm_chat_response(text)
# # Generate audio
# generator = pipeline(
# text_reply,
# voice=voice,
# speed=speed,
# split_pattern=r'\n+'
# )
# # # Save first segment only for demo
# # for i, (gs, ps, audio) in enumerate(generator):
# # sf.write(f"output_{i}.wav", audio, 24000)
# # return FileResponse(
# # f"output_{i}.wav",
# # media_type="audio/wav",
# # filename="output.wav"
# # )
# # return Response("No audio generated", status_code=400)
# # Process only the first segment for demo
# for i, (gs, ps, audio) in enumerate(generator):
# # Convert PyTorch tensor to NumPy array
# audio_numpy = audio.cpu().numpy()
# # Convert to 16-bit PCM
# # Ensure the audio is in the range [-1, 1]
# audio_numpy = np.clip(audio_numpy, -1, 1)
# # Convert to 16-bit signed integers
# pcm_data = (audio_numpy * 32767).astype(np.int16)
# # Convert to bytes (automatically uses row-major order)
# raw_audio = pcm_data.tobytes()
# # Return PCM data with minimal necessary headers
# return Response(
# content=raw_audio,
# media_type="application/octet-stream",
# headers={
# "Content-Disposition": f'attachment; filename="output.pcm"',
# "X-Sample-Rate": "24000",
# "X-Bits-Per-Sample": "16",
# "X-Endianness": "little"
# }
# )
# return Response("No audio generated", status_code=400)
from fastapi import FastAPI, Response, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from kokoro import KPipeline
import soundfile as sf
import os
import numpy as np
import torch
from huggingface_hub import InferenceClient
from pydantic import BaseModel
import base64
from io import BytesIO
from PIL import Image
import logging
from typing import Optional
import uuid
import pathlib
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Create a directory for temporary image storage
TEMP_DIR = pathlib.Path("./temp_images")
TEMP_DIR.mkdir(exist_ok=True)
class TextImageRequest(BaseModel):
text: Optional[str] = None
image_base64: Optional[str] = None
voice: str = "af_heart"
speed: float = 1.0
class AudioResponse(BaseModel):
status: str
message: str
# Initialize FastAPI app
app = FastAPI(
title="Text-to-Speech API with Vision Support",
description="API for generating speech from text with optional image analysis",
version="1.0.0"
)
def save_base64_image(image_base64):
"""Save base64 image to a temporary file and return the file path"""
try:
# Generate a unique filename
filename = f"{uuid.uuid4()}.jpg"
filepath = TEMP_DIR / filename
# Decode and save the image
image_data = base64.b64decode(image_base64)
with open(filepath, "wb") as f:
f.write(image_data)
# Return the file URL (using file:// protocol)
return f"file://{filepath.absolute()}"
except Exception as e:
logger.error(f"Error saving base64 image: {str(e)}")
raise HTTPException(status_code=400, detail=f"Invalid base64 image data: {str(e)}")
def llm_chat_response(text, image_base64=None):
"""Function to get responses from LLM with text and optionally image input."""
try:
HF_TOKEN = os.getenv("HF_TOKEN")
logger.info("Checking HF_TOKEN...")
if not HF_TOKEN:
logger.error("HF_TOKEN not found in environment variables")
raise HTTPException(status_code=500, detail="HF_TOKEN not configured")
logger.info("Initializing InferenceClient...")
client = InferenceClient(
provider="nebius", # Using novita as in your working example
api_key=HF_TOKEN
)
# Build the messages payload using the format from your working example
message_content = [{
"type": "text",
"text": text + ("" if image_base64 else " describe in one line only")
}]
if image_base64:
logger.info("Processing base64 image...")
# Save the base64 image to a file and get the file URL
image_url = save_base64_image(image_base64)
logger.info(f"Image saved at: {image_url}")
# Create data URI
data_uri = f"data:image/jpeg;base64,{image_base64}"
# Add image to message content
message_content.append({
"type": "image_url",
"image_url": {"url": data_uri}
})
# Construct the messages array exactly as in your working example
messages = [{
"role": "user",
"content": message_content
}]
logger.info("Sending request to model...")
try:
completion = client.chat.completions.create(
model="mistralai/Mistral-Small-3.1-24B-Instruct-2503",
messages=messages,
max_tokens=500
)
except Exception as http_err:
# Log HTTP errors from the request
logger.error(f"HTTP error occurred: {str(http_err)}")
raise HTTPException(status_code=500, detail=str(http_err))
logger.info(f"Raw model response received")
# Extract the response using the same method as your working code
if not completion.choices or len(completion.choices) == 0:
logger.error("No choices returned from model.")
raise HTTPException(status_code=500, detail="Model returned no choices.")
# Extract the response message from the first choice
choice = completion.choices[0]
response_message = None
if hasattr(choice, "message"):
response_message = choice.message
elif isinstance(choice, dict):
response_message = choice.get("message")
if not response_message:
logger.error(f"Response message is empty: {choice}")
raise HTTPException(status_code=500, detail="Model response did not include a message.")
content = None
if isinstance(response_message, dict):
content = response_message.get("content")
if content is None and hasattr(response_message, "content"):
content = response_message.content
if not content:
logger.error(f"Message content is missing: {response_message}")
raise HTTPException(status_code=500, detail="Model message did not include content.")
return content
except Exception as e:
logger.error(f"Error in llm_chat_response: {str(e)}")
# Fallback response in case of error
return "I couldn't process that input. Please try again with a different image or text query."
# Initialize pipeline once at startup
try:
logger.info("Initializing KPipeline...")
pipeline = KPipeline(lang_code='a')
logger.info("KPipeline initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize KPipeline: {str(e)}")
# We'll let the app start anyway, but log the error
@app.post("/generate")
async def generate_audio(request: TextImageRequest):
"""
Generate audio from text and optionally analyze an image.
- If text is provided, uses that as input
- If image is provided, analyzes the image
- Converts the LLM response to speech using the specified voice and speed
"""
try:
logger.info(f"Received audio generation request")
# If no text is provided but image is provided, use default prompt
user_text = request.text if request.text is not None else ""
if not user_text and request.image_base64:
user_text = "Describe what you see in the image"
elif not user_text and not request.image_base64:
logger.error("Neither text nor image provided in request")
return JSONResponse(
status_code=400,
content={"error": "Request must include either text or image_base64"}
)
# Generate response using text and image if provided
logger.info("Getting LLM response...")
text_reply = llm_chat_response(user_text, request.image_base64)
logger.info(f"LLM response: {text_reply}")
# Generate audio
logger.info(f"Generating audio using voice={request.voice}, speed={request.speed}")
try:
generator = pipeline(
text_reply,
voice=request.voice,
speed=request.speed,
split_pattern=r'\n+'
)
# Process only the first segment for demo
for i, (gs, ps, audio) in enumerate(generator):
logger.info(f"Audio generated successfully: segment {i}")
# Convert PyTorch tensor to NumPy array
audio_numpy = audio.cpu().numpy()
# Convert to 16-bit PCM
# Ensure the audio is in the range [-1, 1]
audio_numpy = np.clip(audio_numpy, -1, 1)
# Convert to 16-bit signed integers
pcm_data = (audio_numpy * 32767).astype(np.int16)
# Convert to bytes (automatically uses row-major order)
raw_audio = pcm_data.tobytes()
# Return PCM data with minimal necessary headers
return Response(
content=raw_audio,
media_type="application/octet-stream",
headers={
"Content-Disposition": f'attachment; filename="output.pcm"',
"X-Sample-Rate": "24000",
"X-Bits-Per-Sample": "16",
"X-Endianness": "little"
}
)
logger.error("No audio segments generated")
return JSONResponse(
status_code=400,
content={"error": "No audio generated", "detail": "The pipeline did not produce any audio"}
)
except Exception as e:
logger.error(f"Error generating audio: {str(e)}")
return JSONResponse(
status_code=500,
content={"error": "Audio generation failed", "detail": str(e)}
)
except Exception as e:
logger.error(f"Unexpected error in generate_audio endpoint: {str(e)}")
return JSONResponse(
status_code=500,
content={"error": "Internal server error", "detail": str(e)}
)
@app.get("/")
async def root():
return {"message": "Welcome to the Text-to-Speech API with Vision Support. Use POST /generate endpoint with 'text' and optionally 'image_base64' for queries."}
# Cleanup function to periodically remove old temporary images
@app.on_event("startup")
async def startup_event():
# You could add scheduled tasks here to clean up old images
pass
@app.exception_handler(404)
async def not_found_handler(request, exc):
return JSONResponse(
status_code=404,
content={"error": "Endpoint not found. Please use POST /generate for queries."}
)
@app.exception_handler(405)
async def method_not_allowed_handler(request, exc):
return JSONResponse(
status_code=405,
content={"error": "Method not allowed. Please check the API documentation."}
)