Spaces:
Paused
Paused
| import streamlit as st | |
| from huggingface_hub import HfApi | |
| import os | |
| import json | |
| from datetime import datetime | |
| import cv2 | |
| import random | |
| from PIL import Image | |
| import string | |
| import subprocess | |
| import glob | |
| import shutil | |
# Shared Hugging Face Hub client; the access token is read from the
# HF_API_TOKEN environment variable (None when the variable is unset).
api = HfApi(token=os.environ.get("HF_API_TOKEN"))
def generate_random_string(length=4):
    """Return ``length`` randomly chosen lowercase ASCII letters as one string."""
    letters = random.choices(string.ascii_lowercase, k=length)
    return "".join(letters)
def add_random_to_filename(filename):
    """Insert ``-<random letters>`` between a filename's stem and its extension.

    The extension (as split by ``os.path.splitext``) is kept unchanged, e.g.
    ``clip.mp4`` becomes something like ``clip-abcd.mp4``.
    """
    stem, extension = os.path.splitext(filename)
    suffix = generate_random_string()
    return "{}-{}{}".format(stem, suffix, extension)
def extract_thumbnail(video_path, thumbnail_path):
    """Save one randomly chosen frame of a video as its thumbnail image.

    Args:
        video_path: Path of the video file to sample.
        thumbnail_path: Destination path for the thumbnail image.

    Returns:
        True when a frame was read and written to ``thumbnail_path``;
        False when the video could not be opened, no frame could be read,
        or writing the image failed.
    """
    video = cv2.VideoCapture(video_path)
    try:
        if not video.isOpened():
            return False

        total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames > 0:
            # Only seek when the container reports a usable frame count;
            # random.randint(0, -1) would raise ValueError otherwise.
            random_frame = random.randint(0, total_frames - 1)
            video.set(cv2.CAP_PROP_POS_FRAMES, random_frame)
        # With no reported frame count we simply try the first frame.

        success, frame = video.read()
        if not success:
            return False

        # imwrite reports failure through its return value, so propagate it
        # instead of claiming success for a thumbnail that was never written.
        return bool(cv2.imwrite(thumbnail_path, frame))
    finally:
        # Release the capture even if any of the calls above raised.
        video.release()
def save_custom_thumbnail(thumbnail_file, thumbnail_path):
    """Save a user-supplied image as the thumbnail at ``thumbnail_path``.

    Args:
        thumbnail_file: Path or file-like object accepted by ``Image.open``.
        thumbnail_path: Destination path; Pillow picks the output format
            from its extension.

    Returns:
        True once the image has been written.

    Raises:
        OSError: If Pillow cannot read the input or write the output.
    """
    target_is_jpeg = os.path.splitext(thumbnail_path)[1].lower() in (".jpg", ".jpeg")
    with Image.open(thumbnail_file) as source:
        output = source
        if target_is_jpeg and source.mode not in ("RGB", "L"):
            # JPEG cannot store alpha or palette data (e.g. RGBA / P / LA
            # PNG uploads), so convert to plain RGB before saving; any
            # transparency is discarded.
            output = source.convert("RGB")
        output.save(thumbnail_path)
    return True
def get_video_length(video_path):
    """Return the video's duration in whole seconds.

    The duration is the reported frame count divided by the reported frames
    per second, truncated to an int. Returns 0 when the reported frame rate
    is not positive.
    """
    capture = cv2.VideoCapture(video_path)
    frame_rate = capture.get(cv2.CAP_PROP_FPS)
    frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    capture.release()

    if frame_rate > 0:
        return int(frame_count / frame_rate)
    return 0
def generate_metadata(video_name, title, description, uploader, file_location, thumbnail_location, duration):
    """Build the metadata record stored alongside an uploaded video.

    The record carries the given fields plus an ISO-8601 upload timestamp
    taken from the local clock, and ``views`` / ``likes`` counters that
    start at zero.
    """
    uploaded_at = datetime.now().isoformat()
    record = {}
    record["fileName"] = video_name
    record["title"] = title
    record["description"] = description
    record["uploader"] = uploader
    record["uploadTimestamp"] = uploaded_at
    record["fileLocation"] = file_location
    record["thumbnailLocation"] = thumbnail_location
    record["duration"] = duration
    record["views"] = 0
    record["likes"] = 0
    return record
def update_index_file(new_metadata_path):
    """Rebuild ``metadata/video-index.json`` in the Space and upload it.

    The index lists the raw URL of every ``metadata/*-index.json`` file in
    the Space, plus the entry for ``new_metadata_path`` if the listing does
    not contain it yet.

    Args:
        new_metadata_path: Repo path (or bare filename) of the metadata
            file that was just uploaded; only its basename is used.
    """
    repo_id = "vericudebuget/ok4231"
    metadata_prefix = "metadata/"
    index_filename = "video-index.json"
    base_url = "huggingface.co/spaces/vericudebuget/ok4231/raw/main/metadata/"

    # Ask the Hub for the file listing instead of cloning the whole Space:
    # a clone pulls every uploaded video just to read a few filenames, needs
    # a git binary, and used a fixed scratch directory shared by all sessions.
    metadata_names = []
    for repo_path in api.list_repo_files(repo_id=repo_id, repo_type="space"):
        if not repo_path.startswith(metadata_prefix):
            continue
        name = repo_path[len(metadata_prefix):]
        if "/" in name or not name.endswith("-index.json"):
            continue
        if name == index_filename:
            # The index itself also ends in "-index.json"; it must not be
            # listed as if it were a video's metadata file.
            continue
        metadata_names.append(name)
    metadata_names.sort()  # deterministic order across runs

    # Add the new metadata entry if the listing does not include it yet
    new_metadata_filename = os.path.basename(new_metadata_path)
    if new_metadata_filename not in metadata_names:
        metadata_names.append(new_metadata_filename)

    paths = [f"{base_url}{name}" for name in metadata_names]

    # NOTE: this "{ a; b; }" layout is not valid JSON despite the file name;
    # it is kept byte-for-byte because existing readers of the index may
    # depend on it.
    index_content = "{ " + "; ".join(paths) + "; }"

    # upload_file accepts raw bytes, so no temporary file is needed.
    api.upload_file(
        path_or_fileobj=index_content.encode("utf-8"),
        path_in_repo="metadata/video-index.json",
        repo_id=repo_id,
        repo_type="space",
    )
def upload_video_to_hf(video_file, original_video_name, title, description, uploader, custom_thumbnail=None):
    """Upload a video, its thumbnail and its metadata JSON to the Space.

    Args:
        video_file: Readable binary file-like object holding the video.
        original_video_name: Filename supplied by the uploader; only its
            final path component is used, with a random suffix added.
        title: Video title stored in the metadata.
        description: Video description stored in the metadata.
        uploader: Uploader name stored in the metadata.
        custom_thumbnail: Optional image file to use as the thumbnail. When
            omitted, a random frame of the video is used instead.

    Returns:
        The metadata dict that was uploaded, or None when the thumbnail
        could not be produced (an error is shown via ``st.error``).
    """
    import tempfile  # local import: only this function needs it

    repo_id = "vericudebuget/ok4231"
    repo_type = "space"

    # The filename comes from the client; drop any directory components so
    # it cannot escape the scratch directory or the repo's videos/ folder.
    safe_name = os.path.basename(original_video_name.replace("\\", "/"))

    # Generate randomized filenames
    video_name = add_random_to_filename(safe_name)
    base_name = os.path.splitext(video_name)[0]
    thumbnail_name = f"{base_name}_thumb.jpg"
    json_name = f"{base_name}-index.json"

    # A unique scratch directory per call: a fixed shared "temp" directory
    # lets concurrent uploads overwrite or delete each other's files.
    with tempfile.TemporaryDirectory() as temp_dir:
        video_path = os.path.join(temp_dir, video_name)
        thumbnail_path = os.path.join(temp_dir, thumbnail_name)
        json_path = os.path.join(temp_dir, json_name)

        # Stream the video to disk rather than building a second full
        # in-memory copy of it.
        with open(video_path, "wb") as f:
            shutil.copyfileobj(video_file, f)

        # Handle thumbnail
        if custom_thumbnail:
            try:
                thumbnail_extracted = save_custom_thumbnail(custom_thumbnail, thumbnail_path)
            except OSError:
                # Pillow signals unreadable / unsavable images with OSError;
                # route that to the user-facing error below, not a traceback.
                thumbnail_extracted = False
        else:
            thumbnail_extracted = extract_thumbnail(video_path, thumbnail_path)

        if not thumbnail_extracted:
            st.error("Failed to process thumbnail")
            return None

        # Get video length
        video_length = get_video_length(video_path)

        # Upload the video
        video_location = f"videos/{video_name}"
        api.upload_file(
            path_or_fileobj=video_path,
            path_in_repo=video_location,
            repo_id=repo_id,
            repo_type=repo_type,
        )

        # Upload the thumbnail
        thumbnail_location = f"thumbnails/{thumbnail_name}"
        api.upload_file(
            path_or_fileobj=thumbnail_path,
            path_in_repo=thumbnail_location,
            repo_id=repo_id,
            repo_type=repo_type,
        )

        # Generate and upload metadata JSON
        metadata = generate_metadata(
            video_name, title, description, uploader,
            video_location, thumbnail_location, video_length,
        )
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(metadata, f, indent=2)

        metadata_location = f"metadata/{json_name}"
        api.upload_file(
            path_or_fileobj=json_path,
            path_in_repo=metadata_location,
            repo_id=repo_id,
            repo_type=repo_type,
        )

        # Update the index file so the new video becomes discoverable
        update_index_file(metadata_location)

        return metadata
# Streamlit page: pick a video, fill in its details, then upload it.
st.title("Upload your video")
st.markdown("---")

# Video picker; the details form only appears once a file is chosen.
uploaded_video = st.file_uploader("Choose video file", type=["mp4", "avi", "mov"])

if not uploaded_video:
    st.info("Please upload a video file to begin.")
else:
    with st.form("video_details"):
        st.write("Video Details")
        title = st.text_input("Title", placeholder="Enter video title")
        description = st.text_area("Description", placeholder="Enter video description")
        uploader = st.text_input("Uploader Name", placeholder="Enter your name")

        # A thumbnail is optional; without one a video frame is used.
        custom_thumbnail = st.file_uploader("Upload custom thumbnail (optional)", type=["jpg", "jpeg", "png"])

        # Submitting the form triggers the upload.
        submit_button = st.form_submit_button("Upload Video")

        if submit_button:
            if not (title and uploader):
                # Title and uploader name are mandatory; description is not.
                st.error("Please fill in the title and uploader name.")
            else:
                with st.spinner("Uploading video, generating thumbnail and metadata..."):
                    metadata = upload_video_to_hf(
                        uploaded_video,
                        uploaded_video.name,
                        title,
                        description,
                        uploader,
                        custom_thumbnail,
                    )
                if metadata:
                    st.success("Upload completed successfully!")
                    st.json(metadata)