from fastapi import FastAPI, Response, HTTPException
from fastapi.responses import JSONResponse
from kokoro import KPipeline
import os
import numpy as np
from huggingface_hub import InferenceClient
from pydantic import BaseModel
import base64
import logging
from typing import Optional
import uuid
import pathlib

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create a directory for temporary image storage
TEMP_DIR = pathlib.Path("./temp_images")
TEMP_DIR.mkdir(exist_ok=True)


class TextImageRequest(BaseModel):
    text: Optional[str] = None
    image_base64: Optional[str] = None
    voice: str = "af_heart"
    speed: float = 1.0


class AudioResponse(BaseModel):
    status: str
    message: str


# Initialize FastAPI app
app = FastAPI(
    title="Text-to-Speech API with Vision Support",
    description="API for generating speech from text with optional image analysis",
    version="1.0.0"
)


def save_base64_image(image_base64):
    """Save a base64-encoded image to a temporary file and return its file:// URL."""
    try:
        # Generate a unique filename
        filename = f"{uuid.uuid4()}.jpg"
        filepath = TEMP_DIR / filename

        # Decode and save the image
        image_data = base64.b64decode(image_base64)
        with open(filepath, "wb") as f:
            f.write(image_data)

        # Return the file URL (using the file:// protocol)
        return f"file://{filepath.absolute()}"
    except Exception as e:
        logger.error(f"Error saving base64 image: {str(e)}")
        raise HTTPException(status_code=400, detail=f"Invalid base64 image data: {str(e)}")


def llm_chat_response(text, image_base64=None):
    """Get a response from the LLM for text and, optionally, an image."""
    try:
        HF_TOKEN = os.getenv("HF_TOKEN")
        logger.info("Checking HF_TOKEN...")
        if not HF_TOKEN:
            logger.error("HF_TOKEN not found in environment variables")
            raise HTTPException(status_code=500, detail="HF_TOKEN not configured")

        logger.info("Initializing InferenceClient...")
        client = InferenceClient(
            provider="cerebras",  # inference provider used for the hosted model
            api_key=HF_TOKEN
        )

        # Build the multimodal message content for the chat completions API
        message_content = [{
            "type": "text",
            "text": text + ("" if image_base64 else " describe in one line only")
        }]

        if image_base64:
            logger.info("Processing base64 image...")
            # Save the base64 image to a temporary file; the saved copy is only logged,
            # the data URI below is what actually gets sent to the model
            image_url = save_base64_image(image_base64)
            logger.info(f"Image saved at: {image_url}")

            # Create a data URI and add the image to the message content
            data_uri = f"data:image/jpeg;base64,{image_base64}"
            message_content.append({
                "type": "image_url",
                "image_url": {"url": data_uri}
            })

        messages = [{
            "role": "user",
            "content": message_content
        }]

        logger.info("Sending request to model...")
        try:
            completion = client.chat.completions.create(
                model="meta-llama/Llama-4-Scout-17B-16E-Instruct",
                messages=messages,
                max_tokens=500
            )
        except Exception as http_err:
            # Log HTTP errors from the request
            logger.error(f"HTTP error occurred: {str(http_err)}")
            raise HTTPException(status_code=500, detail=str(http_err))

        logger.info("Raw model response received")

        # Extract the response message from the first choice
        if not completion.choices or len(completion.choices) == 0:
            logger.error("No choices returned from model.")
            raise HTTPException(status_code=500, detail="Model returned no choices.")

        choice = completion.choices[0]
        response_message = None
        if hasattr(choice, "message"):
            response_message = choice.message
        elif isinstance(choice, dict):
            response_message = choice.get("message")

        if not response_message:
            logger.error(f"Response message is empty: {choice}")
            raise HTTPException(status_code=500, detail="Model response did not include a message.")

        content = None
        if isinstance(response_message, dict):
            content = response_message.get("content")
        if content is None and hasattr(response_message, "content"):
            content = response_message.content

        if not content:
            logger.error(f"Message content is missing: {response_message}")
            raise HTTPException(status_code=500, detail="Model message did not include content.")

        return content

    except HTTPException:
        # Propagate deliberate HTTP errors (missing token, invalid image, model failures) unchanged
        raise
    except Exception as e:
        logger.error(f"Error in llm_chat_response: {str(e)}")
        # Fallback response in case of an unexpected error
        return "I couldn't process that input. Please try again with a different image or text query."
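
# Illustrative client-side helper (an assumption, not required by the server): a minimal
# sketch of how a caller could produce the `image_base64` field expected by
# TextImageRequest from a local image file. The function name and the example path are
# hypothetical; nothing in this service calls it.
def encode_image_to_base64(image_path: str) -> str:
    """Read an image file from disk and return its contents as a base64 string."""
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")

# Example (hypothetical file name):
#     payload = {"text": "What is in this picture?",
#                "image_base64": encode_image_to_base64("photo.jpg")}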
# Initialize pipeline once at startup
try:
    logger.info("Initializing KPipeline...")
    pipeline = KPipeline(lang_code='a')
    logger.info("KPipeline initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize KPipeline: {str(e)}")
    # Let the app start anyway, but log the error


@app.post("/generate")
async def generate_audio(request: TextImageRequest):
    """
    Generate audio from text and optionally analyze an image.

    - If text is provided, it is used as the prompt
    - If an image is provided, the image is analyzed
    - The LLM response is converted to speech using the specified voice and speed
    """
    try:
        logger.info("Received audio generation request")

        # If no text is provided but an image is, use a default prompt
        user_text = request.text if request.text is not None else ""
        if not user_text and request.image_base64:
            user_text = "Describe what you see in the image"
        elif not user_text and not request.image_base64:
            logger.error("Neither text nor image provided in request")
            return JSONResponse(
                status_code=400,
                content={"error": "Request must include either text or image_base64"}
            )

        # Generate a response using the text and, if provided, the image
        logger.info("Getting LLM response...")
        text_reply = llm_chat_response(user_text, request.image_base64)
        logger.info(f"LLM response: {text_reply}")

        # Generate audio
        logger.info(f"Generating audio using voice={request.voice}, speed={request.speed}")
        try:
            generator = pipeline(
                text_reply,
                voice=request.voice,
                speed=request.speed,
                split_pattern=r'\n+'
            )

            # Process only the first segment for demo
            for i, (gs, ps, audio) in enumerate(generator):
                logger.info(f"Audio generated successfully: segment {i}")

                # Convert the PyTorch tensor to a NumPy array
                audio_numpy = audio.cpu().numpy()

                # Convert to 16-bit PCM: clamp the audio to [-1, 1],
                # then scale to 16-bit signed integers
                audio_numpy = np.clip(audio_numpy, -1, 1)
                pcm_data = (audio_numpy * 32767).astype(np.int16)

                # Convert to bytes (row-major order)
                raw_audio = pcm_data.tobytes()

                # Return the PCM data with minimal necessary headers
                return Response(
                    content=raw_audio,
                    media_type="application/octet-stream",
                    headers={
                        "Content-Disposition": 'attachment; filename="output.pcm"',
                        "X-Sample-Rate": "24000",
                        "X-Bits-Per-Sample": "16",
                        "X-Endianness": "little"
                    }
                )

            logger.error("No audio segments generated")
            return JSONResponse(
                status_code=400,
                content={"error": "No audio generated", "detail": "The pipeline did not produce any audio"}
            )

        except Exception as e:
            logger.error(f"Error generating audio: {str(e)}")
            return JSONResponse(
                status_code=500,
                content={"error": "Audio generation failed", "detail": str(e)}
            )

    except HTTPException:
        # Let FastAPI return deliberate HTTP errors with their original status codes
        raise
    except Exception as e:
        logger.error(f"Unexpected error in generate_audio endpoint: {str(e)}")
        return JSONResponse(
            status_code=500,
            content={"error": "Internal server error", "detail": str(e)}
        )


@app.get("/")
async def root():
    return {"message": "Welcome to the Text-to-Speech API with Vision Support. Use POST /generate endpoint with 'text' and optionally 'image_base64' for queries."}


# Startup hook; periodic cleanup of old temporary images could be scheduled here
@app.on_event("startup")
async def startup_event():
    # You could add scheduled tasks here to clean up old images
    pass


@app.exception_handler(404)
async def not_found_handler(request, exc):
    return JSONResponse(
        status_code=404,
        content={"error": "Endpoint not found. Please use POST /generate for queries."}
    )


@app.exception_handler(405)
async def method_not_allowed_handler(request, exc):
    return JSONResponse(
        status_code=405,
        content={"error": "Method not allowed. Please check the API documentation."}
    )
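
# Example client usage (an illustrative sketch, not part of the service): /generate returns
# raw 16-bit little-endian PCM at the sample rate given in the X-Sample-Rate header, so a
# client has to wrap it in a WAV container (or equivalent) itself. The server URL and the
# `requests` dependency below are assumptions made only for this example.
#
#     import requests
#     import wave
#
#     resp = requests.post(
#         "http://localhost:8000/generate",
#         json={"text": "Tell me a fun fact", "voice": "af_heart", "speed": 1.0},
#         timeout=120,
#     )
#     resp.raise_for_status()
#     sample_rate = int(resp.headers.get("X-Sample-Rate", "24000"))
#     with wave.open("output.wav", "wb") as wav_file:
#         wav_file.setnchannels(1)   # the pipeline emits a single channel here
#         wav_file.setsampwidth(2)   # 16-bit samples
#         wav_file.setframerate(sample_rate)
#         wav_file.writeframes(resp.content)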