import asyncio
import os
from urllib.parse import quote

from openai import AsyncOpenAI, OpenAIError, RateLimitError
import httpx  # For the NSFW safety check

# Expanded list of voices based on recent OpenAI documentation
OPENAI_VOICES = ['alloy', 'echo', 'fable', 'onyx', 'nova', 'shimmer', 'ash', 'ballad', 'coral', 'sage', 'verse']

# Concurrency limiter
MAX_CONCURRENT_REQUESTS = 2
semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
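# At most MAX_CONCURRENT_REQUESTS synthesis requests are in flight at once; the
# remaining gathered tasks wait on this semaphore before calling the API.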

# Retry mechanism
MAX_RETRIES = 3
INITIAL_BACKOFF_SECONDS = 1
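# With MAX_RETRIES = 3 and a doubling backoff, a rate-limited line is attempted up to
# three times, waiting 1s and then 2s between attempts before it is given up on.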

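# Example NSFW API template (hypothetical URL; any GET endpoint that returns HTTP 200
# for safe text and a non-200 status for flagged text matches what is_content_safe expects):
#   NSFW_API_URL_TEMPLATE = "https://example.com/nsfw-check?text={text}"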
async def is_content_safe(text: str, api_url_template: str | None) -> bool:
    """
    Checks if the content is safe using an external NSFW API.
    Returns True if safe or if API URL is not provided, False if unsafe.
    """
    if not api_url_template:
        return True 

    if "{text}" not in api_url_template:
        print("Warning: NSFW_API_URL_TEMPLATE does not contain {text} placeholder. Skipping NSFW check.")
        return True

    try:
        encoded_text = quote(text)  # URL-encode the text before substituting it into the template
        url = api_url_template.format(text=encoded_text)
        
        async with httpx.AsyncClient() as client:
            response = await client.get(url, timeout=10.0)
        
        if response.status_code == 200:
            return True 
        else:
            print(f"NSFW Check: API request failed or content flagged. Status: {response.status_code}, Response: {response.text[:200]}")
            return False
    except httpx.RequestError as e:
        print(f"NSFW Check: API request error: {e}")
        return False 
    except Exception as e:
        print(f"NSFW Check: An unexpected error occurred: {e}")
        return False


async def synthesize_speech_line(
    client: AsyncOpenAI,
    text: str,
    voice: str,
    output_path: str,
    model: str = "tts-1-hd",
    speed: float = 1.0,
    instructions: str | None = None,
    nsfw_api_url_template: str | None = None,
    line_index: int = -1 
) -> str | None:
    """
    Synthesizes a single line of text to speech using OpenAI TTS.
    Includes speed and instructions parameters based on model compatibility.
    Retries on RateLimitError with exponential backoff.
    Returns the output_path if successful, None otherwise.
    """
    if nsfw_api_url_template:
        if not await is_content_safe(text, nsfw_api_url_template):
            print(f"Line {line_index if line_index != -1 else 'N/A'}: Content flagged as NSFW. Skipping synthesis.")
            return None

    current_retry = 0
    backoff_seconds = INITIAL_BACKOFF_SECONDS

    async with semaphore: 
        while current_retry < MAX_RETRIES:
            try:
                request_params = {
                    "model": model,
                    "voice": voice,
                    "input": text,
                    "response_format": "mp3" 
                }

                # Add speed if model supports it and speed is not default
                if model in ["tts-1", "tts-1-hd"]:
                    if speed is not None and speed != 1.0: # OpenAI default is 1.0
                        # Ensure speed is within valid range for safety, though UI should also constrain this
                        clamped_speed = max(0.25, min(speed, 4.0))
                        request_params["speed"] = clamped_speed
                
                # Add instructions if model supports it and instructions are provided
                # Assuming gpt-4o-mini-tts supports it, and tts-1/tts-1-hd do not.
                if model not in ["tts-1", "tts-1-hd"] and instructions: # Example: gpt-4o-mini-tts
                    request_params["instructions"] = instructions

                # Stream the audio straight to disk; the with_streaming_response helper
                # avoids buffering the whole file (astream_to_file is deprecated in the SDK).
                async with client.audio.speech.with_streaming_response.create(**request_params) as response:
                    await response.stream_to_file(output_path)
                return output_path
            except RateLimitError as e:
                current_retry += 1
                if current_retry >= MAX_RETRIES:
                    print(f"Line {line_index if line_index != -1 else ''}: Max retries reached for RateLimitError. Error: {e}")
                    return None
                print(f"Line {line_index if line_index != -1 else ''}: Rate limit hit. Retrying in {backoff_seconds}s... (Attempt {current_retry}/{MAX_RETRIES})")
                await asyncio.sleep(backoff_seconds)
                backoff_seconds *= 2  
            except OpenAIError as e:
                print(f"Line {line_index if line_index != -1 else ''}: OpenAI API error: {e}")
                return None 
            except Exception as e:
                print(f"Line {line_index if line_index != -1 else ''}: An unexpected error occurred during synthesis: {e}")
                return None
    return None 

if __name__ == '__main__':
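    # Quick manual smoke test: needs OPENAI_API_KEY set in the environment and writes
    # MP3 files to ./test_audio_output_enhanced. The model names used below are
    # examples; swap in whichever TTS models your API key actually has access to.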
    async def main_test():
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            print("OPENAI_API_KEY not set. Skipping test.")
            return

        client = AsyncOpenAI(api_key=api_key)
        
        test_lines = [
            {"id": 0, "speaker": "Alice", "text": "Hello, this is a test line for Alice, spoken quickly."},
            {"id": 1, "speaker": "Bob", "text": "And this is Bob, testing his voice with instructions.", "instructions": "Speak in a deep, resonant voice."},
            {"id": 2, "speaker": "Alice", "text": "A short reply, spoken slowly.", "speed": 0.8},
            {"id": 3, "speaker": "Charlie", "text": "Charlie here, normal speed."}
        ]

        temp_dir = "test_audio_output_enhanced"
        os.makedirs(temp_dir, exist_ok=True)

        tasks = []
        for i, line_data in enumerate(test_lines):
            # Test with specific models to check param compatibility
            # For Alice (speed): tts-1-hd. For Bob (instructions): gpt-4o-mini-tts
            current_model = "tts-1-hd"
            if "instructions" in line_data:
                current_model = "gpt-4o-mini-tts" # Example, ensure this model is available for your key

            voice = OPENAI_VOICES[i % len(OPENAI_VOICES)] 
            output_file = os.path.join(temp_dir, f"line_{line_data['id']}_{current_model}.mp3")
            
            tasks.append(
                synthesize_speech_line(
                    client, 
                    line_data["text"], 
                    voice, 
                    output_file, 
                    model=current_model,
                    speed=line_data.get("speed", 1.0),
                    instructions=line_data.get("instructions"),
                    line_index=line_data['id']
                )
            )
        
        results = await asyncio.gather(*tasks)
        
        successful_files = [r for r in results if r]
        print(f"\nSuccessfully synthesized {len(successful_files)} out of {len(test_lines)} lines.")
        for f_path in successful_files:
            print(f" - {f_path}")

    if os.name == 'nt': 
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(main_test())