|
|
|
%%capture |
|
!pip install -q torch torchvision torchaudio |
|
!pip install -q git+https://github.com/openai/whisper.git |
|
!pip install -q yt-dlp moviepy gradio praw google-api-python-client |
|
!sudo apt update |
|
!sudo apt install -y ffmpeg |
|
|
|
import concurrent.futures
import logging
import os
import shutil
import subprocess
import tempfile
import threading

import gradio as gr
import numpy as np
import praw
import requests
import torch
import whisper
import yt_dlp as youtube_dl
from googleapiclient.discovery import build
from moviepy.editor import VideoFileClip, CompositeVideoClip, TextClip, AudioFileClip
from PIL import Image, ImageEnhance
|
|
|
|
|
# Root-logger setup: the root logger's threshold is set to ERROR, so records
# below that level are dropped unless a library configures its own logger.
logger = logging.getLogger()
logger.setLevel(logging.ERROR)
# basicConfig only configures the root logger when it has no handlers yet; the
# explicit setLevel above applies the threshold either way.
logging.basicConfig(level=logging.ERROR)
|
|
|
|
|
# Working directories on the Colab filesystem.
OUTPUT_DIR = "/content/processed_videos"  # finished, subtitled videos
TEMP_DIR = "/content/temp_video_files"  # raw downloads and extracted audio

# exist_ok=True makes creation idempotent and avoids the check-then-create race
# of a separate os.path.exists() test when the cell is re-run.
for d in (OUTPUT_DIR, TEMP_DIR):
    os.makedirs(d, exist_ok=True)
|
|
|
|
|
# Report whether torch can see a CUDA device, then load the Whisper "base"
# checkpoint once at import time; `model` is shared by every transcription call.
print("CUDA kullanılabilir mi?", torch.cuda.is_available())
model = whisper.load_model(name="base")
print("✅ Whisper model başarıyla yüklendi")
|
|
|
|
|
def get_youtube_trending(youtube_api_key, region_code="TR", max_results=5):
    """Fetch the "most popular" YouTube chart for a region.

    Args:
        youtube_api_key: YouTube Data API v3 developer key.
        region_code: Region code passed to the API as ``regionCode``.
        max_results: Value passed to the API as ``maxResults``.

    Returns:
        A list of ``{"id": ..., "title": ...}`` dicts, one per returned item.
        On any error the message is printed and an empty list is returned.
    """
    try:
        client = build("youtube", "v3", developerKey=youtube_api_key)
        response = client.videos().list(
            part="snippet,contentDetails,statistics",
            chart="mostPopular",
            regionCode=region_code,
            maxResults=max_results,
        ).execute()
        return [
            {"id": entry["id"], "title": entry["snippet"]["title"]}
            for entry in response.get("items", [])
        ]
    except Exception as e:
        print(f"❌ YouTube trend hatası: {e}")
        return []
|
|
|
def get_reddit_videos(client_id, client_secret, user_agent, subreddit="funny", limit=5):
    """Collect Reddit-hosted videos from a subreddit's "hot" listing.

    Previously ``limit`` capped the number of *posts* inspected, so a listing
    made mostly of images or links returned few or no videos. The listing is
    now scanned further (up to ``limit * 10`` posts) and scanning stops as soon
    as ``limit`` videos have been collected.

    Args:
        client_id: Reddit API client id.
        client_secret: Reddit API client secret.
        user_agent: User-agent string sent to Reddit.
        subreddit: Subreddit name to read.
        limit: Maximum number of videos to return. ``None`` leaves both the
            scan and the result uncapped, as before.

    Returns:
        A list of dicts with keys ``id``, ``title``, ``video_url`` and
        ``audio_url`` (``None`` when the post reports no audio). On any error
        the message is printed and an empty list is returned.
    """
    scan_factor = 10  # how many posts to inspect per requested video
    try:
        reddit = praw.Reddit(
            client_id=client_id,
            client_secret=client_secret,
            user_agent=user_agent,
        )
        scan_limit = None if limit is None else limit * scan_factor
        videos = []
        for post in reddit.subreddit(subreddit).hot(limit=scan_limit):
            if limit is not None and len(videos) >= limit:
                break
            if not (post.is_video and post.media and 'reddit_video' in post.media):
                continue

            media = post.media['reddit_video']
            video_url = media['fallback_url']

            # The audio track is a separate file. Its URL is derived by cutting
            # the fallback URL at the first underscore and appending
            # "_audio.mp4".
            # NOTE(review): this naming scheme is a guess about Reddit's CDN
            # layout; the derived URL may not exist for every video.
            audio_url = None
            if media.get('has_audio', False):
                base_url = video_url.split('_')[0]
                audio_url = f"{base_url}_audio.mp4"

            videos.append({
                "id": post.id,
                "title": post.title,
                "video_url": video_url,
                "audio_url": audio_url,
            })
        return videos
    except Exception as e:
        print(f"❌ Reddit video hatası: {e}")
        return []
|
|
|
def download_youtube_video(video_id, output_path):
    """Download one YouTube video with yt-dlp.

    Args:
        video_id: YouTube video id, appended to the watch URL.
        output_path: Passed to yt-dlp as the output template.

    Returns:
        True when the download call finishes without raising. On any exception
        the message is printed and False is returned.
    """
    try:
        watch_url = f"https://www.youtube.com/watch?v={video_id}"
        # Single-file format capped at 720p; yt-dlp's own console output is
        # suppressed.
        options = dict(
            format='best[height<=720]',
            outtmpl=output_path,
            quiet=True,
            no_warnings=True,
        )
        with youtube_dl.YoutubeDL(options) as downloader:
            downloader.download([watch_url])
    except Exception as e:
        print(f"❌ YouTube indirme hatası: {e}")
        return False
    else:
        return True
|
|
|
def _stream_to_file(url, dest_path, timeout=30, chunk_size=8192):
    """Stream the body of *url* into *dest_path*, raising on an HTTP error status."""
    # The context manager releases the connection even if writing fails.
    with requests.get(url, stream=True, timeout=timeout) as response:
        response.raise_for_status()
        with open(dest_path, "wb") as fh:
            for chunk in response.iter_content(chunk_size=chunk_size):
                if chunk:  # skip keep-alive chunks
                    fh.write(chunk)


def download_reddit_video(video_url, audio_url, output_path):
    """Download a Reddit video and, when available, mux in its audio track.

    The video and audio streams are fetched separately into
    ``<output_path>_video.mp4`` / ``<output_path>_audio.mp4`` and combined into
    ``output_path`` with moviepy. If ``audio_url`` is empty, or the audio file
    cannot be fetched, the video-only file is moved to ``output_path`` instead
    of failing the whole download.

    Args:
        video_url: URL of the video stream.
        audio_url: URL of the audio stream, or None.
        output_path: Destination path of the final file.

    Returns:
        True on success. On any other error the message is printed and False is
        returned. Intermediate files are removed in both cases.
    """
    video_path = f"{output_path}_video.mp4"
    audio_path = f"{output_path}_audio.mp4"
    try:
        _stream_to_file(video_url, video_path)

        has_audio = False
        if audio_url:
            try:
                _stream_to_file(audio_url, audio_path)
                has_audio = True
            except requests.RequestException as e:
                # The audio URL is derived by convention and may not exist;
                # keep the silent video rather than discarding the download.
                print(f"⚠️ Reddit ses indirilemedi, video sessiz kaydedilecek: {e}")

        if has_audio:
            video_clip = None
            audio_clip = None
            try:
                video_clip = VideoFileClip(video_path)
                audio_clip = AudioFileClip(audio_path)
                final_clip = video_clip.set_audio(audio_clip)
                final_clip.write_videofile(
                    output_path,
                    codec='libx264',
                    audio_codec='aac',
                    verbose=False,
                    logger=None,
                    threads=4,
                )
            finally:
                # Close the readers so their ffmpeg processes and file handles
                # are released before the temp files are deleted.
                for opened in (audio_clip, video_clip):
                    if opened is not None:
                        opened.close()
        else:
            os.replace(video_path, output_path)
        return True
    except Exception as e:
        print(f"❌ Reddit indirme hatası: {e}")
        return False
    finally:
        # Best-effort cleanup of intermediates on both success and failure.
        for leftover in (video_path, audio_path):
            try:
                if os.path.exists(leftover):
                    os.remove(leftover)
            except OSError:
                pass
|
|
|
# Transcriptions are serialized: run_automation calls this function from several
# worker threads, and they all share the single module-level Whisper `model`.
# NOTE(review): Whisper is not documented as thread-safe (its decoder attaches
# hooks to the model while decoding), so concurrent transcribe() calls are
# avoided here.
_TRANSCRIBE_LOCK = threading.Lock()


def generate_subtitle_text(video_path, language='tr'):
    """Transcribe a video's audio track with Whisper.

    The audio is written to a uniquely named WAV file inside TEMP_DIR. The
    earlier fixed name ``temp_audio.wav`` was shared by every worker thread, so
    parallel jobs could overwrite or delete each other's audio.

    Args:
        video_path: Path of the video file to transcribe.
        language: Language code passed to ``model.transcribe``.

    Returns:
        The transcript text. An empty string is returned when the video has no
        audio track or when any step fails (the error is printed).
    """
    clip = None
    audio_path = None
    try:
        clip = VideoFileClip(video_path)
        if clip.audio is None:
            # Silent video: nothing to transcribe.
            return ""

        fd, audio_path = tempfile.mkstemp(prefix="temp_audio_", suffix=".wav", dir=TEMP_DIR)
        os.close(fd)  # moviepy reopens the path itself
        clip.audio.write_audiofile(audio_path, verbose=False, logger=None)

        with _TRANSCRIBE_LOCK:
            result = model.transcribe(audio_path, language=language)
        return result["text"]
    except Exception as e:
        print(f"❌ Altyazı oluşturma hatası: {e}")
        return ""
    finally:
        # Release the reader and remove the temp audio on every path.
        if clip is not None:
            clip.close()
        if audio_path and os.path.exists(audio_path):
            os.remove(audio_path)
|
|
|
def _fit_to_portrait(clip, target_w, target_h):
    """Scale *clip* until it covers target_w x target_h, then center-crop to exactly that size."""
    scale = max(target_w / clip.w, target_h / clip.h)
    # max() guards against rounding leaving a side one pixel short of target.
    new_w = max(target_w, round(clip.w * scale))
    new_h = max(target_h, round(clip.h * scale))
    clip = clip.resize(newsize=(new_w, new_h))
    left = (new_w - target_w) // 2
    top = (new_h - target_h) // 2
    # Crop by offset plus exact size. Trimming excess // 2 from both sides
    # leaves one extra pixel when the excess is odd, giving an odd frame size.
    return clip.crop(x1=left, y1=top, x2=left + target_w, y2=top + target_h)


def _enhance_frame(get_frame, t):
    """Return the frame at time *t* with brightness x1.1 and color saturation x1.2."""
    img = Image.fromarray(get_frame(t))
    img = ImageEnhance.Brightness(img).enhance(1.1)
    img = ImageEnhance.Color(img).enhance(1.2)
    return np.array(img)


def process_video_with_subtitle(input_path, output_path, subtitle_text, apply_filter=True):
    """Convert a video to 720x1280 portrait, optionally filter it, and overlay a caption.

    Args:
        input_path: Path of the source video.
        output_path: Path of the encoded result (libx264 / aac, 24 fps).
        subtitle_text: Caption text. Empty, whitespace-only or None means no
            caption. The whole text is shown as one static caption for the full
            duration.
        apply_filter: When True, apply a mild brightness/saturation boost to
            every frame.

    Returns:
        True on success. On any error the message is printed and False is
        returned.
    """
    target_w, target_h = 720, 1280
    source = None
    try:
        source = VideoFileClip(input_path)
        clip = _fit_to_portrait(source, target_w, target_h)

        if apply_filter:
            clip = clip.fl(_enhance_frame)

        if (subtitle_text or "").strip():
            txt_clip = TextClip(
                subtitle_text,
                fontsize=40,
                font='Arial-Bold',
                color='yellow',
                stroke_color='black',
                stroke_width=2,
                method='caption',
                size=(target_w - 40, None),
                align='center',
            )
            # Short captions keep the original position (150 px above the
            # bottom). Taller captions are raised so the text is not cut off.
            caption_top = max(0, min(target_h - 150, target_h - txt_clip.h - 20))
            txt_clip = txt_clip.set_position(('center', caption_top)).set_duration(clip.duration)
            final = CompositeVideoClip([clip, txt_clip])
        else:
            final = clip

        final.write_videofile(
            output_path,
            codec='libx264',
            audio_codec='aac',
            fps=24,
            verbose=False,
            logger=None,
            threads=4,
        )
        return True
    except Exception as e:
        print(f"❌ Video işleme hatası: {e}")
        return False
    finally:
        # Closing the source releases the ffmpeg reader shared by the derived
        # clips, so the caller can delete the input file afterwards.
        if source is not None:
            source.close()
|
|
|
def process_single_youtube(video_id, title):
    """Download, transcribe and render one YouTube video.

    Args:
        video_id: YouTube video id; also used to build the file names.
        title: Video title, used only in progress messages.

    Returns:
        Path of the processed file in OUTPUT_DIR, or None when the download or
        the rendering step fails. The temporary download in TEMP_DIR is removed
        in every case (previously only after a successful render).
    """
    vid_path = os.path.join(TEMP_DIR, f"yt_{video_id}.mp4")
    out_path = os.path.join(OUTPUT_DIR, f"yt_{video_id}_processed.mp4")

    print(f"⏬ YouTube indiriliyor: {title}")
    try:
        if not download_youtube_video(video_id, vid_path):
            return None

        print("🔤 Altyazı oluşturuluyor...")
        subtitle = generate_subtitle_text(vid_path)
        print(f"✅ Altyazı tamamlandı ({len(subtitle)} karakter)")

        print("🎞️ Video işleniyor...")
        if process_video_with_subtitle(vid_path, out_path, subtitle, apply_filter=True):
            print(f"✅ İşlenen video kaydedildi: {out_path}")
            return out_path

        print(f"❌ Video işleme başarısız: {title}")
        return None
    finally:
        # Remove the raw download on success and on failure alike.
        if os.path.exists(vid_path):
            os.remove(vid_path)
|
|
|
def process_single_reddit(video_url, audio_url, post_id, title):
    """Download, transcribe and render one Reddit video.

    Args:
        video_url: URL of the Reddit video stream.
        audio_url: URL of the matching audio stream, or None.
        post_id: Reddit post id; used to build the file names.
        title: Post title, used only in progress messages.

    Returns:
        Path of the processed file in OUTPUT_DIR, or None when the download or
        the rendering step fails. The temporary download in TEMP_DIR is removed
        in every case (previously only after a successful render).
    """
    vid_path = os.path.join(TEMP_DIR, f"reddit_{post_id}.mp4")
    out_path = os.path.join(OUTPUT_DIR, f"reddit_{post_id}_processed.mp4")

    print(f"⏬ Reddit indiriliyor: {title}")
    try:
        if not download_reddit_video(video_url, audio_url, vid_path):
            return None

        print("🔤 Altyazı oluşturuluyor...")
        subtitle = generate_subtitle_text(vid_path)
        print(f"✅ Altyazı tamamlandı ({len(subtitle)} karakter)")

        print("🎞️ Video işleniyor...")
        if process_video_with_subtitle(vid_path, out_path, subtitle, apply_filter=True):
            print(f"✅ İşlenen video kaydedildi: {out_path}")
            return out_path

        print(f"❌ Video işleme başarısız: {title}")
        return None
    finally:
        # Remove the raw download on success and on failure alike.
        if os.path.exists(vid_path):
            os.remove(vid_path)
|
|
|
|
|
def run_automation(youtube_api_key, reddit_client_id, reddit_client_secret, reddit_user_agent,
                   yt_max=2, reddit_max=2, reddit_subreddit="funny"):
    """Fetch trending YouTube and Reddit videos and process them in a thread pool.

    Args:
        youtube_api_key: Key forwarded to get_youtube_trending.
        reddit_client_id: Reddit credential forwarded to get_reddit_videos.
        reddit_client_secret: Reddit credential forwarded to get_reddit_videos.
        reddit_user_agent: User agent forwarded to get_reddit_videos.
        yt_max: ``max_results`` for the YouTube query.
        reddit_max: ``limit`` for the Reddit query.
        reddit_subreddit: Subreddit to read.

    Returns:
        Paths of the successfully processed videos, in completion order.
    """
    print("📊 YouTube trend videoları alınıyor...")
    yt_videos = get_youtube_trending(youtube_api_key, max_results=yt_max)
    print(f"✅ {len(yt_videos)} YouTube videosu bulundu")

    print("📊 Reddit popüler videolar alınıyor...")
    reddit_videos = get_reddit_videos(
        reddit_client_id,
        reddit_client_secret,
        reddit_user_agent,
        subreddit=reddit_subreddit,
        limit=reddit_max,
    )
    print(f"✅ {len(reddit_videos)} Reddit videosu bulundu")

    finished = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        # YouTube jobs are queued first, then Reddit jobs.
        jobs = [
            pool.submit(process_single_youtube, item["id"], item["title"])
            for item in yt_videos
        ]
        jobs += [
            pool.submit(
                process_single_reddit,
                item["video_url"],
                item["audio_url"],
                item["id"],
                item["title"],
            )
            for item in reddit_videos
        ]

        for job in concurrent.futures.as_completed(jobs):
            try:
                produced = job.result()
            except Exception as e:
                print(f"❌ İşleme hatası: {e}")
                continue
            # Workers return None on failure; keep only real paths.
            if produced:
                finished.append(produced)

    return finished
|
|
|
|
|
def start_process(youtube_api_key, reddit_client_id, reddit_client_secret, reddit_user_agent,
                  yt_max, reddit_max, reddit_subreddit):
    """Gradio click handler: run the automation and return an HTML summary.

    The return value is rendered by a ``gr.HTML`` component, so line breaks are
    written as ``<br>``. The earlier ``\\n`` separators are not rendered as
    breaks in HTML and put the whole summary on one line.

    Args:
        youtube_api_key: YouTube Data API key from the form.
        reddit_client_id: Reddit client id from the form.
        reddit_client_secret: Reddit client secret from the form.
        reddit_user_agent: Reddit user agent from the form.
        yt_max: Slider value, converted to int.
        reddit_max: Slider value, converted to int.
        reddit_subreddit: Subreddit name from the form.

    Returns:
        An HTML string with the count of processed videos, their file names
        and, when there are any, one download link per file.
    """
    print("🚀 Otomasyon başlatıldı...")
    outputs = run_automation(
        youtube_api_key,
        reddit_client_id,
        reddit_client_secret,
        reddit_user_agent,
        int(yt_max),
        int(reddit_max),
        reddit_subreddit,
    )

    filenames = [os.path.basename(p) for p in outputs]

    parts = [f"✅ {len(outputs)} video başarıyla işlendi!<br><br>"]
    parts.extend(f"• {name}<br>" for name in filenames)

    if filenames:
        parts.append("<br>📥 İşlenen Videoları İndir:<br>")
        # NOTE(review): these hrefs are plain filesystem paths. Confirm the
        # Gradio server actually serves them; it normally exposes files only
        # through its own file route for allowed paths.
        parts.extend(
            f"<a href='/content/processed_videos/{name}' download>{name}</a><br>"
            for name in filenames
        )

    return "".join(parts)
|
|
|
|
|
# Gradio UI definition.
#
# SECURITY: the API key and Reddit client id/secret used to be hard-coded here
# as default field values. They are now read from environment variables
# (YOUTUBE_API_KEY, REDDIT_CLIENT_ID, REDDIT_CLIENT_SECRET, REDDIT_USER_AGENT)
# and the secret fields are masked. Credentials that were committed in an
# earlier version of this file should be treated as leaked and rotated.
with gr.Blocks(title="YouTube & Reddit Otomasyonu", css=".download-link {color: blue; text-decoration: underline;}") as demo:
    gr.Markdown("# 🎬 Otomatik YouTube & Reddit Video İşleme Sistemi")
    gr.Markdown("Trend videoları indir, altyazı ekle ve Reels formatına dönüştür!")

    with gr.Row():
        with gr.Column():
            gr.Markdown("### 🔑 API Ayarları")
            youtube_api_key = gr.Textbox(
                label="YouTube API Anahtarı",
                value=os.environ.get("YOUTUBE_API_KEY", ""),
                type="password",
            )
            reddit_client_id = gr.Textbox(
                label="Reddit Client ID",
                value=os.environ.get("REDDIT_CLIENT_ID", ""),
            )
            reddit_client_secret = gr.Textbox(
                label="Reddit Client Secret",
                value=os.environ.get("REDDIT_CLIENT_SECRET", ""),
                type="password",
            )
            reddit_user_agent = gr.Textbox(
                label="Reddit User Agent",
                value=os.environ.get("REDDIT_USER_AGENT", "Specialist-Arm-391"),
            )

        with gr.Column():
            gr.Markdown("### ⚙️ İşlem Ayarları")
            yt_max = gr.Slider(
                1, 5, value=2, step=1,
                label="YouTube Video Sayısı",
                info="İşlenecek YouTube video sayısı",
            )
            reddit_max = gr.Slider(
                1, 5, value=2, step=1,
                label="Reddit Video Sayısı",
                info="İşlenecek Reddit video sayısı",
            )
            reddit_subreddit = gr.Textbox(
                label="Reddit Subreddit",
                value="funny",
                placeholder="Örnek: funny, videos, memes",
            )

    btn = gr.Button("🚀 Otomasyonu Başlat", variant="primary")
    output_html = gr.HTML(label="📋 Sonuçlar")

    # The input order must match start_process's positional parameters.
    btn.click(
        fn=start_process,
        inputs=[
            youtube_api_key,
            reddit_client_id,
            reddit_client_secret,
            reddit_user_agent,
            yt_max,
            reddit_max,
            reddit_subreddit,
        ],
        outputs=output_html,
    )

    gr.Markdown("### 📌 Colab Özel Notlar")
    gr.Markdown("""
    1. **İlk Çalıştırma:** Bağımlılıklar otomatik kurulur (1-2 dakika)
    2. **GPU Desteği:** Whisper transkripsiyonu için GPU kullanılır
    3. **Dosya Konumları:**
       - İşlenen videolar: `/content/processed_videos`
       - Geçici dosyalar: `/content/temp_video_files`
    4. **İndirme:** İşlem tamamlandığında videoları indirebileceğiniz linkler gösterilir
    5. **Oturum Sonu:** Colab oturumu kapatıldığında tüm dosyalar silinir
    """)
|
|
|
|
|
# Entry point of the notebook: announce start-up, then serve the UI.
# NOTE(review): share=True asks Gradio for a public link and debug=True keeps
# the cell attached to the server; confirm both are wanted outside of testing.
launch_options = {"share": True, "debug": True}
print("🔥 Uygulama başlatılıyor...")
demo.launch(**launch_options)