"
# Slide tool for generating HTML slides used by slide_agent
def create_slides(slides: list[dict], title: str, instructor_name: str, output_dir: str = OUTPUT_DIR) -> list[str]:
    """Render every slide to an HTML file and save its Markdown source.

    The template is read from ``slide_template.html`` in the current working
    directory. For each slide the Markdown in ``slide['content']`` is converted
    with ``render_md_to_html`` and the template placeholders are substituted by
    plain ``str.replace`` calls. The result is written to
    ``slide_<n>.html`` in ``output_dir``; afterwards the raw Markdown of each
    slide is written to ``slide_<n>_content.md``.

    Args:
        slides: Slide dicts; the ``'title'`` and ``'content'`` keys are read.
        title: Lecture title substituted for ``"Lecture title"``.
        instructor_name: Substituted for ``"speaker name"``.
        output_dir: Directory the files are written to.

    Returns:
        The paths of the generated HTML files, or ``[]`` if anything raised
        (the exception is logged, not propagated).
    """
    try:
        html_files = []
        template_file = os.path.join(os.getcwd(), "slide_template.html")
        with open(template_file, "r", encoding="utf-8") as f:
            template_content = f.read()
        for i, slide in enumerate(slides):
            slide_number = i + 1  # file names and slide numbers are 1-based
            md_content = slide['content']
            html_content = render_md_to_html(md_content)
            date = datetime.datetime.now().strftime("%Y-%m-%d")
            # Replace placeholders in the template
            # NOTE(review): the search string here and in the html_content
            # replacement below is empty; str.replace("", x) inserts x between
            # every character. The placeholder tokens were presumably lost --
            # confirm against slide_template.html.
            slide_html = template_content.replace("", str(slide_number))
            slide_html = slide_html.replace("section title", f"{slide['title']}")
            slide_html = slide_html.replace("Lecture title", title)
            slide_html = slide_html.replace("", html_content)
            # NOTE(review): these run after the content has been inserted, so
            # any "speaker name" or "date" text inside the slide content
            # (e.g. the word "update") is rewritten as well.
            slide_html = slide_html.replace("speaker name", instructor_name)
            slide_html = slide_html.replace("date", date)
            html_file = os.path.join(output_dir, f"slide_{slide_number}.html")
            with open(html_file, "w", encoding="utf-8") as f:
                f.write(slide_html)
            logger.info("Generated HTML slide: %s", html_file)
            html_files.append(html_file)
        # Save slide content as Markdown files
        for i, slide in enumerate(slides):
            slide_number = i + 1
            md_file = os.path.join(output_dir, f"slide_{slide_number}_content.md")
            with open(md_file, "w", encoding="utf-8") as f:
                f.write(slide['content'])
            logger.info("Saved slide content to Markdown: %s", md_file)
        return html_files
    except Exception as e:
        # Best-effort: callers receive an empty list instead of an exception.
        logger.error("Failed to create HTML slides: %s", str(e))
        return []
# Dynamic progress bar
def html_with_progress(label, progress):
    """Return the status markup shown while work is in progress.

    As the code stands, the result is ``label`` on its own line between two
    newlines; ``progress`` is accepted but not interpolated.

    NOTE(review): the surrounding progress-bar markup was presumably lost from
    this string -- confirm against the original template.
    """
    return "\n{}\n".format(label)
# Get model client based on selected service
def get_model_client(service, api_key):
    """Build the chat-completion client for the selected service.

    Args:
        service: One of the service labels compared below.
        api_key: Passed to the OpenAI, Anthropic and Gemini clients. The
            Ollama and Azure branches do not use it; Azure reads the
            ``GITHUB_TOKEN`` environment variable instead.

    Returns:
        A freshly constructed client for ``service``.

    Raises:
        ValueError: If ``service`` matches none of the known labels.
    """
    if service == "OpenAI-gpt-4o-2024-08-06":
        return OpenAIChatCompletionClient(model="gpt-4o-2024-08-06", api_key=api_key)

    if service == "Anthropic-claude-3-sonnet-20240229":
        return AnthropicChatCompletionClient(model="claude-3-sonnet-20240229", api_key=api_key)

    if service == "Google-gemini-2.0-flash":
        # Gemini is reached through the OpenAI client class.
        return OpenAIChatCompletionClient(model="gemini-2.0-flash", api_key=api_key)

    if service == "Ollama-llama3.2":
        return OllamaChatCompletionClient(model="llama3.2")

    if service == "Azure AI Foundry":
        github_token = os.environ.get("GITHUB_TOKEN", "")
        capabilities = {
            "json_output": False,
            "function_calling": False,
            "vision": False,
            "family": "unknown",
            "structured_output": False,
        }
        return AzureAIChatCompletionClient(
            model="phi-4",
            endpoint="https://models.inference.ai.azure.com",
            credential=AzureKeyCredential(github_token),
            model_info=capabilities,
        )

    raise ValueError("Invalid service")
# Helper function to clean script text
def clean_script_text(script):
    """Strip slide markers and stage directions from a narration script.

    Removes ``**Slide N: ...**`` headings, bracketed text, and ``Title:`` /
    ``Content:`` lines, hyphenates two known run-together words, and collapses
    all whitespace runs to single spaces.

    Returns:
        The cleaned text, or ``None`` when the input is empty or not a string,
        or when fewer than 10 characters remain after cleaning.
    """
    if not (script and isinstance(script, str)):
        logger.error("Invalid script input: %s", script)
        return None

    # Applied in this order; each match is deleted outright.
    removal_patterns = (
        r"\*\*Slide \d+:.*?\*\*",
        r"\[.*?\]",
        r"Title:.*?\n|Content:.*?\n",
    )
    cleaned = script
    for removal_pattern in removal_patterns:
        cleaned = re.sub(removal_pattern, "", cleaned)

    word_fixes = (
        ("humanlike", "human-like"),
        ("problemsolving", "problem-solving"),
    )
    for run_together, hyphenated in word_fixes:
        cleaned = cleaned.replace(run_together, hyphenated)

    cleaned = re.sub(r"\s+", " ", cleaned).strip()

    remaining = len(cleaned)
    if remaining < 10:
        logger.error("Cleaned script too short (%d characters): %s", remaining, cleaned)
        return None

    logger.info("Cleaned script: %s", cleaned)
    return cleaned
# Helper to validate and convert speaker audio
async def validate_and_convert_speaker_audio(speaker_audio):
    """Return the path of a WAV file usable as the TTS reference voice.

    A missing or non-existent ``speaker_audio`` is replaced by
    ``professor_lectura_male.mp3`` next to this module. MP3 input is converted
    to a mono 22050 Hz WAV in ``OUTPUT_DIR``; WAV input is used as is. The WAV
    is then read back and rejected unless its sample rate is within
    16000-48000 Hz and it holds at least 16000 frames. Two-dimensional
    (multi-channel) data is averaged to mono and written to a new temp WAV.

    Returns:
        The WAV path, or ``None`` if the default voice is missing, the format
        is unsupported, validation fails, or any step raises.
    """
    source_exists = bool(speaker_audio) and os.path.exists(speaker_audio)
    if not source_exists:
        logger.warning("Speaker audio file does not exist: %s. Using default voice.", speaker_audio)
        fallback_voice = os.path.join(os.path.dirname(__file__), "professor_lectura_male.mp3")
        if not os.path.exists(fallback_voice):
            logger.error("Default voice not found. Cannot proceed with TTS.")
            return None
        speaker_audio = fallback_voice

    try:
        suffix = os.path.splitext(speaker_audio)[1].lower()
        if suffix not in (".mp3", ".wav"):
            logger.error("Unsupported audio format: %s", suffix)
            return None

        if suffix == ".wav":
            speaker_wav = speaker_audio
        else:
            logger.info("Converting MP3 to WAV: %s", speaker_audio)
            decoded = AudioSegment.from_mp3(speaker_audio)
            decoded = decoded.set_channels(1).set_frame_rate(22050)
            # delete=False keeps the file on disk after the handle closes.
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False, dir=OUTPUT_DIR) as wav_tmp:
                decoded.export(wav_tmp.name, format="wav")
                speaker_wav = wav_tmp.name

        samples, rate = sf.read(speaker_wav)
        if not (16000 <= rate <= 48000):
            logger.error("Invalid sample rate for %s: %d Hz", speaker_wav, rate)
            return None

        frame_count = len(samples)
        if frame_count < 16000:
            logger.error("Speaker audio too short: %d frames", frame_count)
            return None

        if samples.ndim == 2:
            logger.info("Converting stereo WAV to mono: %s", speaker_wav)
            samples = samples.mean(axis=1)
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False, dir=OUTPUT_DIR) as mono_tmp:
                sf.write(mono_tmp.name, samples, rate)
                speaker_wav = mono_tmp.name

        logger.info("Validated speaker audio: %s", speaker_wav)
        return speaker_wav
    except Exception as err:
        logger.error("Failed to validate or convert speaker audio %s: %s", speaker_audio, str(err))
        return None
# Helper function to generate audio using Coqui TTS API
def generate_xtts_audio(tts, text, speaker_wav, output_path):
    """Synthesize ``text`` to ``output_path`` with the given TTS model.

    Calls ``tts.tts_to_file`` with ``language="en"`` and ``speaker_wav`` as the
    reference voice.

    Returns:
        ``True`` on success; ``False`` if ``tts`` is falsy or the call raises
        (the failure is logged, not propagated).
    """
    if tts:
        synthesis_args = {
            "text": text,
            "speaker_wav": speaker_wav,
            "language": "en",
            "file_path": output_path,
        }
        try:
            tts.tts_to_file(**synthesis_args)
            logger.info("Generated audio for %s", output_path)
            succeeded = True
        except Exception as err:
            logger.error("Failed to generate audio for %s: %s", output_path, str(err))
            succeeded = False
        return succeeded

    logger.error("TTS model not initialized")
    return False
# Helper function to extract JSON from messages
# Fenced ```json ... ``` block; group 1 is the payload without surrounding whitespace.
_JSON_FENCE_PATTERN = r"```json\s*(.*?)\s*```"

# Loose fallbacks tried in order: an array of objects, then a single object.
_INLINE_JSON_PATTERNS = (
    r"\[\s*\{.*?\}\s*\]",
    r"\{\s*\".*?\"\s*:.*?\}",
)


def _scan_for_embedded_json(text):
    """Return the first JSON object or array embedded anywhere in ``text``.

    Walks ``text`` left to right and, at every ``{`` or ``[``, asks the JSON
    decoder to parse one value starting there. This yields the same value as
    trying every ``text[i:j]`` substring (earliest start, longest parse) but
    needs one decode attempt per bracket instead of one per substring.

    Returns:
        The decoded ``list``/``dict``, or ``None`` if no position parses.
    """
    decoder = json.JSONDecoder()
    for start, char in enumerate(text):
        if char not in "{[":
            continue
        try:
            parsed, end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, (list, dict)):
            logger.info("Found JSON in substring: %s", text[start:end])
            return parsed
    return None


def extract_json_from_message(message):
    """Pull a JSON payload out of an agent message.

    * ``TextMessage``: tries a fenced ``json`` code block, then the loose
      inline patterns, then a scan for any embedded JSON object/array.
    * ``StructuredMessage``: a ``BaseModel`` content is converted with
      ``.dict()`` and its ``"slides"`` entry returned when present (otherwise
      the whole dict); any other content is returned unchanged.
    * ``HandoffMessage``: each context message's ``content`` is checked in
      order; strings go through the fenced-block and inline patterns, dicts
      return their ``"slides"`` entry (or themselves).

    Returns:
        The decoded payload, or ``None`` when nothing could be extracted or
        the message type is not one of the three above.
    """
    if isinstance(message, TextMessage):
        content = message.content
        logger.debug("Extracting JSON from TextMessage: %s", content)
        if not isinstance(content, str):
            logger.warning("TextMessage content is not a string: %s", content)
            return None

        match = re.search(_JSON_FENCE_PATTERN, content, re.DOTALL)
        if match:
            try:
                json_str = match.group(1).strip()
                logger.debug("Found JSON in code block: %s", json_str)
                return json.loads(json_str)
            except json.JSONDecodeError as e:
                logger.error("Failed to parse JSON from code block: %s", e)

        for pattern in _INLINE_JSON_PATTERNS:
            match = re.search(pattern, content, re.DOTALL)
            if match:
                try:
                    json_str = match.group(0).strip()
                    logger.debug("Found JSON with pattern %s: %s", pattern, json_str)
                    return json.loads(json_str)
                except json.JSONDecodeError as e:
                    logger.error("Failed to parse JSON with pattern %s: %s", pattern, e)

        # Last resort: look for any parseable object/array in the text.
        try:
            parsed = _scan_for_embedded_json(content)
            if parsed is not None:
                return parsed
        except Exception as e:
            logger.error("Error in JSON substring search: %s", e)

        logger.warning("No JSON found in TextMessage content")
        return None

    elif isinstance(message, StructuredMessage):
        content = message.content
        logger.debug("Extracting JSON from StructuredMessage: %s", content)
        try:
            if isinstance(content, BaseModel):
                content_dict = content.dict()
                return content_dict.get("slides", content_dict)
            return content
        except Exception as e:
            logger.error("Failed to extract JSON from StructuredMessage: %s, Content: %s", e, content)
            return None

    elif isinstance(message, HandoffMessage):
        logger.debug("Extracting JSON from HandoffMessage context")
        for ctx_msg in message.context:
            if not hasattr(ctx_msg, "content"):
                continue
            content = ctx_msg.content
            logger.debug("HandoffMessage context content: %s", content)
            if isinstance(content, str):
                match = re.search(_JSON_FENCE_PATTERN, content, re.DOTALL)
                if match:
                    try:
                        return json.loads(match.group(1))
                    except json.JSONDecodeError as e:
                        logger.error("Failed to parse JSON from HandoffMessage: %s", e)
                for pattern in _INLINE_JSON_PATTERNS:
                    match = re.search(pattern, content, re.DOTALL)
                    if match:
                        try:
                            return json.loads(match.group(0))
                        except json.JSONDecodeError as e:
                            logger.error("Failed to parse JSON with pattern %s: %s", pattern, e)
            elif isinstance(content, dict):
                return content.get("slides", content)
        logger.warning("No JSON found in HandoffMessage context")
        return None

    logger.warning("Unsupported message type for JSON extraction: %s", type(message))
    return None
# Async update audio preview
async def update_audio_preview(audio_file):
    """Pass a selected audio file through for preview.

    Returns:
        ``audio_file`` unchanged when it is truthy (after logging it),
        otherwise ``None``.
    """
    if not audio_file:
        return None
    logger.info("Updating audio preview for file: %s", audio_file)
    return audio_file
# Create a zip file of .md, .txt, and .mp3 files
def create_zip_of_files(file_paths):
    """Bundle the given lecture files into one zip archive in ``OUTPUT_DIR``.

    Only paths that exist and whose extension is exactly ``.md``, ``.txt`` or
    ``.mp3`` are added; each is stored under its base name, without
    directories.

    Returns:
        The path of ``all_lecture_materials.zip``.
    """
    bundled_suffixes = ('.md', '.txt', '.mp3')
    zip_path = os.path.join(OUTPUT_DIR, "all_lecture_materials.zip")

    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as archive:
        for candidate in file_paths:
            if not os.path.exists(candidate):
                continue
            suffix = os.path.splitext(candidate)[1]
            if suffix not in bundled_suffixes:
                continue
            archive.write(candidate, os.path.basename(candidate))
            logger.info("Added %s to zip", candidate)

    logger.info("Created zip file: %s", zip_path)
    return zip_path
# Access local files
def get_gradio_file_url(local_path):
    """Return the ``/gradio_api/file=`` URL for a local file.

    The path is made relative to the current working directory before being
    appended to the prefix.
    """
    working_dir = os.getcwd()
    relative = os.path.relpath(local_path, working_dir)
    return "/gradio_api/file={}".format(relative)
# Async generate lecture materials and audio
async def on_generate(api_service, api_key, serpapi_key, title, lecture_content_description, lecture_type, lecture_style, speaker_audio, num_slides):
print(f"Received serpapi_key: '{serpapi_key}' (type: {type(serpapi_key)}, length: {len(serpapi_key) if serpapi_key else 0})")
model_client = get_model_client(api_service, api_key)
# Get the speaker from the speaker_audio path
speaker = os.path.basename(speaker_audio) if speaker_audio else "professor_lectura_male.mp3"
logger.info(f"Selected speaker file: {speaker}")
instructor_name = get_instructor_name(speaker)
logger.info(f"Using instructor: {instructor_name}")
if os.path.exists(OUTPUT_DIR):
try:
for item in os.listdir(OUTPUT_DIR):
item_path = os.path.join(OUTPUT_DIR, item)
if os.path.isfile(item_path):
os.unlink(item_path)
elif os.path.isdir(item_path):
shutil.rmtree(item_path)
logger.info("Cleared outputs directory: %s", OUTPUT_DIR)
except Exception as e:
logger.error("Failed to clear outputs directory: %s", str(e))
else:
os.makedirs(OUTPUT_DIR, exist_ok=True)
logger.info("Created outputs directory: %s", OUTPUT_DIR)
# Total slides include user-specified content slides plus Introduction and Closing slides
content_slides = num_slides
total_slides = content_slides + 2
date = datetime.datetime.now().strftime("%Y-%m-%d")
research_agent = AssistantAgent(
name="research_agent",
model_client=model_client,
handoffs=["slide_agent"],
system_message="You are a Research Agent. Use the search_web tool to gather information on the topic and keywords from the initial message. Summarize the findings concisely in a single message, then use the handoff_to_slide_agent tool to pass the task to the Slide Agent. Do not produce any other output.",
tools=[create_search_web_with_key(serpapi_key)]
)
slide_agent = AssistantAgent(
name="slide_agent",
model_client=model_client,
handoffs=["script_agent"],
system_message=f"""
You are a Slide Agent. Using the research from the conversation history and the specified number of content slides ({content_slides}), generate exactly {content_slides} content slides, plus an Introduction slide as the first slide and a Closing slide as the last slide, making a total of {total_slides} slides.
- The Introduction slide (first slide) should have the title "{title}" and content containing only the lecture title, speaker name ({get_instructor_name(speaker_audio)}), and date {date}, centered, in plain text.
- The Closing slide (last slide) should have the title "Closing" and content containing only "The End\nThank you", centered, in plain text.
- The remaining {content_slides} slides should be content slides based on the lecture description, audience type, and lecture style ({lecture_style}), with meaningful titles and content in valid Markdown format. Adapt the content to the lecture style to suit diverse learners:
- Feynman: Explains complex ideas with simplicity, clarity, and enthusiasm, emulating Richard Feynman's teaching style.
- Socratic: Poses thought-provoking questions to guide learners to insights without requiring direct interaction.
- Humorous: Infuses wit and light-hearted anecdotes to make content engaging and memorable.
- Inspirational - Motivating: Uses motivational language and visionary ideas to spark enthusiasm and curiosity.
- Reflective: Encourages introspection with a calm, contemplative tone to deepen understanding.
Output ONLY a JSON array wrapped in ```json ... ``` in a TextMessage, where each slide is an object with 'title' and 'content' keys. After generating the JSON, use the create_slides tool to produce HTML slides, then use the handoff_to_script_agent tool to pass the task to the Script Agent. Do not include any explanatory text or other messages.
Example output for 1 content slide (total 3 slides):
```json
[
{{"title": "Introduction to AI Basics", "content": "AI Basics\n{get_instructor_name(speaker_audio)}\n{date}"}},
{{"title": "What is AI?", "content": "# What is AI?\n- Definition: Systems that mimic human intelligence\n- Key areas: ML, NLP, Robotics"}},
{{"title": "Closing", "content": "The End\nThank you"}}
]
```""",
tools=[create_slides],
output_content_type=None,
reflect_on_tool_use=False
)
script_agent = AssistantAgent(
name="script_agent",
model_client=model_client,
handoffs=["instructor_agent"],
system_message=f"""
You are a Script Agent. Access the JSON array of {total_slides} slides from the conversation history, which includes an Introduction slide, {content_slides} content slides, and a Closing slide. Generate a narration script (1-2 sentences) for each of the {total_slides} slides, summarizing its content in a clear, academically inclined tone. Ensure the lecture is engaging, covers the fundamental requirements of the topic, and aligns with the lecture style ({lecture_style}) to suit diverse learners. The lecture will be delivered by {instructor_name}.
Output ONLY a JSON array wrapped in ```json ... ``` with exactly {total_slides} strings, one script per slide, in the same order. Ensure the JSON is valid and complete. After outputting, use the handoff_to_instructor_agent tool. If scripts cannot be generated, retry once.
Example for 3 slides (1 content slide):
```json
[
"Welcome to the lecture on AI Basics. I am {instructor_name}, and today we will explore the fundamentals of artificial intelligence.",
"Let us begin by defining artificial intelligence: it refers to systems that mimic human intelligence, spanning key areas such as machine learning, natural language processing, and robotics.",
"That concludes our lecture on AI Basics. Thank you for your attention, and I hope you found this session insightful."
]
```""",
output_content_type=None,
reflect_on_tool_use=False
)
def get_instructor_prompt(speaker, lecture_style):
    """Build the instructor agent's system prompt.

    Combines a persona line chosen by the speaker's audio file with the bullet
    list for ``lecture_style`` and the fixed review instructions. Reads
    ``instructor_name`` and ``total_slides`` from the enclosing scope.

    Args:
        speaker: File name or full path of the speaker audio. Only the base
            name is used for the lookup. A falsy value selects
            ``professor_lectura_male.mp3``, the same default file name
            ``on_generate`` uses when no speaker audio is supplied.
        lecture_style: Style label; unknown labels fall back to the Feynman
            characteristics.

    Returns:
        The complete system prompt string.
    """
    # The persona table is keyed by bare file names, but callers may hand over
    # a full path; without this the lookup always missed and fell back to the
    # Feynman persona.
    speaker_key = os.path.basename(speaker) if speaker else "professor_lectura_male.mp3"

    base_prompts = {
        "feynman.mp3": f"You are {instructor_name}, known for your ability to explain complex concepts with remarkable clarity and enthusiasm. Your teaching style is characterized by:",
        "einstein.mp3": f"You are {instructor_name}, known for your profound insights and ability to connect abstract concepts to the physical world. Your teaching style is characterized by:",
        "samantha.mp3": f"You are {instructor_name}, known for your engaging and accessible approach to teaching. Your teaching style is characterized by:",
        "socrates.mp3": f"You are {instructor_name}, known for your method of questioning and guiding students to discover knowledge themselves. Your teaching style is characterized by:",
        "professor_lectura_male.mp3": f"You are {instructor_name}, known for your clear and authoritative teaching style. Your teaching style is characterized by:"
    }
    style_characteristics = {
        "Feynman - Simplifies complex ideas with enthusiasm": """
- Breaking down complex ideas into simple, understandable parts
- Using analogies and real-world examples
- Maintaining enthusiasm and curiosity throughout
- Encouraging critical thinking and questioning
- Making abstract concepts tangible and relatable""",
        "Socratic - Guides insights with probing questions": """
- Using thought-provoking questions to guide understanding
- Encouraging self-discovery and critical thinking
- Challenging assumptions and exploring implications
- Building knowledge through dialogue and inquiry
- Fostering intellectual curiosity and reflection""",
        "Inspirational - Sparks enthusiasm with visionary ideas": """
- Connecting concepts to broader implications and possibilities
- Using motivational language and visionary thinking
- Inspiring curiosity and wonder about the subject
- Highlighting the transformative potential of knowledge
- Encouraging students to think beyond conventional boundaries""",
        "Reflective - Promotes introspection with a calm tone": """
- Creating a contemplative learning environment
- Encouraging deep thinking and personal connection
- Using a calm, measured delivery
- Promoting self-reflection and understanding
- Building connections between concepts and personal experience""",
        "Humorous - Uses wit and anecdotes for engaging content": """
- Incorporating relevant humor and anecdotes
- Making learning enjoyable and memorable
- Using wit to highlight key concepts
- Creating an engaging and relaxed atmosphere
- Balancing entertainment with educational value"""
    }
    # Unknown speakers (e.g. a custom upload) keep the Feynman persona.
    base_prompt = base_prompts.get(speaker_key, base_prompts["feynman.mp3"])
    style_prompt = style_characteristics.get(lecture_style, style_characteristics["Feynman - Simplifies complex ideas with enthusiasm"])
    return f"""{base_prompt}
{style_prompt}
Review the slides and scripts from the conversation history to ensure coherence, completeness, and that exactly {total_slides} slides and {total_slides} scripts are received, including the Introduction and Closing slides. Verify that HTML slide files exist in the outputs directory and align with the lecture style ({lecture_style}). Output a confirmation message summarizing the number of slides, scripts, and HTML files status. If slides, scripts, or HTML files are missing, invalid, or do not match the expected count ({total_slides}), report the issue clearly. Use 'TERMINATE' to signal completion.
Example: 'Received {total_slides} slides, {total_slides} scripts, and HTML files. Lecture is coherent and aligns with {lecture_style} style. TERMINATE'
"""
instructor_agent = AssistantAgent(
name="instructor_agent",
model_client=model_client,
handoffs=[],
system_message=get_instructor_prompt(speaker_audio, lecture_style)
)
swarm = Swarm(
participants=[research_agent, slide_agent, script_agent, instructor_agent],
termination_condition=HandoffTermination(target="user") | TextMentionTermination("TERMINATE")
)
progress = 0
label = "Researching lecture topic..."
yield (
html_with_progress(label, progress),
[]
)
await asyncio.sleep(0.1)
initial_message = f"""
Lecture Title: {title}
Lecture Content Description: {lecture_content_description}
Audience: {lecture_type}
Lecture Style: {lecture_style}
Number of Content Slides: {content_slides}
Please start by researching the topic, or proceed without research if search is unavailable.
"""
logger.info("Starting lecture generation for title: %s with %d content slides (total %d slides), style: %s", title, content_slides, total_slides, lecture_style)
slides = None
scripts = None
html_files = []
error_html = """
Failed to generate lecture materials
Please try again with different parameters or a different model.
"""
try:
logger.info("Research Agent starting...")
if serpapi_key:
task_result = await Console(swarm.run_stream(task=initial_message))
else:
logger.warning("No SerpApi key provided, bypassing research phase")
task_result = await Console(swarm.run_stream(task=f"{initial_message}\nNo search available, proceed with slide generation."))
logger.info("Swarm execution completed")
slide_retry_count = 0
script_retry_count = 0
max_retries = 2
for message in task_result.messages:
source = getattr(message, 'source', getattr(message, 'sender', None))
logger.debug("Processing message from %s, type: %s", source, type(message))
if isinstance(message, HandoffMessage):
logger.info("Handoff from %s to %s", source, message.target)
if source == "research_agent" and message.target == "slide_agent":
progress = 25
label = "Slides: generating..."
yield (
html_with_progress(label, progress),
[]
)
await asyncio.sleep(0.1)
elif source == "slide_agent" and message.target == "script_agent":
if slides is None:
logger.warning("Slide Agent handoff without slides JSON")
extracted_json = extract_json_from_message(message)
if extracted_json:
slides = extracted_json
logger.info("Extracted slides JSON from HandoffMessage context: %s", slides)
if slides is None or len(slides) != total_slides:
if slide_retry_count < max_retries:
slide_retry_count += 1
logger.info("Retrying slide generation (attempt %d/%d)", slide_retry_count, max_retries)
retry_message = TextMessage(
content=f"Please generate exactly {total_slides} slides (Introduction, {content_slides} content slides, and Closing) as per your instructions.",
source="user",
recipient="slide_agent"
)
task_result.messages.append(retry_message)
continue
progress = 50
label = "Scripts: generating..."
yield (
html_with_progress(label, progress),
[]
)
await asyncio.sleep(0.1)
elif source == "script_agent" and message.target == "instructor_agent":
if scripts is None:
logger.warning("Script Agent handoff without scripts JSON")
extracted_json = extract_json_from_message(message)
if extracted_json:
scripts = extracted_json
logger.info("Extracted scripts JSON from HandoffMessage context: %s", scripts)
progress = 75
label = "Review: in progress..."
yield (
html_with_progress(label, progress),
[]
)
await asyncio.sleep(0.1)
elif source == "research_agent" and isinstance(message, TextMessage) and "handoff_to_slide_agent" in message.content:
logger.info("Research Agent completed research")
progress = 25
label = "Slides: generating..."
yield (
html_with_progress(label, progress),
[]
)
await asyncio.sleep(0.1)
elif source == "slide_agent" and isinstance(message, (TextMessage, StructuredMessage)):
logger.debug("Slide Agent message received")
extracted_json = extract_json_from_message(message)
if extracted_json:
slides = extracted_json
logger.info("Slide Agent generated %d slides: %s", len(slides), slides)
if len(slides) != total_slides:
if slide_retry_count < max_retries:
slide_retry_count += 1
logger.info("Retrying slide generation (attempt %d/%d)", slide_retry_count, max_retries)
retry_message = TextMessage(
content=f"Please generate exactly {total_slides} slides (Introduction, {content_slides} content slides, and Closing) as per your instructions.",
source="user",
recipient="slide_agent"
)
task_result.messages.append(retry_message)
continue
# Generate HTML slides with instructor name
html_files = create_slides(slides, title, instructor_name)
if not html_files:
logger.error("Failed to generate HTML slides")
progress = 50
label = "Scripts: generating..."
yield (
html_with_progress(label, progress),
[]
)
await asyncio.sleep(0.1)
else:
logger.warning("No JSON extracted from slide_agent message")
if slide_retry_count < max_retries:
slide_retry_count += 1
logger.info("Retrying slide generation (attempt %d/%d)", slide_retry_count, max_retries)
retry_message = TextMessage(
content=f"Please generate exactly {total_slides} slides (Introduction, {content_slides} content slides, and Closing) as per your instructions.",
source="user",
recipient="slide_agent"
)
task_result.messages.append(retry_message)
continue
elif source == "script_agent" and isinstance(message, (TextMessage, StructuredMessage)):
logger.debug("Script Agent message received")
extracted_json = extract_json_from_message(message)
if extracted_json:
scripts = extracted_json
logger.info("Script Agent generated scripts for %d slides: %s", len(scripts), scripts)
for i, script in enumerate(scripts):
script_file = os.path.join(OUTPUT_DIR, f"slide_{i+1}_script.txt")
try:
with open(script_file, "w", encoding="utf-8") as f:
f.write(script)
logger.info("Saved script to %s", script_file)
except Exception as e:
logger.error("Error saving script to %s: %s", script_file, str(e))
progress = 75
label = "Scripts generated and saved. Reviewing..."
yield (
html_with_progress(label, progress),
[]
)
await asyncio.sleep(0.1)
else:
logger.warning("No JSON extracted from script_agent message")
if script_retry_count < max_retries:
script_retry_count += 1
logger.info("Retrying script generation (attempt %d/%d)", script_retry_count, max_retries)
retry_message = TextMessage(
content=f"Please generate exactly {total_slides} scripts for the {total_slides} slides as per your instructions.",
source="user",
recipient="script_agent"
)
task_result.messages.append(retry_message)
continue
elif source == "instructor_agent" and isinstance(message, TextMessage) and "TERMINATE" in message.content:
logger.info("Instructor Agent completed lecture review: %s", message.content)
progress = 90
label = "Lecture materials ready. Generating lecture speech..."
file_paths = [f for f in os.listdir(OUTPUT_DIR) if f.endswith(('.md', '.txt'))]
file_paths.sort()
file_paths = [os.path.join(OUTPUT_DIR, f) for f in file_paths]
yield (
html_with_progress(label, progress),
file_paths
)
await asyncio.sleep(0.1)
logger.info("Slides state: %s", "Generated" if slides else "None")
logger.info("Scripts state: %s", "Generated" if scripts else "None")
logger.info("HTML files state: %s", "Generated" if html_files else "None")
if not slides or not scripts:
error_message = f"Failed to generate {'slides and scripts' if not slides and not scripts else 'slides' if not slides else 'scripts'}"
error_message += f". Received {len(slides) if slides else 0} slides and {len(scripts) if scripts else 0} scripts."
logger.error("%s", error_message)
logger.debug("Dumping all messages for debugging:")
for msg in task_result.messages:
source = getattr(msg, 'source', getattr(msg, 'sender', None))
logger.debug("Message from %s, type: %s, content: %s", source, type(msg), msg.to_text() if hasattr(msg, 'to_text') else str(msg))
yield (
error_html,
[]
)
return
if len(slides) != total_slides:
logger.error("Expected %d slides, but received %d", total_slides, len(slides))
yield (
f"""
Incorrect number of slides
Expected {total_slides} slides, but generated {len(slides)}. Please try again.
""",
[]
)
return
if not isinstance(scripts, list) or not all(isinstance(s, str) for s in scripts):
logger.error("Scripts are not a list of strings: %s", scripts)
yield (
f"""
Invalid script format
Scripts must be a list of strings. Please try again.
""",
[]
)
return
if len(scripts) != total_slides:
logger.error("Mismatch between number of slides (%d) and scripts (%d)", len(slides), len(scripts))
yield (
f"""
Mismatch in slides and scripts
Generated {len(slides)} slides but {len(scripts)} scripts. Please try again.
""",
[]
)
return
# Access the generated HTML files
html_file_urls = [get_gradio_file_url(html_file) for html_file in html_files]
audio_urls = [None] * len(scripts)
audio_timeline = ""
for i in range(len(scripts)):
audio_timeline += f''
file_paths = [f for f in os.listdir(OUTPUT_DIR) if f.endswith(('.md', '.txt'))]
file_paths.sort()
file_paths = [os.path.join(OUTPUT_DIR, f) for f in file_paths]
audio_files = []
validated_speaker_wav = await validate_and_convert_speaker_audio(speaker_audio)
if not validated_speaker_wav:
logger.error("Invalid speaker audio after conversion, skipping TTS")
yield (
f"""
Invalid speaker audio
Please upload a valid MP3 or WAV audio file and try again.
""",
[],
None
)
return
for i, script in enumerate(scripts):
cleaned_script = clean_script_text(script)
audio_file = os.path.join(OUTPUT_DIR, f"slide_{i+1}.mp3")
script_file = os.path.join(OUTPUT_DIR, f"slide_{i+1}_script.txt")
try:
with open(script_file, "w", encoding="utf-8") as f:
f.write(cleaned_script or "")
logger.info("Saved script to %s: %s", script_file, cleaned_script)
except Exception as e:
logger.error("Error saving script to %s: %s",
script_file, str(e))
if not cleaned_script:
logger.error("Skipping audio for slide %d due to empty or invalid script", i + 1)
audio_files.append(None)
audio_urls[i] = None
progress = 90 + ((i + 1) / len(scripts)) * 10
label = f"Generating lecture speech for slide {i + 1}/{len(scripts)}..."
yield (
html_with_progress(label, progress),
file_paths,
None
)
await asyncio.sleep(0.1)
continue
max_audio_retries = 2
for attempt in range(max_audio_retries + 1):
try:
current_text = cleaned_script
if attempt > 0:
sentences = re.split(r"[.!?]+", cleaned_script)
sentences = [s.strip() for s in sentences if s.strip()][:2]
current_text = ". ".join(sentences) + "."
logger.info("Retry %d for slide %d with simplified text: %s", attempt, i + 1, current_text)
success = generate_xtts_audio(tts, current_text, validated_speaker_wav, audio_file)
if not success:
raise RuntimeError("TTS generation failed")
logger.info("Generated audio for slide %d: %s", i + 1, audio_file)
audio_files.append(audio_file)
audio_urls[i] = get_gradio_file_url(audio_file)
progress = 90 + ((i + 1) / len(scripts)) * 10
label = f"Generating lecture speech for slide {i + 1}/{len(scripts)}..."
file_paths.append(audio_file)
yield (
html_with_progress(label, progress),
file_paths,
None
)
await asyncio.sleep(0.1)
break
except Exception as e:
logger.error("Error generating audio for slide %d (attempt %d): %s\n%s", i + 1, attempt, str(e), traceback.format_exc())
if attempt == max_audio_retries:
logger.error("Max retries reached for slide %d, skipping", i + 1)
audio_files.append(None)
audio_urls[i] = None
progress = 90 + ((i + 1) / len(scripts)) * 10
label = f"Generating lecture speech for slide {i + 1}/{len(scripts)}..."
yield (
html_with_progress(label, progress),
file_paths,
None
)
await asyncio.sleep(0.1)
break
# Create zip file with all materials except .html files
zip_file = create_zip_of_files(file_paths)
file_paths.append(zip_file)
# Slide hack: Render the lecture container with iframe containing HTML slides
audio_timeline = ""
for j, url in enumerate(audio_urls):
if url:
audio_timeline += f''
else:
audio_timeline += f''
slides_info = json.dumps({"htmlFiles": html_file_urls, "audioFiles": audio_urls})
html_output = f"""
Please Generate lecture content via the form on the left first before lecture begins
`;
}
}
});
}
}
// Initialize speaker selection
function initializeSpeakerSelect() {
const speakerSelect = document.getElementById('speaker-select');
const speakerAudio = document.querySelector('#speaker-audio input[type="file"]');
if (speakerSelect && speakerAudio) {
speakerSelect.addEventListener('change', (e) => {
const selectedSpeaker = e.target.value;
// Create a new File object from the selected speaker
fetch(selectedSpeaker)
.then(response => response.blob())
.then(blob => {
const file = new File([blob], selectedSpeaker, { type: 'audio/mpeg' });
const dataTransfer = new DataTransfer();
dataTransfer.items.add(file);
speakerAudio.files = dataTransfer.files;
const event = new Event('change', { bubbles: true });
speakerAudio.dispatchEvent(event);
});
});
}
}
// Initialize file upload when study mode is active
function checkAndInitializeUpload() {
const uploadArea = document.getElementById('upload-area');
if (uploadArea) {
console.log('Initializing file upload...');
initializeFileUpload();
}
initializeClearButton();
initializeSpeakerSelect();
}
// Check immediately and also set up an observer
checkAndInitializeUpload();
const modeObserver = new MutationObserver((mutations) => {
mutations.forEach((mutation) => {
if (mutation.addedNodes.length) {
checkAndInitializeUpload();
}
});
});
modeObserver.observe(document.body, { childList: true, subtree: true });
"""
# Handle mode switching
def switch_mode(mode):
    """Return the slide HTML and two visibility updates for the chosen mode.

    "Learn Mode" yields ``default_slide_html`` and hides the third output;
    every other mode yields ``study_mode_html`` and shows it. The second
    output is made visible in both cases.
    """
    learn_mode = mode == "Learn Mode"
    slide_html = default_slide_html if learn_mode else study_mode_html
    return slide_html, gr.update(visible=True), gr.update(visible=not learn_mode)
# When the mode selector changes, pass its value to switch_mode and apply the
# result to the slide display, the generate button and the file uploader.
mode_tabs.change(
fn=switch_mode,
inputs=[mode_tabs],
outputs=[slide_display, generate_btn, uploaded_file]
)
# Handle file upload in study mode
async def handle_file_upload(file, api_service, api_key):
    """Handle file upload in study mode and validate API key.

    Async generator. Every yielded item is a 3-tuple
    ``(html, title, content_description)``; the last two are ``None`` except
    for the final success item, which carries ``result["title"]`` and
    ``result["content_description"]`` from ``study_mode_process``.

    Args:
        file: The uploaded file; a falsy value resets the display to
            ``default_slide_html``.
        api_service: Selected service name. "Azure AI Foundry" is validated
            against the ``GITHUB_TOKEN`` environment variable instead of
            ``api_key``.
        api_key: API key for every other service.

    Any exception raised while processing is logged with its traceback and
    reported to the user as an error message rather than propagated.
    """
    if not file:
        yield default_slide_html, None, None
        return
    # Validate API key or GITHUB_TOKEN for Azure AI Foundry
    if not api_key and api_service != "Azure AI Foundry":
        error_html = """
Please input api key first
An API key is required to process uploaded files in Study mode. Please provide a valid API key and try again.
"""
        logger.warning("API key is empty, terminating file upload")
        yield error_html, None, None
        return
    elif api_service == "Azure AI Foundry" and not os.environ.get("GITHUB_TOKEN"):
        error_html = """
GITHUB_TOKEN not set
Azure AI Foundry requires a GITHUB_TOKEN environment variable. Please set it and try again.
"""
        logger.warning("GITHUB_TOKEN is missing for Azure AI Foundry, terminating file upload")
        yield error_html, None, None
        return
    try:
        # Show uploading progress
        yield html_with_progress("Uploading Lecture Material...", 25), None, None
        await asyncio.sleep(0.1)
        # Show processing progress
        yield html_with_progress("Processing file...", 50), None, None
        await asyncio.sleep(0.1)
        # Process file and generate inputs
        yield html_with_progress("Researching lecture material...", 75), None, None
        await asyncio.sleep(0.1)
        result = await study_mode_process(file, api_service, api_key)
        # Show success message with updated inputs
        success_html = """
Research on study material completed, you can now generate lecture
The form has been updated with the extracted information. Click Generate Lecture to proceed.
"""
        # Prompt via chat updates only title and description form inputs
        yield (
            success_html,
            result["title"],
            result["content_description"]
        )
    except Exception as e:
        # Local import keeps this block self-contained.
        from html import escape

        # The exception text can contain arbitrary characters (file names,
        # upstream error bodies); escape it before embedding it in markup.
        error_html = f"""
Error processing file
{escape(str(e))}
"""
        # logger.exception records the traceback alongside the message.
        logger.exception("Error processing file: %s", e)
        yield error_html, None, None
# A change of the uploaded file runs handle_file_upload; its three results go
# to the slide display, the title field and the lecture description field.
uploaded_file.change(
fn=handle_file_upload,
inputs=[uploaded_file, api_service, api_key],
outputs=[slide_display, title, lecture_content_description]
)
# A change of the speaker audio is passed through update_audio_preview and the
# result is written back to the same component.
speaker_audio.change(
fn=update_audio_preview,
inputs=speaker_audio,
outputs=speaker_audio
)
# Clicking the generate button calls on_generate with the form values; its
# results go to the slide display and the file output.
generate_btn.click(
fn=on_generate,
inputs=[api_service, api_key, serpapi_key, title, lecture_content_description, lecture_type, lecture_style, speaker_audio, num_slides],
outputs=[slide_display, file_output]
)
# Handle speaker selection
def update_speaker_audio(speaker):
    """Log the newly selected speaker and return it unchanged.

    Args:
        speaker: Current value of the speaker selector.

    Returns:
        The same ``speaker`` value.
    """
    # Lazy %-style arguments, matching the other logger calls in this file.
    logger.info("Speaker selection changed to: %s", speaker)
    return speaker
# When the speaker selector changes, forward its value (via
# update_speaker_audio) to the speaker audio component.
speaker_select.change(
fn=update_speaker_audio,
inputs=[speaker_select],
outputs=[speaker_audio]
)
# Append the note-editor script to js_code: the add-note and back buttons
# toggle between the notes view and the note editor, and initializeComponents()
# runs once immediately and again whenever nodes are added anywhere under
# document.body.
# NOTE(review): each run calls addEventListener again on the same buttons, so
# listeners accumulate with every DOM mutation; the handlers only set display
# styles, but confirm this is acceptable. A similar MutationObserver
# (modeObserver) appears in the script text above — presumably part of the
# same js_code; confirm the two do not duplicate initialization.
js_code = js_code + """
// Add note editor functionality
function initializeNoteEditor() {
const addNoteBtn = document.getElementById('add-note-btn');
const backBtn = document.getElementById('back-btn');
const notesView = document.getElementById('notes-view');
const noteEditor = document.getElementById('note-editor');
if (addNoteBtn && backBtn && notesView && noteEditor) {
addNoteBtn.addEventListener('click', () => {
notesView.style.display = 'none';
noteEditor.style.display = 'block';
});
backBtn.addEventListener('click', () => {
noteEditor.style.display = 'none';
notesView.style.display = 'block';
});
}
}
// Initialize all components
function initializeComponents() {
initializeFileUpload();
initializeClearButton();
initializeSpeakerSelect();
initializeNoteEditor();
}
initializeComponents();
const observer = new MutationObserver((mutations) => {
mutations.forEach((mutation) => {
if (mutation.addedNodes.length) {
initializeComponents();
}
});
});
observer.observe(document.body, { childList: true, subtree: true });
"""
async def run_note_agent(api_service, api_key, lecture_context, note_title, note_content):
    """Ask the note agent to draft or refine a note from the lecture context.

    The agent's own text messages are scanned newest-first for a non-empty
    JSON dict (via ``extract_json_from_message``); if none is found, every
    text message is scanned the same way. When nothing parses, the title and
    content passed in are returned as ``{"title": ..., "content": ...}``.
    """
    note_agent = AssistantAgent(
        name="note_agent",
        model_client=get_model_client(api_service, api_key),
        system_message=(
            "You are a Note Agent. Given the current lecture slides and scripts, help the user draft a note. "
            "If a title or content is provided, improve or complete the note. If not, suggest a new note based on the lecture. "
            "Always use the lecture context. Output a JSON object: {\"title\": ..., \"content\": ...}."
        ),
    )
    context_str = json.dumps(lecture_context)
    task = f"Lecture Context: {context_str}\nNote Title: {note_title}\nNote Content: {note_content}"
    outcome = await Console(note_agent.run_stream(task=task))

    def _dict_from(message):
        # A message that fails to parse is treated as carrying no JSON.
        try:
            parsed = extract_json_from_message(message)
            if parsed and isinstance(parsed, dict):
                return parsed
        except Exception:
            return None
        return None

    text_messages = [
        m for m in reversed(outcome.messages)
        if hasattr(m, 'content') and isinstance(m.content, str)
    ]
    agent_messages = [m for m in text_messages if getattr(m, 'source', None) == 'note_agent']
    # Prefer the agent's own replies, then fall back to any text message.
    for pool in (agent_messages, text_messages):
        for message in pool:
            note = _dict_from(message)
            if note is not None:
                return note
    return {"title": note_title, "content": note_content}
async def run_study_agent(api_service, api_key, lecture_context):
    """Generate a plain-text study guide from the lecture context.

    Returns the stripped content of the newest text message sent by
    ``study_agent``; failing that, of the newest text message from any
    source; failing that, the string "No study guide generated.".
    """
    study_agent = AssistantAgent(
        name="study_agent",
        model_client=get_model_client(api_service, api_key),
        system_message=(
            "You are a Study Guide Agent. Given the current lecture slides and scripts, generate a concise study guide (max 200 words) summarizing the key points and actionable steps for the student. Output plain text only."
        ),
    )
    context_str = json.dumps(lecture_context)
    task = f"Lecture Context: {context_str}"
    outcome = await Console(study_agent.run_stream(task=task))
    # Newest-first list of messages that carry string content.
    text_messages = [
        m for m in reversed(outcome.messages)
        if hasattr(m, 'content') and isinstance(m.content, str)
    ]
    from_agent = [m for m in text_messages if getattr(m, 'source', None) == 'study_agent']
    candidates = from_agent or text_messages
    if not candidates:
        return "No study guide generated."
    return candidates[0].content.strip()
async def run_quiz_agent(api_service, api_key, lecture_context):
    """Generate a short plain-text quiz from the lecture context.

    Returns the stripped content of the newest text message sent by
    ``quiz_agent``; failing that, of the newest text message from any source;
    failing that, the string "No quiz generated.".
    """
    quiz_agent = AssistantAgent(
        name="quiz_agent",
        model_client=get_model_client(api_service, api_key),
        system_message=(
            "You are a Quiz Agent. Given the current lecture slides and scripts, generate a short quiz (3-5 questions) to test understanding. Output plain text only."
        ),
    )
    context_str = json.dumps(lecture_context)
    task = f"Lecture Context: {context_str}"
    outcome = await Console(quiz_agent.run_stream(task=task))

    def _has_text(message):
        return hasattr(message, 'content') and isinstance(message.content, str)

    newest_first = list(reversed(outcome.messages))
    # First choice: the quiz agent's own latest text reply.
    chosen = next(
        (m for m in newest_first if getattr(m, 'source', None) == 'quiz_agent' and _has_text(m)),
        None,
    )
    if chosen is None:
        # Second choice: the latest text message from any source.
        chosen = next((m for m in newest_first if _has_text(m)), None)
    if chosen is None:
        return "No quiz generated."
    return chosen.content.strip()
async def run_chat_agent(api_service, api_key, lecture_context, chat_history, user_message):
    """Answer a chat message about the lecture, optionally proposing form values.

    Args:
        api_service: Service name passed to ``get_model_client``.
        api_key: API key passed to ``get_model_client``.
        lecture_context: JSON-serializable lecture data included in the prompt.
        chat_history: Iterable of ``{"role": ..., "content": ...}`` dicts, or
            ``None`` / empty when there is no prior conversation.
        user_message: The new user message.

    Returns:
        A ``(form_update, reply_text)`` tuple. When the selected reply
        contains a non-empty JSON dict, ``form_update`` is that dict and
        ``reply_text`` is ``None``; otherwise ``form_update`` is ``None`` and
        ``reply_text`` is the stripped reply, or "No response." when no text
        message exists.
    """
    model_client = get_model_client(api_service, api_key)
    system_message = (
        "You are a helpful Chat Agent. Answer questions about the lecture, and if the user asks for a lecture title or content description, suggest appropriate values. "
        "If you want to update the form, output a JSON object: {\"title\": ..., \"content_description\": ...}. Otherwise, just reply as normal."
    )
    chat_agent = AssistantAgent(
        name="chat_agent",
        model_client=model_client,
        system_message=system_message
    )
    context_str = json.dumps(lecture_context)
    # A missing history (None) is treated as an empty conversation instead of
    # raising TypeError while iterating.
    chat_str = "\n".join(
        f"User: {m['content']}" if m['role'] == 'user' else f"Assistant: {m['content']}"
        for m in (chat_history or [])
    )
    user_input = f"Lecture Context: {context_str}\nChat History: {chat_str}\nUser: {user_message}"
    result = await Console(chat_agent.run_stream(task=user_input))

    text_messages = [
        msg for msg in reversed(result.messages)
        if hasattr(msg, 'content') and isinstance(msg.content, str)
    ]
    # Prefer the chat agent's own newest reply; otherwise use the newest text
    # message from any source.
    own_messages = [msg for msg in text_messages if getattr(msg, 'source', None) == 'chat_agent']
    candidates = own_messages or text_messages
    if not candidates:
        return None, "No response."
    reply = candidates[0]
    # Guard the JSON extraction the same way run_note_agent does, so a parsing
    # failure degrades to a plain-text reply instead of failing the chat turn.
    try:
        extracted = extract_json_from_message(reply)
    except Exception:
        logger.exception("Could not extract JSON from chat reply; returning it as plain text")
        extracted = None
    if extracted and isinstance(extracted, dict):
        return extracted, None
    return None, reply.content.strip()
def update_notes_list(notes):
    """Build one single-cell row per note title, for a Gradio Dataframe."""
    rows = []
    for note in notes:
        rows.append([note["title"]])
    return rows
def show_note_editor_with_content(title, content):
    """Reveal the note editor pre-filled with ``title`` and ``content``.

    Returns six updates, in order: note_editor (shown), notes_list (hidden),
    study_guide_output (hidden), quiz_output (hidden), note_title (value set)
    and note_content (value set).
    """
    editor_update = gr.update(visible=True)
    list_update = gr.update(visible=False)
    guide_update = gr.update(visible=False)
    quiz_update = gr.update(visible=False)
    title_update = gr.update(value=title)
    content_update = gr.update(value=content)
    return (
        editor_update,
        list_update,
        guide_update,
        quiz_update,
        title_update,
        content_update,
    )
def hide_note_editor():
    """Return four visibility updates: hidden, visible, hidden, hidden.

    Presumably these map, in order, to note_editor, notes_list,
    study_guide_output and quiz_output (the order documented in
    show_note_editor_with_content) — confirm against the event wiring.
    """
    visibility = (False, True, False, False)
    return tuple(gr.update(visible=shown) for shown in visibility)
def show_study_guide(guide):
    """Return four updates that display ``guide`` in the third output.

    In order: hidden, visible, visible with ``value=guide``, hidden.
    Presumably these target note_editor, notes_list, study_guide_output and
    quiz_output respectively — confirm against the event wiring.
    """
    first = gr.update(visible=False)
    second = gr.update(visible=True)
    guide_update = gr.update(value=guide, visible=True)
    last = gr.update(visible=False)
    return first, second, guide_update, last
def show_quiz(quiz):
    """Return four updates that display ``quiz`` in the fourth output.

    In order: hidden, visible, hidden, visible with ``value=quiz``.
    Presumably these target note_editor, notes_list, study_guide_output and
    quiz_output respectively — confirm against the event wiring.
    """
    first = gr.update(visible=False)
    second = gr.update(visible=True)
    third = gr.update(visible=False)
    quiz_update = gr.update(value=quiz, visible=True)
    return first, second, third, quiz_update
# Helper to get fallback lecture context from form fields
def get_fallback_lecture_context(lecture_context, title_val, desc_val, style_val, audience_val):
    """Return ``lecture_context`` if it has slides or scripts, else a stand-in.

    The stand-in has empty ``slides`` and ``scripts`` lists and takes title,
    description, style and audience from the form values, substituting a
    default for each value that is empty or ``None``.
    """
    if lecture_context:
        if lecture_context.get("slides") or lecture_context.get("scripts"):
            return lecture_context

    # No generated material: describe the lecture from the form fields.
    fallback = {"slides": [], "scripts": []}
    fallback["title"] = title_val if title_val else "Untitled Lecture"
    fallback["description"] = desc_val if desc_val else "No description provided."
    fallback["style"] = style_val if style_val else "Feynman - Simplifies complex ideas with enthusiasm"
    fallback["audience"] = audience_val if audience_val else "University"
    return fallback
def show_note_content(evt: gr.SelectData, notes):
    """Load the saved text of the note selected in the notes list.

    Args:
        evt: Selection event. It is annotated as ``gr.SelectData`` so Gradio
            injects the event object; a plain dict with an ``index`` key is
            still accepted for direct callers.
        notes: List of note dicts, each with a ``title`` key.

    Returns:
        A ``gr.update`` carrying the contents of ``<OUTPUT_DIR>/<title>.txt``
        for the selected row, or the placeholder text when the index is out
        of range or the file does not exist.
    """
    placeholder = gr.update(value="Click any button above to generate content...")
    if isinstance(evt, dict):
        selected = evt.get('index', 0)
    else:
        selected = getattr(evt, 'index', 0)
    # A Dataframe selection reports [row, column]; only the row picks the note.
    if isinstance(selected, (list, tuple)):
        selected = selected[0] if selected else None
    notes = notes or []
    if not isinstance(selected, int) or not 0 <= selected < len(notes):
        return placeholder
    # NOTE(review): the title is used verbatim as a file name; confirm titles
    # cannot contain path separators.
    note_file = os.path.join(OUTPUT_DIR, f"{notes[selected]['title']}.txt")
    try:
        with open(note_file, "r", encoding="utf-8") as f:
            return gr.update(value=f.read())
    except FileNotFoundError:
        return placeholder
# Selecting an entry in the notes list calls show_note_content with the
# current notes state and writes the result to note_response.
notes_list.select(
fn=show_note_content,
inputs=[notes_state],
outputs=note_response
)
# --- NOTES LOGIC ---
def note_type_prefix(note_type, title):
    """Prefix ``title`` with ``"<note_type> - "`` unless it already starts with it.

    A falsy ``note_type`` leaves the title untouched.
    """
    already_tagged = (not note_type) or title.startswith(note_type)
    return title if already_tagged else f"{note_type} - {title}"
# Extra layout rules for the right-hand column: the notes and chat sections
# share the column height (each capped at 50vh and scrollable), and the chat
# input row stays pinned to the bottom of the chat section.
custom_css = """
#right-column {height: 100% !important; display: flex !important; flex-direction: column !important; gap: 20px !important;}
#notes-section, #chat-section {flex: 1 1 0; min-height: 0; max-height: 50vh; overflow-y: auto;}
#chat-section {display: flex; flex-direction: column; position: relative;}
#chatbot {flex: 1 1 auto; min-height: 0; max-height: calc(50vh - 60px); overflow-y: auto;}
#chat-input-row {position: sticky; bottom: 0; background: white; z-index: 2; padding-top: 8px;}
"""
# NOTE(review): assumes demo.css is already a string at this point (+= would
# raise TypeError on None) — confirm how demo is constructed.
demo.css += custom_css
# Script entry point: launch the app, passing OUTPUT_DIR as the only entry in
# allowed_paths.
if __name__ == "__main__":
demo.launch(allowed_paths=[OUTPUT_DIR])