File size: 8,260 Bytes
6d01f39
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import os
import time
import json
import logging
import requests
from typing import Optional, Dict, Any, List
from urllib.parse import urlparse

# --- Logging Setup ---
def get_logger(name: str = __name__):
    logger = logging.getLogger(name)
    if not logger.handlers:
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('sora_video_downloader.log'),
                logging.StreamHandler()
            ]
        )
    return logger

logger = get_logger()

# --- Sora API Client ---
class SoraClient:
    def __init__(self, api_key: str, base_url: str, api_version: str = "preview"):
        self.api_key = api_key
        self.base_url = base_url.rstrip('/')
        self.api_version = api_version
        self.session = requests.Session()
        self.session.headers.update({
            'api-key': self.api_key,
            'Content-Type': 'application/json'
        })
        logger.info("SoraClient initialized")

    def start_video_job(self, prompt: str, height: int = 1080, width: int = 1080, n_seconds: int = 5, n_variants: int = 1) -> Optional[str]:
        url = f"{self.base_url}/openai/v1/video/generations/jobs?api-version={self.api_version}"
        payload = {
            "model": "sora",
            "prompt": prompt,
            "height": str(height),
            "width": str(width),
            "n_seconds": str(n_seconds),
            "n_variants": str(n_variants)
        }
        try:
            response = self.session.post(url, json=payload)
            if response.status_code in [200, 201, 202]:
                result = response.json()
                job_id = result.get('id') or result.get('job_id') or result.get('jobId')
                return job_id
            else:
                logger.error(f"Failed to start job: {response.status_code} {response.text}")
                return None
        except Exception as e:
            logger.error(f"Exception in start_video_job: {e}")
            return None

    def get_job_status(self, job_id: str) -> Dict[str, Any]:
        url = f"{self.base_url}/openai/v1/video/generations/jobs/{job_id}?api-version={self.api_version}"
        try:
            response = self.session.get(url)
            if response.status_code == 200:
                return response.json()
            else:
                logger.error(f"Failed to get job status: {response.status_code} {response.text}")
                return {"status": "error", "error": f"HTTP {response.status_code}"}
        except Exception as e:
            logger.error(f"Exception in get_job_status: {e}")
            return {"status": "error", "error": str(e)}

    def wait_for_job(self, job_id: str, max_wait_time: int = 300, poll_interval: int = 10) -> Optional[Dict[str, Any]]:
        start_time = time.time()
        while time.time() - start_time < max_wait_time:
            status_result = self.get_job_status(job_id)
            status = status_result.get('status', '').lower()
            if status in ['completed', 'succeeded', 'success']:
                return status_result
            elif status in ['failed', 'error']:
                return None
            time.sleep(poll_interval)
        logger.error(f"Job {job_id} timed out after {max_wait_time}s")
        return None

    def get_generation_details(self, generation_id: str) -> Dict[str, Any]:
        url = f"{self.base_url}/openai/v1/video/generations/{generation_id}?api-version={self.api_version}"
        try:
            resp = self.session.get(url)
            if resp.status_code == 200:
                return resp.json()
            else:
                logger.error(f"Failed to fetch generation details: HTTP {resp.status_code}")
                return {}
        except Exception as e:
            logger.error(f"Exception in get_generation_details: {e}")
            return {}

    def extract_video_urls(self, job_result: Dict[str, Any]) -> List[str]:
        video_urls = []
        generations = job_result.get('generations', [])
        if isinstance(generations, list) and generations:
            for g in generations:
                gen_id = g.get('id') if isinstance(g, dict) else None
                if not gen_id:
                    continue
                content_url = f"{self.base_url}/openai/v1/video/generations/{gen_id}/content/video?api-version={self.api_version}"
                video_urls.append(content_url)
        return video_urls

    def download_video(self, video_url: str, output_filename: str) -> bool:
        try:
            download_session = requests.Session()
            download_session.headers.update({'api-key': self.api_key})
            response = download_session.get(video_url, stream=True)
            if response.status_code == 200:
                with open(output_filename, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
                return True
            else:
                logger.error(f"Failed to download video: {response.status_code} {response.text}")
                return False
        except Exception as e:
            logger.error(f"Exception in download_video: {e}")
            return False

# --- Video Job Abstraction ---
class VideoJob:
    def __init__(self, sora_client: SoraClient, prompt: str, height: int = 1080, width: int = 1080, n_seconds: int = 5, n_variants: int = 1):
        self.sora_client = sora_client
        self.prompt = prompt
        self.height = height
        self.width = width
        self.n_seconds = n_seconds
        self.n_variants = n_variants
        self.job_id: Optional[str] = None
        self.result: Optional[Dict[str, Any]] = None
        self.video_urls: List[str] = []

    def run(self, wait: bool = True) -> bool:
        self.job_id = self.sora_client.start_video_job(
            self.prompt, self.height, self.width, self.n_seconds, self.n_variants
        )
        if not self.job_id:
            logger.error("Failed to start video job")
            return False
        if wait:
            self.result = self.sora_client.wait_for_job(self.job_id)
            if not self.result:
                logger.error("Job failed or timed out")
                return False
            self.video_urls = self.sora_client.extract_video_urls(self.result)
            if not self.video_urls:
                logger.error("No video URLs found")
                return False
        return True

    def download_videos(self, output_dir: str) -> List[str]:
        saved_files = []
        for i, url in enumerate(self.video_urls):
            filename = f"sora_video_{self.job_id}_{i+1}.mp4"
            filepath = os.path.join(output_dir, filename)
            if self.sora_client.download_video(url, filepath):
                saved_files.append(filepath)
        return saved_files

# --- Video Storage Handler ---
class VideoStorage:
    def __init__(self, storage_dir: str = "videos"):
        self.storage_dir = storage_dir
        os.makedirs(self.storage_dir, exist_ok=True)

    def save_video(self, src_path: str, filename: str) -> str:
        dst_path = os.path.join(self.storage_dir, filename)
        os.rename(src_path, dst_path)
        return dst_path

    def list_videos(self) -> List[str]:
        return [os.path.join(self.storage_dir, f) for f in os.listdir(self.storage_dir) if f.endswith('.mp4')]

# --- Main for CLI usage (optional) ---
def main():
    api_key = os.getenv('AZURE_OPENAI_API_KEY') or os.getenv('AZURE_API_KEY')
    base_url = os.getenv('AZURE_OPENAI_ENDPOINT') or "https://levm3-me7f7pgq-eastus2.cognitiveservices.azure.com"
    if not api_key or not base_url:
        print("Please set your Azure API key and endpoint.")
        return
    sora = SoraClient(api_key, base_url)
    prompt = "A video of a cat playing with a ball of yarn in a sunny room"
    job = VideoJob(sora, prompt)
    if job.run():
        storage = VideoStorage()
        files = job.download_videos(storage.storage_dir)
        print(f"Downloaded: {files}")
    else:
        print("Video generation failed.")

if __name__ == "__main__":
    main()