Spaces:
Sleeping
Sleeping
# from fastapi import FastAPI, Response | |
# from fastapi.responses import FileResponse | |
# from kokoro import KPipeline | |
# import soundfile as sf | |
# import os | |
# import numpy as np | |
# import torch | |
# from huggingface_hub import InferenceClient | |
# def llm_chat_response(text): | |
# HF_TOKEN = os.getenv("HF_TOKEN") | |
# client = InferenceClient(api_key=HF_TOKEN) | |
# messages = [ | |
# { | |
# "role": "user", | |
# "content": [ | |
# { | |
# "type": "text", | |
# "text": text + str('describe in one line only') | |
# } #, | |
# # { | |
# # "type": "image_url", | |
# # "image_url": { | |
# # "url": "https://cdn.britannica.com/61/93061-050-99147DCE/Statue-of-Liberty-Island-New-York-Bay.jpg" | |
# # } | |
# # } | |
# ] | |
# } | |
# ] | |
# response_from_llama = client.chat.completions.create( | |
# model="meta-llama/Llama-3.2-11B-Vision-Instruct", | |
# messages=messages, | |
# max_tokens=500) | |
# return response_from_llama.choices[0].message['content'] | |
# app = FastAPI() | |
# # Initialize pipeline once at startup | |
# pipeline = KPipeline(lang_code='a') | |
# @app.post("/generate") | |
# async def generate_audio(text: str, voice: str = "af_heart", speed: float = 1.0): | |
# text_reply = llm_chat_response(text) | |
# # Generate audio | |
# generator = pipeline( | |
# text_reply, | |
# voice=voice, | |
# speed=speed, | |
# split_pattern=r'\n+' | |
# ) | |
# # # Save first segment only for demo | |
# # for i, (gs, ps, audio) in enumerate(generator): | |
# # sf.write(f"output_{i}.wav", audio, 24000) | |
# # return FileResponse( | |
# # f"output_{i}.wav", | |
# # media_type="audio/wav", | |
# # filename="output.wav" | |
# # ) | |
# # return Response("No audio generated", status_code=400) | |
# # Process only the first segment for demo | |
# for i, (gs, ps, audio) in enumerate(generator): | |
# # Convert PyTorch tensor to NumPy array | |
# audio_numpy = audio.cpu().numpy() | |
# # Convert to 16-bit PCM | |
# # Ensure the audio is in the range [-1, 1] | |
# audio_numpy = np.clip(audio_numpy, -1, 1) | |
# # Convert to 16-bit signed integers | |
# pcm_data = (audio_numpy * 32767).astype(np.int16) | |
# # Convert to bytes (automatically uses row-major order) | |
# raw_audio = pcm_data.tobytes() | |
# # Return PCM data with minimal necessary headers | |
# return Response( | |
# content=raw_audio, | |
# media_type="application/octet-stream", | |
# headers={ | |
# "Content-Disposition": f'attachment; filename="output.pcm"', | |
# "X-Sample-Rate": "24000", | |
# "X-Bits-Per-Sample": "16", | |
# "X-Endianness": "little" | |
# } | |
# ) | |
# return Response("No audio generated", status_code=400) | |
from fastapi import FastAPI, Response, HTTPException | |
from fastapi.responses import FileResponse, JSONResponse | |
from kokoro import KPipeline | |
import soundfile as sf | |
import os | |
import numpy as np | |
import torch | |
from huggingface_hub import InferenceClient | |
from pydantic import BaseModel | |
import base64 | |
from io import BytesIO | |
from PIL import Image | |
import logging | |
from typing import Optional | |
import uuid | |
import pathlib | |
# Configure the root logger at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Directory (relative to the working directory) that receives decoded request
# images; it is created at import time if it does not already exist.
TEMP_DIR = pathlib.Path("./temp_images")
TEMP_DIR.mkdir(parents=False, exist_ok=True)
class TextImageRequest(BaseModel):
    """Request payload carrying a text prompt and/or a base64 image.

    Both ``text`` and ``image_base64`` are optional at the schema level;
    ``voice`` and ``speed`` fall back to the defaults declared here.
    """

    # Prompt text; may be omitted.
    text: Optional[str] = None
    # Base64-encoded image payload; may be omitted.
    image_base64: Optional[str] = None
    # Voice identifier forwarded to the speech pipeline.
    voice: str = "af_heart"
    # Speed value forwarded to the speech pipeline.
    speed: float = 1.0
class AudioResponse(BaseModel):
    """Simple status/message response model.

    NOTE(review): nothing in the visible part of this file references this
    model — confirm whether it is still needed.
    """

    # Short status indicator.
    status: str
    # Human-readable message accompanying the status.
    message: str
# Application object; title, description and version are the metadata
# passed to FastAPI for this service.
app = FastAPI(
    title="Text-to-Speech API with Vision Support",
    description="API for generating speech from text with optional image analysis",
    version="1.0.0",
)
def save_base64_image(image_base64):
    """Decode a base64 image and write it to a uniquely named file in TEMP_DIR.

    Args:
        image_base64: Base64-encoded image data.

    Returns:
        A ``file://`` URL string built from the absolute path of the new file.

    Raises:
        HTTPException: status 400 when the payload cannot be base64-decoded;
            status 500 when the decoded bytes cannot be written to disk.
    """
    # Decode first so a malformed payload is reported as a client error and
    # never touches the filesystem. binascii.Error is a ValueError subclass.
    try:
        image_data = base64.b64decode(image_base64)
    except (ValueError, TypeError) as e:
        logger.error(f"Error saving base64 image: {str(e)}")
        raise HTTPException(
            status_code=400,
            detail=f"Invalid base64 image data: {str(e)}",
        ) from e

    # A random UUID keeps concurrent requests from overwriting each other.
    # The ".jpg" suffix is applied regardless of the actual image format.
    filepath = TEMP_DIR / f"{uuid.uuid4()}.jpg"

    # A failed write is a server-side problem, not a bad request.
    try:
        filepath.write_bytes(image_data)
    except OSError as e:
        logger.error(f"Error saving base64 image: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Could not store image: {str(e)}",
        ) from e

    # Return the file URL (using file:// protocol)
    return f"file://{filepath.absolute()}"
def _extract_message_content(completion):
    """Return the content of the first choice's message in *completion*.

    Accepts choices/messages exposed either as attribute-style objects or as
    plain dicts. Raises HTTPException (500) when there are no choices, no
    message, or no content.
    """
    if not completion.choices:
        logger.error("No choices returned from model.")
        raise HTTPException(status_code=500, detail="Model returned no choices.")

    choice = completion.choices[0]
    response_message = None
    if hasattr(choice, "message"):
        response_message = choice.message
    elif isinstance(choice, dict):
        response_message = choice.get("message")
    if not response_message:
        logger.error(f"Response message is empty: {choice}")
        raise HTTPException(status_code=500, detail="Model response did not include a message.")

    content = None
    if isinstance(response_message, dict):
        content = response_message.get("content")
    if content is None and hasattr(response_message, "content"):
        content = response_message.content
    if not content:
        logger.error(f"Message content is missing: {response_message}")
        raise HTTPException(status_code=500, detail="Model message did not include content.")
    return content


def llm_chat_response(text, image_base64=None):
    """Get a reply from the LLM for a text prompt and an optional image.

    Args:
        text: Prompt text. When no image is supplied,
            " describe in one line only" is appended to it.
        image_base64: Optional base64-encoded image; when given it is sent to
            the model as a ``data:image/jpeg;base64,...`` URI.

    Returns:
        The model's message content, or a fixed fallback sentence if anything
        fails (missing HF_TOKEN, bad image, request error, malformed reply).
        This function is deliberately best-effort: every error is logged and
        converted into the fallback text rather than propagated.
    """
    try:
        hf_token = os.getenv("HF_TOKEN")
        logger.info("Checking HF_TOKEN...")
        if not hf_token:
            logger.error("HF_TOKEN not found in environment variables")
            raise HTTPException(status_code=500, detail="HF_TOKEN not configured")

        logger.info("Initializing InferenceClient...")
        client = InferenceClient(
            provider="nebius",  # inference provider used for this model
            api_key=hf_token,
        )

        # The one-line instruction is only added for text-only prompts.
        message_content = [{
            "type": "text",
            "text": text + ("" if image_base64 else " describe in one line only"),
        }]

        if image_base64:
            logger.info("Processing base64 image...")
            # Writing the image out also validates the base64 payload.
            # NOTE(review): the saved file is only logged, never read back or
            # removed in this function — confirm whether it is still needed.
            image_url = save_base64_image(image_base64)
            logger.info(f"Image saved at: {image_url}")
            # The model receives the image inline as a data URI.
            data_uri = f"data:image/jpeg;base64,{image_base64}"
            message_content.append({
                "type": "image_url",
                "image_url": {"url": data_uri},
            })

        messages = [{
            "role": "user",
            "content": message_content,
        }]

        logger.info("Sending request to model...")
        try:
            completion = client.chat.completions.create(
                model="mistralai/Mistral-Small-3.1-24B-Instruct-2503",
                messages=messages,
                max_tokens=500,
            )
        except Exception as http_err:
            # Log errors raised by the request itself before re-wrapping them.
            logger.error(f"HTTP error occurred: {str(http_err)}")
            raise HTTPException(status_code=500, detail=str(http_err)) from http_err

        logger.info("Raw model response received")
        return _extract_message_content(completion)
    except Exception as e:
        # Best-effort boundary: keep the traceback in the logs and hand the
        # caller a sentence it can still use.
        logger.exception("Error in llm_chat_response: %s", e)
        return "I couldn't process that input. Please try again with a different image or text query."
# Initialize the speech pipeline once at import time so requests can reuse it.
# `pipeline` is bound to None up front: if construction fails the app still
# starts, and the name exists (as None) instead of being undefined.
pipeline = None
try:
    logger.info("Initializing KPipeline...")
    pipeline = KPipeline(lang_code='a')
    logger.info("KPipeline initialized successfully")
except Exception as e:
    # Let the app start anyway, but record the failure with its traceback.
    logger.exception("Failed to initialize KPipeline: %s", e)
@app.post("/generate")
async def generate_audio(request: TextImageRequest):
    """
    Generate audio from text and optionally analyze an image.

    - If text is provided, uses that as input
    - If only an image is provided, a default "describe the image" prompt is used
    - Converts the LLM response to speech using the specified voice and speed

    Returns raw 16-bit little-endian PCM for the first generated segment
    (sample rate and format are advertised in the X-* response headers), or a
    JSON error body with status 400 (no input / no audio) or 500 (failure).
    """
    try:
        logger.info("Received audio generation request")

        # Fall back to a default prompt for image-only requests; reject
        # requests that carry neither text nor an image.
        user_text = request.text if request.text is not None else ""
        if not user_text:
            if not request.image_base64:
                logger.error("Neither text nor image provided in request")
                return JSONResponse(
                    status_code=400,
                    content={"error": "Request must include either text or image_base64"},
                )
            user_text = "Describe what you see in the image"

        # NOTE(review): llm_chat_response and the pipeline call below are
        # synchronous and run inside this coroutine — confirm that is acceptable.
        logger.info("Getting LLM response...")
        text_reply = llm_chat_response(user_text, request.image_base64)
        logger.info(f"LLM response: {text_reply}")

        logger.info(f"Generating audio using voice={request.voice}, speed={request.speed}")
        try:
            generator = pipeline(
                text_reply,
                voice=request.voice,
                speed=request.speed,
                split_pattern=r'\n+',
            )

            # Only the first segment is returned (demo behavior).
            first_segment = next(iter(generator), None)
            if first_segment is None:
                logger.error("No audio segments generated")
                return JSONResponse(
                    status_code=400,
                    content={"error": "No audio generated", "detail": "The pipeline did not produce any audio"},
                )

            _, _, audio = first_segment
            logger.info("Audio generated successfully: segment 0")

            # Tensor -> NumPy, clamp to [-1, 1], then scale to 16-bit PCM.
            audio_numpy = np.clip(audio.cpu().numpy(), -1, 1)
            # "<i2" forces little-endian int16 so the bytes always match the
            # X-Endianness header, whatever the host byte order is.
            pcm_data = (audio_numpy * 32767).astype("<i2")
            raw_audio = pcm_data.tobytes()

            # Return PCM data with minimal necessary headers
            return Response(
                content=raw_audio,
                media_type="application/octet-stream",
                headers={
                    "Content-Disposition": 'attachment; filename="output.pcm"',
                    "X-Sample-Rate": "24000",
                    "X-Bits-Per-Sample": "16",
                    "X-Endianness": "little",
                },
            )
        except Exception as e:
            logger.exception("Error generating audio: %s", e)
            return JSONResponse(
                status_code=500,
                content={"error": "Audio generation failed", "detail": str(e)},
            )
    except Exception as e:
        logger.exception("Unexpected error in generate_audio endpoint: %s", e)
        return JSONResponse(
            status_code=500,
            content={"error": "Internal server error", "detail": str(e)},
        )
@app.get("/")
async def root():
    """Return a welcome message that points clients at POST /generate."""
    return {"message": "Welcome to the Text-to-Speech API with Vision Support. Use POST /generate endpoint with 'text' and optionally 'image_base64' for queries."}
# Placeholder hook reserved for scheduling cleanup of old temporary images.
async def startup_event():
    """Do nothing for now; scheduled cleanup tasks could be added here."""
    return None
@app.exception_handler(404)
async def not_found_handler(request, exc):
    """Return a JSON 404 body that directs clients to POST /generate.

    Args:
        request: The incoming request (unused).
        exc: The exception that triggered the handler (unused).
    """
    return JSONResponse(
        status_code=404,
        content={"error": "Endpoint not found. Please use POST /generate for queries."},
    )
@app.exception_handler(405)
async def method_not_allowed_handler(request, exc):
    """Return a JSON 405 body telling clients to check the API documentation.

    Args:
        request: The incoming request (unused).
        exc: The exception that triggered the handler (unused).
    """
    return JSONResponse(
        status_code=405,
        content={"error": "Method not allowed. Please check the API documentation."},
    )