import gradio as gr
from PIL import Image
from moviepy.editor import VideoFileClip, AudioFileClip
import os
import re
from openai import OpenAI
import subprocess
from pathlib import Path
import uuid
import tempfile
import shlex
import shutil
# Supported models configuration
MODELS = {
    "deepseek-ai/DeepSeek-V3": {
        "base_url": "https://api.deepseek.com/v1",
        "env_key": "DEEPSEEK_API_KEY",
    },
    "Qwen/Qwen2.5-Coder-32B-Instruct": {
        "base_url": "https://api-inference.huggingface.co/v1/",
        "env_key": "HF_TOKEN",
    },
}
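# Each model reads its API key from the environment variable named in "env_key"
# (DEEPSEEK_API_KEY for DeepSeek, HF_TOKEN for the Hugging Face Inference API).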
# Initialize client with first available model
client = OpenAI(
    base_url=next(iter(MODELS.values()))["base_url"],
    api_key=os.environ[next(iter(MODELS.values()))["env_key"]],
)
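# NOTE: get_completion() later re-points base_url and api_key at whichever model
# the user selects, so this single client instance serves every configured model.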
allowed_medias = [
    ".png",
    ".jpg",
    ".webp",
    ".jpeg",
    ".tiff",
    ".bmp",
    ".gif",
    ".svg",
    ".mp3",
    ".wav",
    ".ogg",
    ".mp4",
    ".avi",
    ".mov",
    ".mkv",
    ".flv",
    ".wmv",
    ".webm",
    ".mpg",
    ".mpeg",
    ".m4v",
    ".3gp",
    ".3g2",
    ".3gpp",
]
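# This list is passed to gr.File(file_types=...) below, so the file picker
# filters uploads to these extensions.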
def get_files_infos(files):
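    """Collect name, size, type, duration, dimensions and audio channels for each uploaded file."""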
    results = []
    for file in files:
        file_path = Path(file.name)
        info = {}
        info["size"] = os.path.getsize(file_path)
        # Sanitize filename by replacing spaces with underscores
        info["name"] = file_path.name.replace(" ", "_")
        file_extension = file_path.suffix

        if file_extension in (".mp4", ".avi", ".mkv", ".mov"):
            info["type"] = "video"
            video = VideoFileClip(file.name)
            info["duration"] = video.duration
            info["dimensions"] = "{}x{}".format(video.size[0], video.size[1])
            if video.audio:
                info["type"] = "video/audio"
                info["audio_channels"] = video.audio.nchannels
            video.close()
        elif file_extension in (".mp3", ".wav"):
            info["type"] = "audio"
            audio = AudioFileClip(file.name)
            info["duration"] = audio.duration
            info["audio_channels"] = audio.nchannels
            audio.close()
        elif file_extension in (
            ".png",
            ".jpg",
            ".jpeg",
            ".tiff",
            ".bmp",
            ".gif",
            ".svg",
        ):
            info["type"] = "image"
            img = Image.open(file.name)
            info["dimensions"] = "{}x{}".format(img.size[0], img.size[1])
        results.append(info)
    return results
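
# Illustrative shape of the result for one clip with sound (values made up):
#   [{"size": 1048576, "name": "my_clip.mp4", "type": "video/audio",
#     "duration": 12.0, "dimensions": "1280x720", "audio_channels": 2}]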
def get_completion(prompt, files_info, top_p, temperature, model_choice):
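    """Describe the assets in a Markdown table and ask the selected model for a single ffmpeg command."""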
    # Create table header
    files_info_string = "| Type | Name | Dimensions | Duration | Audio Channels |\n"
    files_info_string += "|------|------|------------|-----------|--------|\n"

    # Add each file as a table row
    for file_info in files_info:
        dimensions = file_info.get("dimensions", "-")
        duration = (
            f"{file_info.get('duration', '-')}s" if "duration" in file_info else "-"
        )
        audio = (
            f"{file_info.get('audio_channels', '-')} channels"
            if "audio_channels" in file_info
            else "-"
        )
        files_info_string += f"| {file_info['type']} | {file_info['name']} | {dimensions} | {duration} | {audio} |\n"
    messages = [
        {
            "role": "system",
            "content": """
You are a very experienced media engineer, controlling a UNIX terminal.
You are an FFMPEG expert with years of experience and multiple contributions to the FFMPEG project.

You are given:
(1) a set of video, audio and/or image assets, including their name, duration, dimensions and file size
(2) the description of a new video you need to create from the list of assets

Your objective is to generate the SIMPLEST POSSIBLE single ffmpeg command to create the requested video.

Key requirements:
- Use the absolute minimum number of ffmpeg options needed
- Avoid complex filter chains or filter_complex if possible
- Prefer simple concatenation, scaling, and basic filters
- Output exactly ONE command that will be directly pasted into the terminal
- Never output multiple commands chained together
- Output the command in a single line (no line breaks or multiple lines)
- If the user asks for waveform visualization, make sure to set the mode to `line` and to use the full width of the video. Also mix the audio down to a single channel.
- For image sequences: use -framerate and pattern matching (like 'img%d.jpg') when possible, falling back to individual image processing with -loop 1 and appropriate filters only when necessary.
- When showing file operations or commands, always use explicit paths and filenames without wildcards - avoid using asterisk (*) or glob patterns. Instead, use specific numbered sequences (like %d), explicit file lists, or show the full filename.

Remember: Simpler is better. Only use advanced ffmpeg features if absolutely necessary for the requested output.
""",
        },
        {
            "role": "user",
            "content": f"""Always output the media as video/mp4 and name the output file "output.mp4". Provide only the shell command without any explanations.
The current assets and objective follow. Reply with the FFMPEG command:

AVAILABLE ASSETS LIST:

{files_info_string}

OBJECTIVE: {prompt} and output at "output.mp4"
YOUR FFMPEG COMMAND:
""",
        },
    ]
    try:
        # Print the complete prompt
        print("\n=== COMPLETE PROMPT ===")
        for msg in messages:
            print(f"\n[{msg['role'].upper()}]:")
            print(msg["content"])
        print("=====================\n")

        if model_choice not in MODELS:
            raise ValueError(f"Model {model_choice} is not supported")

        model_config = MODELS[model_choice]
        client.base_url = model_config["base_url"]
        client.api_key = os.environ[model_config["env_key"]]
        model = "deepseek-chat" if "deepseek" in model_choice.lower() else model_choice

        completion = client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=temperature,
            top_p=top_p,
            max_tokens=2048,
        )
        content = completion.choices[0].message.content

        # Extract command from code block if present
        if "```" in content:
            # Find content between ```sh or ```bash and the next ```
            command = re.search(r"```(?:sh|bash)?\n(.*?)\n```", content, re.DOTALL)
            if command:
                command = command.group(1).strip()
            else:
                command = content.replace("\n", "")
        else:
            command = content.replace("\n", "")

        # Strip "output.mp4" from the command; update() appends the actual output path later
        command = command.replace("output.mp4", "")

        return command
    except Exception as e:
        raise Exception(f"API Error: {e}") from e
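
# Hypothetical round trip: for the prompt "keep only the first 10 seconds of
# my_clip.mp4" the model might return
#   ffmpeg -i my_clip.mp4 -t 10 -c copy output.mp4
# and get_completion() hands back "ffmpeg -i my_clip.mp4 -t 10 -c copy " so that
# update() can append "-y <generated path>" itself.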
def update(
    files,
    prompt,
    top_p=1,
    temperature=1,
    model_choice="Qwen/Qwen2.5-Coder-32B-Instruct",
):
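    """Validate the uploaded assets, generate an ffmpeg command, dry-run it, then execute it in a temp dir."""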
    if prompt == "":
        raise gr.Error("Please enter a prompt.")

    files_info = get_files_infos(files)
    # disable this if you're running the app locally or on your own server
    for file_info in files_info:
        if file_info["type"] == "video":
            if file_info["duration"] > 120:
                raise gr.Error(
                    "Please make sure all videos are less than 2 minutes long."
                )
        if file_info["size"] > 100000000:
            raise gr.Error("Please make sure all files are less than 100MB in size.")
    attempts = 0
    while attempts < 2:
        print("ATTEMPT", attempts)
        try:
            command_string = get_completion(
                prompt, files_info, top_p, temperature, model_choice
            )
            print(
                f"""/// PROMPT {prompt} \n\n/// START OF COMMAND ///:\n\n{command_string}\n\n/// END OF COMMAND ///\n\n"""
            )

            # split command string into list of arguments
            args = shlex.split(command_string)
            if args[0] != "ffmpeg":
                raise Exception("Command does not start with ffmpeg")
            temp_dir = tempfile.mkdtemp()

            # copy files to temp dir with sanitized names
            for file in files:
                file_path = Path(file.name)
                sanitized_name = file_path.name.replace(" ", "_")
                shutil.copy(file_path, Path(temp_dir) / sanitized_name)
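            # "-f null -" routes the result to ffmpeg's null muxer: the inputs are
            # decoded and every filter runs, but nothing is written to disk.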
            # test that the generated ffmpeg command is valid (dry run)
            ffmpeg_dry_run = subprocess.run(
                args + ["-f", "null", "-"],
                stderr=subprocess.PIPE,
                text=True,
                cwd=temp_dir,
            )
            if ffmpeg_dry_run.returncode == 0:
                print("Command is valid.")
            else:
                print("Command is not valid. Error output:")
                print(ffmpeg_dry_run.stderr)
                raise Exception(
                    "FFMPEG generated command is not valid. Please try something else."
                )
            output_file_name = f"output_{uuid.uuid4()}.mp4"
            output_file_path = str((Path(temp_dir) / output_file_name).resolve())
            final_command = args + ["-y", output_file_path]
            print(
                f"\n=== EXECUTING FFMPEG COMMAND ===\nffmpeg {' '.join(final_command[1:])}\n"
            )
            subprocess.run(final_command, cwd=temp_dir)

            generated_command = f"### Generated Command\n```bash\nffmpeg {' '.join(args[1:])} -y output.mp4\n```"
            return output_file_path, gr.update(value=generated_command)
        except Exception as e:
            attempts += 1
            if attempts >= 2:
                print("FROM UPDATE", e)
                raise gr.Error(str(e))
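
# update() makes at most two attempts: if the generated command fails the dry run
# or raises during execution, it requests a fresh completion before giving up.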
with gr.Blocks() as demo:
    gr.Markdown(
        """
        # 🏞 AI Video Composer

        Compose new videos from your assets using natural language. Add video, image and audio assets and let [Qwen2.5-Coder](https://huggingface.co/Qwen/Qwen2.5-Coder-32B-Instruct) or [DeepSeek-V3](https://huggingface.co/deepseek-ai/DeepSeek-V3-Base) generate a new video for you (using FFMPEG).
        """,
        elem_id="header",
    )
    with gr.Row():
        with gr.Column():
            user_files = gr.File(
                file_count="multiple",
                label="Media files",
                file_types=allowed_medias,
            )
            user_prompt = gr.Textbox(
                placeholder="e.g. Remove the first 3 seconds of the video",
                label="Instructions",
            )
            btn = gr.Button("Run")
            with gr.Accordion("Parameters", open=False):
                model_choice = gr.Radio(
                    choices=list(MODELS.keys()),
                    value=list(MODELS.keys())[0],
                    label="Model",
                )
                top_p = gr.Slider(
                    minimum=0.0,
                    maximum=1.0,
                    value=0.7,
                    step=0.05,
                    interactive=True,
                    label="Top-p (nucleus sampling)",
                )
                temperature = gr.Slider(
                    minimum=0.0,
                    maximum=5.0,
                    value=0.1,
                    step=0.1,
                    interactive=True,
                    label="Temperature",
                )
        with gr.Column():
            generated_video = gr.Video(
                interactive=False, label="Generated Video", include_audio=True
            )
            generated_command = gr.Markdown()

    btn.click(
        fn=update,
        inputs=[user_files, user_prompt, top_p, temperature, model_choice],
        outputs=[generated_video, generated_command],
    )
    with gr.Row():
        gr.Examples(
            examples=[
                [
                    ["./examples/ai_talk.wav", "./examples/bg-image.png"],
                    "Use the image as the background with a waveform visualization for the audio positioned in the center of the video.",
                    0.7,
                    0.1,
                    (
                        list(MODELS.keys())[1]
                        if len(MODELS) > 1
                        else list(MODELS.keys())[0]
                    ),
                ],
                [
                    ["./examples/ai_talk.wav", "./examples/bg-image.png"],
                    "Use the image as the background with a waveform visualization for the audio positioned in the center of the video. Make sure the waveform has a max height of 250 pixels.",
                    0.7,
                    0.1,
                    list(MODELS.keys())[0],
                ],
                [
                    [
                        "./examples/cat1.jpeg",
                        "./examples/cat2.jpeg",
                        "./examples/cat3.jpeg",
                        "./examples/cat4.jpeg",
                        "./examples/cat5.jpeg",
                        "./examples/cat6.jpeg",
                        "./examples/heat-wave.mp3",
                    ],
                    "Create a 3x2 grid of the cat images with the audio as background music. Make the video duration match the audio duration.",
                    0.7,
                    0.1,
                    (
                        list(MODELS.keys())[1]
                        if len(MODELS) > 1
                        else list(MODELS.keys())[0]
                    ),
                ],
            ],
            inputs=[user_files, user_prompt, top_p, temperature, model_choice],
            outputs=[generated_video, generated_command],
            fn=update,
            run_on_click=True,
            cache_examples=False,
        )
    with gr.Row():
        gr.Markdown(
            """
            If you have an idea to improve this, please open a PR:

            [](https://huggingface.co/spaces/huggingface-projects/video-composer-gpt4/discussions)
            """,
        )
demo.queue(default_concurrency_limit=200)
demo.launch(show_api=False, ssr_mode=False)