# Spaces: Running on Zero
from spandrel import ModelLoader | |
import torch | |
from pathlib import Path | |
from PIL import Image | |
import gradio as App | |
import numpy as np | |
import subprocess | |
import logging | |
import spaces | |
import time | |
import os | |
import gc | |
import io | |
import cv2 | |
from gradio import themes | |
from rich.console import Console | |
from rich.logging import RichHandler | |
# ============================== #
#  Core Settings                 #
# ============================== #
# Gradio theme shared by the whole interface.
Theme = themes.Citrus(
    primary_hue='blue',
    radius_size=themes.sizes.radius_xxl,
)
# Directory scanned for model weight files.
ModelDir = Path('./Models')
# Working directory; also exported as Gradio's temp directory below.
TempDir = Path('./Temp')
os.environ['GRADIO_TEMP_DIR'] = str(TempDir)
# File extension (including the dot) that identifies a model file.
ModelFileType = '.pth'
# ============================== #
#  Enhanced Logging              #
# ============================== #
# Route root logging through a Rich handler with rich tracebacks enabled.
logging.basicConfig(
    level=logging.INFO,
    format='%(message)s',
    datefmt='[%X]',
    handlers=[
        RichHandler(console=Console(), rich_tracebacks=True),
    ],
)
# Application logger used throughout this file.
Logger = logging.getLogger('Video2x')
# Only let warnings and above through from the 'httpx' logger.
logging.getLogger('httpx').setLevel(logging.WARNING)
# ============================== #
#  Device Configuration          #
# ============================== #
def GetDeviceName():
    """Select the torch device, log the choice and return it.

    Returns:
        torch.device: 'cuda' when torch.cuda.is_available() is true, 'cpu' otherwise.
    """
    if torch.cuda.is_available():
        DeviceKind = 'cuda'
    else:
        DeviceKind = 'cpu'
    Selected = torch.device(DeviceKind)
    Logger.info(f'βοΈ Using device: {Selected}')
    return Selected


# Resolved once at import time and shared by the functions below.
Device = GetDeviceName()
# ============================== #
#  Optimized Functions           #
# ============================== #
def FormatTimeEstimate(Seconds):
    """Format a duration in seconds as a compact human-readable string.

    Fractional seconds are truncated. Leading zero units are omitted, so the
    result is one of 'Hh Mm Ss', 'Mm Ss' or 'Ss'.

    Args:
        Seconds: Duration in seconds (int or float). Negative values are
            treated as zero.

    Returns:
        str: The formatted duration, e.g. '1h 2m 5s', '3m 7s' or '42s'.
    """
    # Clamp negatives: floor-division / modulo on a negative duration would
    # otherwise wrap around into bogus minutes and seconds (e.g. -5 -> '59m 55s').
    TotalSeconds = max(0, int(Seconds))
    Hours, Remainder = divmod(TotalSeconds, 3600)
    Minutes, RemainingSeconds = divmod(Remainder, 60)
    if Hours > 0:
        return f'{Hours}h {Minutes}m {RemainingSeconds}s'
    if Minutes > 0:
        return f'{Minutes}m {RemainingSeconds}s'
    return f'{RemainingSeconds}s'
def ListModels():
    """Return the sorted file names of the model files found in ModelDir.

    Only regular files whose name ends with ModelFileType are included.
    The number of matches is logged.
    """
    Pattern = f'*{ModelFileType}'
    Models = sorted(
        Candidate.name
        for Candidate in ModelDir.glob(Pattern)
        if Candidate.is_file()
    )
    Logger.info(f'π Found {len(Models)} Models In Directory')
    return Models
def LoadModel(ModelName):
    """Load a model file from ModelDir onto the active device in eval mode.

    Args:
        ModelName: Model file name without its extension; ModelFileType is
            appended to build the path inside ModelDir.

    Returns:
        The object produced by spandrel's loader after `.to(Device).eval()`.
    """
    # Free cached CUDA memory before bringing a new model onto the GPU.
    UsingCuda = Device.type == 'cuda'
    if UsingCuda:
        torch.cuda.empty_cache()
    Logger.info(f'π Loading model: {ModelName} onto {Device}')
    WeightsPath = ModelDir / f'{ModelName}{ModelFileType}'
    Descriptor = ModelLoader().load_from_file(WeightsPath)
    Model = Descriptor.to(Device).eval()
    Logger.info('β Model Loaded Successfully')
    return Model
def ProcessSingleFrame(OriginalImage, Model, TileGridSize):
    """Upscale one RGB frame with the model, optionally tile by tile.

    With a grid size above 1 the frame is split into a TileGridSize x
    TileGridSize grid, each tile is upscaled independently (lower peak memory
    per forward pass) and the results are stitched back together. The tiles
    cover the whole frame: when the frame size is not divisible by the grid
    size the tiles differ by at most one pixel instead of the remainder being
    cropped away.

    Args:
        OriginalImage: PIL image of the frame; it is converted to an H x W x C
            array, so a 3-channel RGB image is expected.
        Model: Callable applied to a 1 x C x H x W float tensor scaled to
            [0, 1]; its output is read back in the same layout.
            NOTE(review): assumes the model scales every tile by the same
            integer factor - confirm for the models in use.
        TileGridSize: Number of tiles per axis; values <= 1 disable tiling.
            The value is truncated to an int and capped at the frame's
            smaller dimension so no tile is empty.

    Returns:
        PIL.Image.Image: The upscaled RGB frame.
    """

    def UpscaleRegion(Region):
        # H x W x C uint8 -> 1 x C x H x W float in [0, 1] on the active device.
        InputTensor = torch.from_numpy(np.array(Region)).permute(2, 0, 1).unsqueeze(0).float().to(Device) / 255.0
        with torch.no_grad():
            OutputTensor = Model(InputTensor)
        OutputArray = OutputTensor.squeeze(0).permute(1, 2, 0).cpu().numpy()
        # Tensors go out of scope on return, releasing them before the next tile.
        return Image.fromarray(np.uint8(OutputArray.clip(0.0, 1.0) * 255.0), mode='RGB')

    Width, Height = OriginalImage.size
    # The UI slider may deliver a float; range() needs an int. Capping at the
    # frame size prevents zero-width or zero-height tiles.
    GridSize = max(1, min(int(TileGridSize), Width, Height))
    if GridSize == 1:
        return UpscaleRegion(OriginalImage)

    Logger.info(f'π§© Processing With Tile Grid {GridSize}x{GridSize}')
    # Tile edges spread evenly over the frame; the last edge is the full
    # dimension, so no right/bottom pixels are lost to integer division.
    ColumnEdges = [Width * Index // GridSize for Index in range(GridSize + 1)]
    RowEdges = [Height * Index // GridSize for Index in range(GridSize + 1)]

    UpscaledRows = []
    for Top, Bottom in zip(RowEdges, RowEdges[1:]):
        UpscaledRows.append([
            UpscaleRegion(OriginalImage.crop((Left, Top, Right, Bottom)))
            for Left, Right in zip(ColumnEdges, ColumnEdges[1:])
        ])

    # Tiles may differ in size by a pixel (times the scale), so the canvas size
    # and paste offsets are accumulated from the actual upscaled tile sizes.
    OutputWidth = sum(Tile.width for Tile in UpscaledRows[0])
    OutputHeight = sum(RowTiles[0].height for RowTiles in UpscaledRows)
    UpscaledImage = Image.new('RGB', (OutputWidth, OutputHeight))
    OffsetY = 0
    for RowTiles in UpscaledRows:
        OffsetX = 0
        for Tile in RowTiles:
            UpscaledImage.paste(Tile, (OffsetX, OffsetY))
            OffsetX += Tile.width
        OffsetY += RowTiles[0].height
    return UpscaledImage
def Process(VideoInputPath, ModelName, FrameRateValue, TileGridSize, FileType, Progress=App.Progress()):
    """Upscale every frame of a video and encode the result with ffmpeg.

    Generator used as the Gradio click handler. Each yielded pair matches the
    click outputs: (preview for the image slider, update for the download
    button).

    Args:
        VideoInputPath: Path of the uploaded input video.
        ModelName: Model name passed to LoadModel (file name without extension).
        FrameRateValue: Output frame rate; when falsy the source video's FPS
            reported by OpenCV is used.
        TileGridSize: Tiles per axis forwarded to ProcessSingleFrame; values
            above 1 also add a '_Tiled<N>' suffix to the output file name.
        FileType: Output file extension including the dot (e.g. '.mkv').
        Progress: Gradio progress tracker.

    Yields:
        tuple: First `(None, disabled download button)`; on success
        `(first_frame_pair, enabled download button)` where first_frame_pair is
        (original frame resized to the upscaled size, upscaled frame).

    Raises:
        RuntimeError: If no frame could be read from the input video.
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    # Function-scope import keeps this block independent of the file's import section.
    import tempfile

    # First yield should match the order of outputs in the click function
    yield None, App.update(interactive=False, value=None)
    if not VideoInputPath or not ModelName or not FileType:
        Logger.error('β Missing Inputs!')
        return

    VideoPath = Path(VideoInputPath)
    TileSuffix = f'_Tiled{TileGridSize}' if TileGridSize > 1 else ''
    OutputVideoPath = VideoPath.parent / f'{VideoPath.stem}_{Path(ModelName).stem}{TileSuffix}{FileType}'

    Model = None
    VideoCapture = None
    try:
        # Load model
        Progress(0.0, 'π Loading Model')
        Model = LoadModel(ModelName)

        # Extract video info
        Logger.info(f'π¬ Extracting Video Information From {VideoPath.name}')
        VideoCapture = cv2.VideoCapture(str(VideoPath))
        FrameCount = int(VideoCapture.get(cv2.CAP_PROP_FRAME_COUNT))
        if not FrameRateValue:
            FrameRateValue = VideoCapture.get(cv2.CAP_PROP_FPS)
        Logger.info(f'ποΈ Processing {FrameCount} Frames At {FrameRateValue} FPS')

        os.makedirs(TempDir, exist_ok=True)
        # A private directory per run: concurrent runs cannot overwrite each
        # other's frames, and the frames are removed even when encoding fails.
        with tempfile.TemporaryDirectory(dir=TempDir, prefix='Frames_') as FrameDirName:
            FrameDir = Path(FrameDirName)
            FirstFrame = None
            # Counted separately from FrameIndex so unreadable frames do not
            # leave gaps in the numbered sequence ffmpeg reads.
            WrittenFrames = 0
            StartTime = time.time()
            for FrameIndex in range(FrameCount):
                FrameStartTime = time.time()
                Success, Frame = VideoCapture.read()
                if not Success:
                    Logger.warning(f'β οΈ Failed To Read Frame {FrameIndex}')
                    continue
                # Convert from BGR to RGB
                OriginalImage = Image.fromarray(cv2.cvtColor(Frame, cv2.COLOR_BGR2RGB))
                UpscaledImage = ProcessSingleFrame(OriginalImage, Model, TileGridSize)
                # Only the first frame pair is shown in the preview, so only
                # that pair is kept in memory.
                if FirstFrame is None:
                    FirstFrame = (
                        OriginalImage.resize(UpscaledImage.size, Image.Resampling.LANCZOS),
                        UpscaledImage.copy(),
                    )
                # Stream each frame straight to disk instead of buffering the
                # whole video as PNG bytes.
                UpscaledImage.save(FrameDir / f'Frame_{WrittenFrames:06d}.png', format='PNG')
                WrittenFrames += 1
                if FrameIndex == 0:
                    FrameProcessingTime = time.time() - FrameStartTime
                    Logger.info(f'β±οΈ First Frame Took {FrameProcessingTime:.2f}s To Process')
                # Calculate remaining time based on average processing time so far
                ElapsedTime = time.time() - StartTime
                AverageTimePerFrame = ElapsedTime / (FrameIndex + 1)
                RemainingFrames = FrameCount - (FrameIndex + 1)
                RemainingTimeFormatted = FormatTimeEstimate(RemainingFrames * AverageTimePerFrame)
                Progress(
                    (FrameIndex + 1) / FrameCount,
                    f'π Frame {FrameIndex+1}/{FrameCount} | ETA: {RemainingTimeFormatted}'
                )
                del OriginalImage, UpscaledImage
                gc.collect()

            if WrittenFrames == 0:
                # Fail with a clear message instead of an opaque ffmpeg error.
                raise RuntimeError(f'No frames could be read from {VideoPath.name}')

            # Create video
            Progress(1.0, 'π₯ Encoding Video')
            Logger.info('π₯ Encoding Final Video')
            # Argument list with no shell: the output path derives from the
            # uploaded file's name and must never be interpreted by a shell.
            FfmpegCmd = [
                'ffmpeg', '-hide_banner', '-loglevel', 'error', '-y',
                '-framerate', str(FrameRateValue),
                '-i', str(FrameDir / 'Frame_%06d.png'),
                '-c:v', 'libx264', '-pix_fmt', 'yuv420p',
                str(OutputVideoPath),
            ]
            subprocess.run(FfmpegCmd, check=True)

        Logger.info(f'π Video Saved To: {OutputVideoPath}')
        # Update UI - values in the order specified in the click function
        yield FirstFrame, App.update(interactive=True, value=str(OutputVideoPath))
        Progress(1.0, 'π§Ή Cleaning Up Resources')
    finally:
        # Release resources on success and on failure alike.
        if VideoCapture is not None:
            VideoCapture.release()
        del Model
        gc.collect()
        if Device.type == 'cuda':
            torch.cuda.empty_cache()
            Logger.info('π§Ή CUDA Memory Cleaned Up')
        Logger.info('π§Ή Model Unloaded')
    Progress(1.0, 'π¦ Done!')
# ============================== #
#  Streamlined UI                #
# ============================== #
# NOTE(review): the container nesting below is reconstructed from the
# component order - confirm it against the intended layout.
with App.Blocks(title='Video Upscaler', theme=Theme, delete_cache=(60, 600)) as Interface:
    App.Markdown('# ποΈ Video Upscaler')
    App.Markdown('''
Space created by [Hyphonical](https://huggingface.co/Hyphonical), this space uses several models from [styler00dollar/VSGAN-tensorrt-docker](https://github.com/styler00dollar/VSGAN-tensorrt-docker/releases/tag/models)
You may always request adding more models by opening a [new discussion](https://huggingface.co/spaces/Hyphonical/Video2x/discussions/new). The main program uses spandrel to load the models and ffmpeg to process the video.
You may run out of time using the ZeroGPU, you could clone the space or run it locally for better performance.
''')
    with App.Row():
        # Left column: inputs and settings.
        with App.Column(scale=1):
            with App.Group():
                InputVideo = App.Video(label='Input Video', sources=['upload'], height=300)
                # The dropdown shows model names without their file extension.
                ModelList = ListModels()
                ModelNames = [Path(Model).stem for Model in ModelList]
                InputModel = App.Dropdown(
                    choices=ModelNames,
                    label='Select Model',
                    value=ModelNames[0] if ModelNames else None,
                )
                with App.Row():
                    InputFrameRate = App.Slider(
                        label='Frame Rate',
                        minimum=1,
                        maximum=60,
                        value=23.976,
                        step=0.001,
                    )
                    InputTileGridSize = App.Slider(
                        label='Tile Grid Size',
                        minimum=1,
                        maximum=6,
                        value=1,
                        step=1,
                        show_reset_button=False,
                    )
                InputFileType = App.Dropdown(
                    choices=['.mp4', '.mkv'],
                    label='Output File Type',
                    value='.mkv',
                    interactive=True,
                )
            SubmitButton = App.Button('π Upscale Video')
        # Right column: preview and download.
        with App.Column(scale=1, show_progress=True):
            OutputSlider = App.ImageSlider(label='Output Preview', value=None, height=300)
            DownloadOutput = App.DownloadButton(label='πΎ Download Video', interactive=False)
    with App.Accordion(label='π Instructions', open=False):
        App.Markdown('''
### How To Use The Video Upscaler
1. **Upload A Video:** Begin by uploading your video file using the 'Input Video' section.
2. **Select A Model:** Choose an appropriate upscaling model from the 'Select Model' dropdown menu.
3. **Adjust Settings (Optional):**
Modify the 'Frame Rate' slider if you want to change the output video's frame rate.
Adjust the 'Tile Grid Size' for memory optimization. Larger models might require a higher grid size, but processing could be slower.
4. **Start Processing:** Click the 'π Upscale Video' button to begin the upscaling process.
5. **Download The Result:** Once the process is complete, download the upscaled video using the 'πΎ Download Video' button.
> Tip: If you get a CUDA out of memory error, try increasing the Tile Grid Size. This will split the image into smaller tiles for processing, which can help reduce memory usage.
''')
    # Outputs are listed in the order Process yields them: preview, download button.
    SubmitButton.click(
        fn=Process,
        inputs=[InputVideo, InputModel, InputFrameRate, InputTileGridSize, InputFileType],
        outputs=[OutputSlider, DownloadOutput],
    )
if __name__ == '__main__':
    # Make sure both working directories exist before the app starts serving.
    for RequiredDir in (ModelDir, TempDir):
        os.makedirs(RequiredDir, exist_ok=True)
    Logger.info('π Starting Video Upscaler')
    Interface.launch(pwa=True)