File size: 11,846 Bytes
a81e750
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# -*- coding: utf-8 -*-
import gradio as gr
from typing import Optional, List

from pydantic import BaseModel, Field

from phi.agent import Agent
from phi.model.google import Gemini
from phi.workflow import Workflow, RunResponse, RunEvent
from phi.storage.workflow.sqlite import SqlWorkflowStorage
from phi.tools.duckduckgo import DuckDuckGo
from phi.utils.pprint import pprint_run_response
from phi.utils.log import logger

from agent_tools import (
    object_detection_embed,
    summarize_video,
    )
from utils import (
    create_poster,
    download_youtube_video,
    generate_tmp_filename,
    pdf_to_jpg,
    )

import os
from PIL import Image
import numpy as np

# Supported output languages for the generated summary/poster.
# "Original" means keep the video's own language (no translation).
LANG_OPTIONS = [
    "Original",
    "Chinese",
    "English",
] 

#====================================================================================
class Video(BaseModel):
    """Metadata record for a processed video clip."""

    # Required identity of the clip.
    name: str = Field(..., description="File name of the video.")
    url: str = Field(..., description="Link to the video.")
    # Fix: these were annotated Optional yet declared required via `...`;
    # default them to None so a Video can exist before summary/hash are
    # computed.  Callers that already pass values are unaffected.
    summary: Optional[str] = Field(None, description="Summary of the video.")
    hash_value: Optional[str] = Field(None, description="sha256_hash value of the video.")

class VideoCache:
    """Per-session cache of video summaries and extracted object frames.

    Backed by a gr.State dict so the cache lives with the Gradio session.
    Frames are stored serialized as (bytes, shape, dtype-name) tuples
    because the state value must be plain, picklable data.
    """

    def __init__(self):
        self.session_state = gr.State({
            'metadata': {},   # key -> summary text
            'frame_data': {}  # key -> list of serialized frame tuples
        })

    def add_to_cache(self, basename: str, out_lang: str, summary: str, frames: list[np.ndarray]) -> None:
        """Store the summary and object frames under (basename, out_lang)."""
        key = basename + '_' + out_lang
        # Serialize numpy arrays to plain bytes for storage.
        serialized_frames = [self._array_to_bytes(arr) for arr in frames]

        # Read-modify-write the whole state dict so gr.State sees the update.
        current_state = self.session_state.value
        current_state['metadata'][key] = summary
        current_state['frame_data'][key] = serialized_frames
        self.session_state.value = current_state

    def get_from_cache(self, basename: str, out_lang: str) -> tuple:
        """Return (summary, frames) for the key; (None, []) on a cache miss."""
        key = basename + '_' + out_lang
        cache = self.session_state.value
        summary = cache['metadata'].get(key)
        frame_bytes = cache['frame_data'].get(key, [])

        # Reconstruct numpy arrays from the serialized tuples.
        frames = [self._bytes_to_array(*b) for b in frame_bytes]
        return summary, frames

    @staticmethod
    def _array_to_bytes(arr: np.ndarray) -> tuple:
        """Serialize an array to (bytes, shape, dtype-name).

        Fix: the dtype is now recorded explicitly.  The previous version
        stored only (bytes, shape) and reconstruction assumed uint8, which
        silently corrupted any array of another dtype on round-trip.
        """
        return arr.tobytes(), arr.shape, arr.dtype.name

    @staticmethod
    def _bytes_to_array(b: bytes, shape: tuple, dtype: str = "uint8") -> np.ndarray:
        """Reconstruct an array from (bytes, shape[, dtype-name]).

        `dtype` defaults to "uint8" for compatibility with entries written
        by the old two-tuple format.
        """
        return np.frombuffer(b, dtype=np.dtype(dtype)).reshape(shape)

class VideoPosterGenerator(Workflow):
    """Workflow that turns a video URL into a one-page summary poster.

    Pipeline: download video -> summarize with user prompt -> extract the
    top-3 object images -> compose a poster PDF.  Successful results are
    cached per (basename, out_lang) in the module-level `video_cache`
    (created in the Gradio UI section below).
    """

    # Agent that downloads the video clip for a given url.
    loader: Agent = Agent(
        tools=[download_youtube_video],
        show_tool_calls=True,
        description="Given a url_link, load video to process.",
    )

    # Agent that answers the user's prompt questions about the video.
    summarizer: Agent = Agent(
        tools=[summarize_video],
        show_tool_calls=True,
        markdown=True,
        description="Given a video, answer the prompt questions.",
    )

    # Agent that extracts the top three object images from the video.
    detector: Agent = Agent(
        tools=[object_detection_embed],
        show_tool_calls=True,
        structured_outputs=True,
        description="Given a video, extract top three object images.",
    )

    # Agent that composes the final one-page poster.
    poster: Agent = Agent(
        tools=[create_poster],
        show_tool_calls=True,
        structured_outputs=True,
        description="Given summary and images, generate one page posters.",
    )

    def run(self, url: str, user_prompt: str, out_lang: str, use_cache: bool = True) -> RunResponse:
        """Generate a poster for `url`.

        Args:
            url: Link to the video to process.
            user_prompt: Questions the summarizer should answer.
            out_lang: Requested output language (see LANG_OPTIONS).
            use_cache: When True, reuse a cached summary/frames if present.

        Returns:
            RunResponse whose `content` is an error string on failure, or
            `[video_path, poster_pdf_path]` on success (`video_path` is
            None when the result was served from the cache).
        """
        logger.info(f"Generating a poster for video: {url}")
        basename = os.path.basename(url)
        pdf_name = generate_tmp_filename(basename, ".pdf")

        # Step 1: serve from the cache when possible.
        if use_cache:
            summary, objects = video_cache.get_from_cache(basename, out_lang)

            # BUG FIX: get_from_cache returns (None, []) on a miss.  The
            # old check `objects is not None` was always true because []
            # is not None, so an entry with a summary but no frames would
            # slip through.  Require a non-empty frame list instead.
            if summary is not None and objects:
                logger.info(f"found cached_video_content: {url}")
                poster_response: Optional[str] = create_poster(pdf_name, objects, out_lang, summary, url)

                if poster_response is None:
                    return RunResponse(
                        event=RunEvent.workflow_completed,
                        content="Failed to generate video poster, please try again!",
                    )
                logger.info("Poster is generated successfully.")
                return RunResponse(
                    event=RunEvent.workflow_completed,
                    content=[None, poster_response],
                )

        # Step 2: download the video for the given url; abort on failure.
        video_path = download_youtube_video(url)
        if video_path is None:
            return RunResponse(
                event=RunEvent.workflow_completed,
                content=f"Sorry, could not load the video: {url}",
            )
        logger.info(f"Video {url} is loaded.")

        # Step 3: summarize the video for the user's questions.
        summary_response = summarize_video(video_path, user_prompt, out_lang)
        if summary_response is None:
            return RunResponse(
                event=RunEvent.workflow_completed,
                content="Failed to get summary, please try again!",
            )
        logger.info("Video summary is generated.")
        # summarize_video returns the detected/selected language plus text.
        lang, summary = summary_response

        # Step 4: extract the top-3 object (person or other) images.
        objects = object_detection_embed(video_path)
        if objects is None:
            return RunResponse(
                event=RunEvent.workflow_completed,
                content="Failed to extract images, please try again!",
            )
        logger.info("Objects are extracted successfully.")

        # Step 5: compose the poster PDF from summary + object images.
        poster_response = create_poster(pdf_name, objects, lang, summary, url)
        if poster_response is None:
            return RunResponse(
                event=RunEvent.workflow_completed,
                content="Failed to generate video poster, please try again!",
            )
        logger.info("Poster is generated successfully.")

        # Cache the results for subsequent requests on the same url/lang.
        video_cache.add_to_cache(basename=basename, out_lang=out_lang, summary=summary, frames=objects)

        return RunResponse(
            event=RunEvent.workflow_completed,
            content=[video_path, poster_response],
        )
#=====================================================================================
# Combine outputs of object detection and video summary into a single-page poster
def generate_poster_2(url, user_prompt, out_lang):
    """Run the poster workflow for `url` and return Gradio output values.

    Returns:
        (video_path, poster_pdf_path, poster_jpg_path) matching the
        video/pdf/jpg output components wired up in the UI.

    Raises:
        gr.Error: when the workflow reports a failure message.
    """
    url_base_name = os.path.basename(url)
    jpg_name = generate_tmp_filename(url_base_name, ".jpg")

    # Initialize the poster generator workflow:
    # - unique session id derived from the video url
    # - SQLite-backed storage for workflow state
    workflow = VideoPosterGenerator(
        session_id=f"generate-poster-on-{url}",
        storage=SqlWorkflowStorage(
            table_name="generate_poster_workflows",
            db_file="tmp/workflows.db",
        ),
    )

    # Execute the workflow with caching enabled.
    content = workflow.run(url=url, user_prompt=user_prompt, out_lang=out_lang, use_cache=True).content

    # BUG FIX: on failure the workflow returns an error *string*; the old
    # code unpacked it into two variables and crashed with an opaque
    # ValueError.  Surface the error message to the UI instead.
    if isinstance(content, str):
        raise gr.Error(content)

    video_path, video_poster = content
    logger.info(f"video_poster: {video_poster}")

    # pdf_to_jpg writes the image to jpg_name; its return value was unused.
    pdf_to_jpg(video_poster, jpg_name)

    return video_path, video_poster, jpg_name
#==================================================================================
# Gradio interface
print("Setting up Gradio interface...")
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    # Session cache; read/written by VideoPosterGenerator.run via the
    # module-level name `video_cache`.
    video_cache = VideoCache()
    
    gr.Markdown(
        """
        # 🎥 Video Smart Summary - From Video to Poster with Multimodal Agent
        
        Provide a YouTube or other video url to get an AI-generated summary poster. 
        """
    )
    
    with gr.Row():
        # Left column: url input and a preview of the downloaded video.
        with gr.Column(scale = 5, variant = "compact"):
            url_input = gr.Textbox(label="Paste YouTube URL here", 
                            placeholder="https://www.youtube.com/shorts/AE5HZsZOlkY", 
                            value="https://www.youtube.com/shorts/AE5HZsZOlkY")
            video_input = gr.Video(label="Downloaded Video", height = 300, scale = 5)
        
        # Right column: output language and the analysis prompt.
        with gr.Column(scale = 5, variant = "compact"):
            lang_name = gr.Dropdown(
                choices=LANG_OPTIONS,
                value=LANG_OPTIONS[0],
                label="Output Language",
                interactive = True,
            )
            
            user_prompt = gr.Textbox(label="📊 User Prompt", 
            value=
f'''0. **Title**: Summarize this video in one sentence with no more than 8 words.
1. **Story:** How the set scene introduced and tone is set. What is happening in the scene? Describe key visuals and actions. 
2. **Characters**: Identify top three character, noting their expressions, attire, actions, and interactions. Highlight emotional nuances and gestures.
3. **Narration or Voiceover**: Describe what types of narrations or voiceovers are used in the video.  
4. **Mood and Tone**: Capture the overall mood and tone of each scene, mentioning any music or sound effects that enhance these elements.''', 
            placeholder="Ask anything about the video - AI Agent will analyze everything and search the web if needed",
            info="You can ask questions about the video content",
            max_lines=30,
            interactive = True)
            
    with gr.Row():      
        poster_button = gr.Button("🚀 Generate Poster", variant="primary")
           
    with gr.Row():    
        with gr.Column(scale = 6, variant = "compact"):
            jpg_file = gr.Image(label="Generated Poster Image", type = "filepath")
        with gr.Column(scale = 4, variant = "compact"):    
            pdf_file = gr.File(label="Generated Poster PDF", file_types=[".pdf"])

  
    # Usage help.  Fixes user-facing typos from the original text
    # ("Downalod" -> "Download", "Modify you" -> "Modify your").
    gr.Markdown(
        """
        ### How to use:
        1. Paste a YouTube link in the URL input textbox;
        2. Select output language you want to use, currently only support original(default, no translation), English and Chinese;
        3. Modify your prompt questions if you want (optional);
        4. Click the primary task button "Generate Poster";
        5. Download generated poster (JPG or PDF) file from ‘Generated Poster ...’ block.
        
        *Note: Processing may take a few minutes depending on the video length.*
        *If you get error for some reason, retry it before debug it!*
        """
    )

    # Wire the button to the workflow runner; outputs map to
    # (video preview, PDF file, JPG image) in that order.
    poster_button.click(generate_poster_2, inputs=[url_input, user_prompt, lang_name], outputs=[video_input, pdf_file, jpg_file])  
    
demo.launch(share=True)