|
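"""Text-to-Video Generator (Streamlit app).

Pipeline: split a story into scenes with image prompts (Groq LLM via LangGraph),
synthesize per-scene narration (Multilingual-TTS Hugging Face Space), generate an
image per prompt (Stable Diffusion Space), then assemble everything into a single
video with MoviePy. Launch with `streamlit run <this file>`.
"""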
|
|
import streamlit as st |
|
from gradio_client import Client |
|
from PIL import Image |
|
import moviepy.editor as mp |
|
from natsort import natsorted |
|
from pydantic import BaseModel, Field |
|
from typing import List, Dict, Type, Optional, TypedDict |
|
from langgraph.graph import StateGraph, START, END |
|
from langchain_groq import ChatGroq |
|
from langchain_core.messages import SystemMessage |
|
import os |
|
from dotenv import load_dotenv |
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
HF_TOKEN = os.getenv("HF_TOKEN") |
|
GROQ_API_KEY = os.getenv("GROQ_API_KEY") |
|
IMAGE_GENERATION_SPACE_NAME = "habib926653/stabilityai-stable-diffusion-3.5-large-turbo" |
|
SUPPORTED_FORMATS = ["mp3", "wav", "ogg", "flac", "aac", "m4a"] |
|
|
|
|
|
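# Schema the LLM must fill for each scene: the verbatim story segment plus the
# image prompts used to visualize it.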
class SingleScene(BaseModel): |
|
    text: str = Field(description="Actual segment of text (a scene) from the complete story")
|
image_prompts: List[str] = Field( |
|
description="""List of detailed and descriptive image prompts for the segment |
|
prompt format: [theme: {atmosphere/mood}] [style: {artistic/photorealistic}] [focus: {main subject}] [details: {specific elements}] [lighting: {day/night/mystic}] [perspective: {close-up/wide-angle}]" |
|
Example: "theme: eerie forest | style: cinematic realism | focus: abandoned cabin | details: broken windows, overgrown vines | lighting: moonlit fog | perspective: wide-angle shot" |
|
""" |
|
) |
|
|
|
class ScenesResponseSchema(BaseModel): |
|
scenes: List[SingleScene] |
|
|
|
|
|
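# Graph state passed through LangGraph: input messages in, parsed Pydantic output out.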
class State(TypedDict): |
|
messages: list |
|
output: Optional[BaseModel] |
|
|
|
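# Minimal one-node LangGraph (START -> extract -> END) that asks a Groq model
# for output conforming to the given Pydantic schema.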
class StructuredOutputExtractor: |
|
def __init__(self, response_schema: Type[BaseModel]): |
|
self.response_schema = response_schema |
|
self.llm = ChatGroq(model="deepseek-r1-distill-llama-70b", api_key=GROQ_API_KEY) |
|
self.structured_llm = self.llm.with_structured_output(response_schema) |
|
self._build_graph() |
|
|
|
def _build_graph(self): |
|
graph_builder = StateGraph(State) |
|
graph_builder.add_node("extract", self._extract_structured_info) |
|
graph_builder.add_edge(START, "extract") |
|
graph_builder.add_edge("extract", END) |
|
self.graph = graph_builder.compile() |
|
|
|
    def _extract_structured_info(self, state: State):
|
query = state['messages'][-1].content |
|
try: |
|
output = self.structured_llm.invoke(query) |
|
return {"output": output} |
|
except Exception as e: |
|
st.error(f"Error during extraction: {e}") |
|
return {"output": None} |
|
|
|
def extract(self, query: str) -> Optional[BaseModel]: |
|
result = self.graph.invoke({"messages": [SystemMessage(content=query)]}) |
|
return result.get('output') |
|
|
|
|
|
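# Estimate narration length at ~155 words per minute; the estimate is fed to the
# scene-splitting prompt so the LLM can size the number of image prompts.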
def calculate_read_time(text: str, words_per_minute: int = 155) -> str: |
|
try: |
|
if not text or not isinstance(text, str): |
|
return "Invalid input: Text must be a non-empty string." |
|
words = text.split() |
|
word_count = len(words) |
|
total_seconds = (word_count / words_per_minute) * 60 |
|
hours = int(total_seconds // 3600) |
|
minutes = int((total_seconds % 3600) // 60) |
|
seconds = int(total_seconds % 60) |
|
if hours > 0: |
|
return f"Reading time: {hours} hour(s), {minutes} minute(s), and {seconds} second(s)." |
|
elif minutes > 0: |
|
return f"Reading time: {minutes} minute(s) and {seconds} second(s)." |
|
else: |
|
return f"Reading time: {seconds} second(s)." |
|
except Exception as e: |
|
return f"An error occurred: {e}" |
|
|
|
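# Ask the LLM to split the story into scenes with image prompts, following the
# "12 image prompts per minute of audio" rule of thumb.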
def get_scenes(text_script: str): |
|
read_time = calculate_read_time(text_script) |
|
prompt = f""" |
|
    ROLE: Story to Scene Generator

    Tasks: For the given story

    1. Read it completely and understand the full context

    2. Split the story into tiny scenes (without changing a single word) and attach a highly detailed, context-aware list of image prompts to visualize each scene

    3. Never describe a complete scene in a single image prompt; use multiple prompts

    RULE OF THUMB: 12 image prompts per 1 minute of audio
|
|
|
Estimated Read Time: {read_time}\n\n |
|
Complete Story: {text_script} |
|
""" |
|
extractor = StructuredOutputExtractor(response_schema=ScenesResponseSchema) |
|
result = extractor.extract(prompt) |
|
return result.model_dump() if result else {} |
|
|
|
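# Synthesize narration via the Multilingual-TTS Space and copy the audio file it
# returns (second element of the result) to `path`.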
def generate_audio(text, language_code, speaker, path='test_audio.mp3'): |
|
try: |
|
client = Client("habib926653/Multilingual-TTS") |
|
result = client.predict( |
|
text=text, |
|
language_code=language_code, |
|
speaker=speaker, |
|
api_name="/text_to_speech_edge" |
|
) |
|
audio_file_path = result[1] |
|
with open(audio_file_path, 'rb') as f: |
|
audio_bytes = f.read() |
|
with open(path, 'wb') as f: |
|
f.write(audio_bytes) |
|
return {"audio_file": path} |
|
except Exception as e: |
|
st.error(f"Error during audio generation: {e}") |
|
return {"error": str(e)} |
|
|
|
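# Generate a 1280x720 image for the prompt via the Stable Diffusion Space and
# save it to `path`.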
def generate_image(prompt, path='test_image.png'): |
|
try: |
|
client = Client(IMAGE_GENERATION_SPACE_NAME, hf_token=HF_TOKEN) |
|
result = client.predict( |
|
prompt=prompt, |
|
width=1280, |
|
height=720, |
|
api_name="/generate_image" |
|
) |
|
        # The Space returns a local file path; save a copy to the requested location.

        image = Image.open(result)

        image.save(path)

        return path
|
except Exception as e: |
|
st.error(f"Error during image generation: {e}") |
|
return {"error": str(e)} |
|
|
|
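# Build media/video_N/{audio,images}/: one MP3 per scene and one PNG per image
# prompt. Scenes whose audio generation fails are skipped.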
def generate_video_assets(scenes: Dict, language: str, speaker: str, base_path: str = "media") -> str: |
|
try: |
|
        os.makedirs(base_path, exist_ok=True)
|
scenes_list = scenes.get("scenes", []) |
|
video_folder = os.path.join(base_path, f"video_{len(os.listdir(base_path)) + 1}") |
|
os.makedirs(video_folder, exist_ok=True) |
|
images_folder = os.path.join(video_folder, "images") |
|
audio_folder = os.path.join(video_folder, "audio") |
|
os.makedirs(images_folder, exist_ok=True) |
|
os.makedirs(audio_folder, exist_ok=True) |
|
|
|
for scene_count, scene in enumerate(scenes_list): |
|
text = scene.get("text", "") |
|
image_prompts = scene.get("image_prompts", []) |
|
audio_path = os.path.join(audio_folder, f"scene_{scene_count + 1}.mp3") |
|
audio_result = generate_audio(text, language, speaker, path=audio_path) |
|
if "error" in audio_result: |
|
continue |
|
scene_images_folder = os.path.join(images_folder, f"scene_{scene_count + 1}") |
|
os.makedirs(scene_images_folder, exist_ok=True) |
|
for count, prompt in enumerate(image_prompts): |
|
image_path = os.path.join(scene_images_folder, f"scene_{scene_count + 1}_image_{count + 1}.png") |
|
generate_image(prompt=prompt, path=image_path) |
|
|
|
return video_folder |
|
except Exception as e: |
|
st.error(f"Error during video asset generation: {e}") |
|
return "" |
|
|
|
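# Pair each scene's image folder with its MP3, time the images evenly over the
# narration, and concatenate all scene clips into the final video.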
def generate_video(video_folder: str, output_filename: str = "final_video.mp4"): |
|
try: |
|
audio_folder = os.path.join(video_folder, "audio") |
|
images_folder = os.path.join(video_folder, "images") |
|
final_clips = [] |
|
scene_folders = [ |
|
os.path.join(images_folder, scene) |
|
for scene in natsorted(os.listdir(images_folder)) |
|
if os.path.isdir(os.path.join(images_folder, scene)) |
|
] |
|
for scene_path in scene_folders: |
|
scene_name = os.path.basename(scene_path) |
|
audio_path = os.path.join(audio_folder, f"{scene_name}.mp3") |
|
if not os.path.exists(audio_path): |
|
continue |
|
image_files = natsorted([ |
|
os.path.join(scene_path, img) |
|
for img in os.listdir(scene_path) |
|
if img.lower().endswith(('.png', '.jpg', '.jpeg')) |
|
]) |
|
if not image_files: |
|
continue |
|
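            # Show each image for an equal share of the scene's narration.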
audio_clip = mp.AudioFileClip(audio_path) |
|
duration_per_image = audio_clip.duration / len(image_files) |
|
image_clips = [mp.ImageClip(img).set_duration(duration_per_image) for img in image_files] |
|
scene_video = mp.concatenate_videoclips(image_clips, method="compose").set_audio(audio_clip) |
|
final_clips.append(scene_video) |
|
if not final_clips: |
|
st.error("No valid scenes processed.") |
|
return None |
|
final_video = mp.concatenate_videoclips(final_clips, method="compose") |
|
output_path = os.path.join(video_folder, output_filename) |
|
final_video.write_videofile(output_path, fps=24, codec='libx264') |
|
return output_path |
|
except Exception as e: |
|
st.error(f"Error during video generation: {e}") |
|
return None |
|
|
|
|
|
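# Streamlit UI: collect the script, language, and speaker, then run the full
# scenes -> assets -> video pipeline when the button is pressed.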
def main(): |
|
st.markdown("<h1 style='text-align: center;'>Text to Video Generator</h1>", unsafe_allow_html=True) |
|
st.markdown("<p style='text-align: center;'>Leave a Like if it works for you! ❤️</p>", unsafe_allow_html=True) |
|
|
|
text_script = st.text_area("Enter your script/story (max 1500 characters):", max_chars=1500) |
|
language = st.selectbox("Choose Language:", ["Urdu", "English"]) |
|
client = Client("habib926653/Multilingual-TTS") |
|
speakers_response = client.predict(language=language, api_name="/get_speakers") |
|
speakers = [choice[0] for choice in speakers_response["choices"]] |
|
selected_speaker = st.selectbox("Choose Speaker:", speakers) |
|
|
|
if st.button("Generate Video"): |
|
if text_script: |
|
with st.spinner("Generating video... This may take a few minutes."): |
|
scenes = get_scenes(text_script) |
|
if not scenes: |
|
st.error("Failed to generate scenes.") |
|
else: |
|
video_assets_folder = generate_video_assets(scenes, language, selected_speaker) |
|
if video_assets_folder: |
|
generated_video_path = generate_video(video_assets_folder) |
|
if generated_video_path: |
|
st.success("Video generated successfully!") |
|
st.video(generated_video_path) |
|
else: |
|
st.warning("Please enter some text to generate a video.") |
|
|
|
st.markdown("### 🔥 See How It Works (Example)") |
|
example_script = """ |
|
One hot summer day, a thirsty crow was flying in search of water. He looked everywhere, but he couldn't find a single drop. Tired and exhausted, he finally spotted a clay pot with a little water at the bottom. |
|
""" |
|
st.markdown(f"**Example Script:** {example_script}") |
|
|
|
if __name__ == "__main__": |
|
main() |