Spaces:
Sleeping
Sleeping
# from fastapi import FastAPI, Response | |
# from fastapi.responses import FileResponse | |
# from kokoro import KPipeline | |
# import soundfile as sf | |
# import os | |
# import numpy as np | |
# import torch | |
# from huggingface_hub import InferenceClient | |
# def llm_chat_response(text): | |
# HF_TOKEN = os.getenv("HF_TOKEN") | |
# client = InferenceClient(api_key=HF_TOKEN) | |
# messages = [ | |
# { | |
# "role": "user", | |
# "content": [ | |
# { | |
# "type": "text", | |
# "text": text + str('describe in one line only') | |
# } #, | |
# # { | |
# # "type": "image_url", | |
# # "image_url": { | |
# # "url": "https://cdn.britannica.com/61/93061-050-99147DCE/Statue-of-Liberty-Island-New-York-Bay.jpg" | |
# # } | |
# # } | |
# ] | |
# } | |
# ] | |
# response_from_llama = client.chat.completions.create( | |
# model="meta-llama/Llama-3.2-11B-Vision-Instruct", | |
# messages=messages, | |
# max_tokens=500) | |
# return response_from_llama.choices[0].message['content'] | |
# app = FastAPI() | |
# # Initialize pipeline once at startup | |
# pipeline = KPipeline(lang_code='a') | |
# @app.post("/generate") | |
# async def generate_audio(text: str, voice: str = "af_heart", speed: float = 1.0): | |
# text_reply = llm_chat_response(text) | |
# # Generate audio | |
# generator = pipeline( | |
# text_reply, | |
# voice=voice, | |
# speed=speed, | |
# split_pattern=r'\n+' | |
# ) | |
# # # Save first segment only for demo | |
# # for i, (gs, ps, audio) in enumerate(generator): | |
# # sf.write(f"output_{i}.wav", audio, 24000) | |
# # return FileResponse( | |
# # f"output_{i}.wav", | |
# # media_type="audio/wav", | |
# # filename="output.wav" | |
# # ) | |
# # return Response("No audio generated", status_code=400) | |
# # Process only the first segment for demo | |
# for i, (gs, ps, audio) in enumerate(generator): | |
# # Convert PyTorch tensor to NumPy array | |
# audio_numpy = audio.cpu().numpy() | |
# # Convert to 16-bit PCM | |
# # Ensure the audio is in the range [-1, 1] | |
# audio_numpy = np.clip(audio_numpy, -1, 1) | |
# # Convert to 16-bit signed integers | |
# pcm_data = (audio_numpy * 32767).astype(np.int16) | |
# # Convert to bytes (automatically uses row-major order) | |
# raw_audio = pcm_data.tobytes() | |
# # Return PCM data with minimal necessary headers | |
# return Response( | |
# content=raw_audio, | |
# media_type="application/octet-stream", | |
# headers={ | |
# "Content-Disposition": f'attachment; filename="output.pcm"', | |
# "X-Sample-Rate": "24000", | |
# "X-Bits-Per-Sample": "16", | |
# "X-Endianness": "little" | |
# } | |
# ) | |
# return Response("No audio generated", status_code=400) | |
from fastapi import FastAPI, Response, HTTPException, Request | |
from fastapi.responses import JSONResponse | |
from fastapi.staticfiles import StaticFiles | |
from kokoro import KPipeline | |
import os | |
import numpy as np | |
import torch | |
from huggingface_hub import InferenceClient | |
from pydantic import BaseModel | |
import base64 | |
import logging | |
from typing import Optional, ClassVar, List | |
import uuid | |
# Set up logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
class TextImageRequest(BaseModel): | |
text: Optional[str] = None | |
image_base64: Optional[str] = None | |
voice: str = "af_heart" # Default voice that we know exists | |
speed: float = 1.0 | |
# Annotate as a ClassVar so Pydantic ignores it as a field. | |
AVAILABLE_VOICES: ClassVar[List[str]] = ["af_heart"] | |
def validate_voice(self): | |
if self.voice not in self.AVAILABLE_VOICES: | |
return "af_heart" | |
return self.voice | |
class AudioResponse(BaseModel): | |
status: str | |
message: str | |
class ErrorResponse(BaseModel): | |
error: str | |
detail: Optional[str] = None | |
# Initialize FastAPI app | |
app = FastAPI( | |
title="Text-to-Speech API with Vision Support", | |
description="API for generating speech from text with optional image analysis", | |
version="1.0.0" | |
) | |
# Create and mount static images directory so images are accessible via URL | |
STATIC_DIR = "static_images" | |
if not os.path.exists(STATIC_DIR): | |
os.makedirs(STATIC_DIR) | |
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") | |
def llm_chat_response(text, image_base64=None): | |
"""Get responses from LLM with text and optionally an image input.""" | |
try: | |
HF_TOKEN = os.getenv("HF_TOKEN") | |
logger.info("Checking HF_TOKEN...") | |
if not HF_TOKEN: | |
logger.error("HF_TOKEN not found in environment variables") | |
raise HTTPException(status_code=500, detail="HF_TOKEN not configured") | |
logger.info("Initializing InferenceClient...") | |
client = InferenceClient( | |
provider="hf-inference", # Using the correct provider as per sample | |
api_key=HF_TOKEN | |
) | |
if image_base64: | |
logger.info("Processing request with image") | |
# Save the base64 image to the static folder | |
filename = f"{uuid.uuid4()}.jpg" | |
image_path = os.path.join(STATIC_DIR, filename) | |
try: | |
image_data = base64.b64decode(image_base64) | |
except Exception as e: | |
logger.error(f"Error decoding base64 image: {str(e)}") | |
raise HTTPException(status_code=400, detail="Invalid base64 image data") | |
with open(image_path, "wb") as f: | |
f.write(image_data) | |
# Construct image URL (assumes BASE_URL environment variable or defaults to localhost) | |
base_url = os.getenv("BASE_URL", "http://localhost:8000") | |
image_url = f"{base_url}/static/{filename}" | |
prompt = text if text else "Describe this image in one sentence." | |
# Construct message exactly as in the reference | |
messages = [ | |
{ | |
"role": "user", | |
"content": [ | |
{"type": "text", "text": prompt}, | |
{"type": "image_url", "image_url": {"url": image_url}} | |
] | |
} | |
] | |
else: | |
logger.info("Processing text-only request") | |
messages = [ | |
{ | |
"role": "user", | |
"content": text + " Describe in one line only." | |
} | |
] | |
logger.info("Sending request to model...") | |
logger.info(f"Message structure: {messages}") | |
completion = client.chat.completions.create( | |
model="meta-llama/Llama-3.2-11B-Vision-Instruct", | |
messages=messages, | |
max_tokens=500 | |
) | |
logger.info("Received response from model") | |
logger.info(f"Model response received: {completion}") | |
try: | |
response = completion.choices[0].message.content | |
logger.info(f"Extracted response content: {response}") | |
return response | |
except Exception as e: | |
logger.error(f"Error extracting message content: {str(e)}") | |
try: | |
if hasattr(completion.choices[0], "message") and hasattr(completion.choices[0].message, "content"): | |
return completion.choices[0].message.content | |
return completion.choices[0]["message"]["content"] | |
except Exception as e2: | |
logger.error(f"All extraction methods failed: {str(e2)}") | |
return "I couldn't process that input. Please try again with a different query." | |
except Exception as e: | |
logger.error(f"Error in llm_chat_response: {str(e)}") | |
raise HTTPException(status_code=500, detail=str(e)) | |
# Initialize the audio generation pipeline once at startup | |
try: | |
logger.info("Initializing KPipeline...") | |
pipeline = KPipeline(lang_code='a') | |
logger.info("KPipeline initialized successfully") | |
except Exception as e: | |
logger.error(f"Failed to initialize KPipeline: {str(e)}") | |
# The app starts regardless but logs the error | |
async def generate_audio(request: TextImageRequest): | |
""" | |
Generate audio from text and optionally analyze an image. | |
- If text is provided, it is used as input. | |
- If an image is provided (base64), it is saved and a URL is generated for processing. | |
- The LLM response is then converted to speech. | |
""" | |
try: | |
logger.info("Received audio generation request") | |
user_text = request.text if request.text is not None else "" | |
if not user_text and request.image_base64: | |
user_text = "Describe what you see in the image" | |
elif not user_text and not request.image_base64: | |
logger.error("Neither text nor image provided in request") | |
return JSONResponse( | |
status_code=400, | |
content={"error": "Request must include either text or image_base64"} | |
) | |
logger.info("Getting LLM response...") | |
text_reply = llm_chat_response(user_text, request.image_base64) | |
logger.info(f"LLM response: {text_reply}") | |
validated_voice = request.validate_voice() | |
if validated_voice != request.voice: | |
logger.warning(f"Requested voice '{request.voice}' not available, using '{validated_voice}' instead") | |
logger.info(f"Generating audio using voice={validated_voice}, speed={request.speed}") | |
try: | |
generator = pipeline( | |
text_reply, | |
voice=validated_voice, | |
speed=request.speed, | |
split_pattern=r'\n+' | |
) | |
for i, (gs, ps, audio) in enumerate(generator): | |
logger.info(f"Audio generated successfully: segment {i}") | |
# Convert PyTorch tensor to NumPy array | |
audio_numpy = audio.cpu().numpy() | |
# Clip values to range [-1, 1] and convert to 16-bit PCM | |
audio_numpy = np.clip(audio_numpy, -1, 1) | |
pcm_data = (audio_numpy * 32767).astype(np.int16) | |
raw_audio = pcm_data.tobytes() | |
return Response( | |
content=raw_audio, | |
media_type="application/octet-stream", | |
headers={ | |
"Content-Disposition": 'attachment; filename="output.pcm"', | |
"X-Sample-Rate": "24000", | |
"X-Bits-Per-Sample": "16", | |
"X-Endianness": "little" | |
} | |
) | |
logger.error("No audio segments generated") | |
return JSONResponse( | |
status_code=400, | |
content={"error": "No audio generated", "detail": "The pipeline did not produce any audio"} | |
) | |
except Exception as e: | |
logger.error(f"Error generating audio: {str(e)}") | |
return JSONResponse( | |
status_code=500, | |
content={"error": "Audio generation failed", "detail": str(e)} | |
) | |
except Exception as e: | |
logger.error(f"Unexpected error in generate_audio endpoint: {str(e)}") | |
return JSONResponse( | |
status_code=500, | |
content={"error": "Internal server error", "detail": str(e)} | |
) | |
async def root(): | |
return {"message": "Welcome to the Text-to-Speech API with Vision Support. Use POST /generate with 'text' and optionally 'image_base64' for queries."} | |
async def not_found_handler(request: Request, exc): | |
return JSONResponse( | |
status_code=404, | |
content={"error": "Endpoint not found. Please use POST /generate for queries."} | |
) | |
async def method_not_allowed_handler(request: Request, exc): | |
return JSONResponse( | |
status_code=405, | |
content={"error": "Method not allowed. Please check the API documentation."} | |
) | |