Spaces:
Sleeping
Sleeping
import torch | |
from transformers import AutoModel, AutoTokenizer | |
from modelscope.hub.snapshot_download import snapshot_download | |
from PIL import Image | |
from functools import lru_cache | |
from decord import VideoReader, cpu | |
import os | |
import gc | |
import cv2 | |
import tempfile | |
import shutil | |
import subprocess | |
import ffmpeg # Added for ffmpeg-python | |
from yolo_detection import is_image, is_video | |
# Constants for video processing
MAX_NUM_FRAMES = 32  # Reduced from 64 to potentially avoid OOM

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Number of chunks in the video currently being analysed. Functions that
# rebind it declare `global TOTAL_CHUNKS` themselves; a `global` statement at
# module scope has no effect, so none is used here.
TOTAL_CHUNKS = 1

# Initialize GPU if available
if DEVICE == "cuda":
    def debug():
        """Allocate a small random tensor on the GPU as a start-up smoke test."""
        torch.randn(10).cuda()

    debug()
# Model identity and the on-disk location where its files are cached.
MODEL_NAME = 'iic/mPLUG-Owl3-7B-240728'
MODEL_CACHE_DIR = "/data/models"
os.makedirs(MODEL_CACHE_DIR, exist_ok=True)

# Start from the fallback location inside the cache directory; it is kept
# as-is when the download below fails for any reason.
model_path = os.path.join(MODEL_CACHE_DIR, MODEL_NAME)
try:
    # On success, use the directory reported by the hub client instead.
    model_path = snapshot_download(MODEL_NAME, cache_dir=MODEL_CACHE_DIR)
except Exception as download_error:
    print(f"Error downloading model: {str(download_error)}")
def load_model_and_tokenizer():
    """Load the mPLUG model, its tokenizer and its processor from ``model_path``.

    Nothing is memoised here: every call loads a fresh copy of the weights, so
    callers should keep the returned objects and reuse them.

    Returns:
        A ``(model, tokenizer, processor)`` tuple. The model is loaded in half
        precision, switched to eval mode, and placed by ``device_map='auto'``.

    Raises:
        Exception: any loading error is printed and then re-raised unchanged.
    """
    print("Loading/Retrieving mPLUG model from cache...")
    try:
        # Collect garbage first so tensors kept alive only by reference cycles
        # are freed, then release the now-unused cached CUDA blocks.
        # empty_cache() can only return memory that is no longer occupied.
        gc.collect()
        if DEVICE == "cuda":
            torch.cuda.empty_cache()

        model = AutoModel.from_pretrained(
            model_path,
            attn_implementation='sdpa',
            trust_remote_code=True,
            torch_dtype=torch.half,
            device_map='auto',
        )
        tokenizer = AutoTokenizer.from_pretrained(
            model_path,
            trust_remote_code=True,
        )
        model.eval()
        processor = model.init_processor(tokenizer)
        return model, tokenizer, processor
    except Exception as e:
        print(f"Error loading model: {str(e)}")
        raise
def process_image(image_path, model, tokenizer, processor, prompt):
    """Run the mPLUG model on a single image and return its text answer.

    Args:
        image_path: Path of the image file to open.
        model: Model object exposing ``generate(**inputs)``.
        tokenizer: Tokenizer handed to ``model.generate``.
        processor: Callable that turns messages plus images into model inputs.
        prompt: User instruction; the ``<|image|>`` tag is appended to it.

    Returns:
        The first generated response, or the string ``"Error processing image"``
        when anything fails (the error is printed, not raised).
    """
    try:
        image = Image.open(image_path)

        # One user turn carrying the image tag, followed by an empty assistant
        # turn for the model to complete.
        model_messages = [
            {"role": "user", "content": prompt + "<|image|>"},
            {"role": "assistant", "content": ""},
        ]

        inputs = processor(
            model_messages,
            images=[image],
            videos=None
        )
        # Follow the module-wide device choice instead of assuming a GPU, so
        # the function also works on CPU-only hosts.
        inputs.to(DEVICE)
        inputs.update({
            'tokenizer': tokenizer,
            'max_new_tokens': 100,
            'decode_text': True,
        })

        response = model.generate(**inputs)
        return response[0]
    except Exception as e:
        print(f"Error processing image: {str(e)}")
        return "Error processing image"
def process_video_chunk(video_frames, model, tokenizer, processor, prompt):
    """Run the mPLUG model on one chunk of video frames and return its answer.

    Args:
        video_frames: Frames of the chunk. When empty, the prompt is sent
            without a ``<|video|>`` tag and without any video input.
        model: Model object exposing ``generate(**inputs)``.
        tokenizer: Tokenizer handed to ``model.generate``.
        processor: Callable that turns messages plus videos into model inputs.
        prompt: User instruction for this chunk.

    Returns:
        The first generated response. Errors are not caught here.
    """
    has_frames = bool(video_frames)
    user_content = (prompt + "<|video|>") if has_frames else prompt

    # One user turn followed by an empty assistant turn for the model to fill.
    model_messages = [
        {"role": "user", "content": user_content},
        {"role": "assistant", "content": ""},
    ]

    inputs = processor(
        model_messages,
        images=None,
        videos=[video_frames] if has_frames else None
    )
    # Follow the module-wide device choice instead of assuming a GPU, so the
    # function also works on CPU-only hosts.
    inputs.to(DEVICE)
    inputs.update({
        'tokenizer': tokenizer,
        'max_new_tokens': 100,
        'decode_text': True,
    })

    response = model.generate(**inputs)
    return response[0]
def _is_nonempty_file(path):
    """Return True when *path* exists and holds at least one byte."""
    return os.path.exists(path) and os.path.getsize(path) > 0


def _remove_quietly(path):
    """Best-effort delete of *path*; a failed cleanup must not abort splitting."""
    try:
        os.remove(path)
    except OSError:
        pass


def _split_with_ffmpeg_python(video_path, start_time, end_time, output_path, chunk_id):
    """Cut one chunk with the ffmpeg-python bindings using stream copy."""
    try:
        (
            ffmpeg
            .input(video_path, ss=start_time, to=end_time)
            .output(output_path, c='copy', loglevel="quiet")
            .run(capture_stdout=True, capture_stderr=True, overwrite_output=True)
        )
    except ffmpeg.Error as e:
        print(f"ffmpeg-python error for chunk {chunk_id}: {e.stderr.decode() if e.stderr else str(e)}, trying OpenCV method")
        return False
    except Exception as e:
        print(f"ffmpeg-python failed with general error for chunk {chunk_id}: {str(e)}, trying OpenCV method")
        return False

    if _is_nonempty_file(output_path):
        print(f"Successfully created chunk {chunk_id} using ffmpeg-python")
        return True
    print(f"ffmpeg-python ran but did not create a valid file for chunk {chunk_id}")
    return False


def _split_with_opencv(video_path, start_time, end_time, output_path, chunk_id):
    """Cut one chunk by copying decoded frames with OpenCV (re-encodes as mp4v)."""
    cap = None
    out = None
    try:
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                raise IOError(f"Cannot open video file: {video_path}")

            fps = cap.get(cv2.CAP_PROP_FPS)
            if fps <= 0:
                print(f"Warning: Invalid FPS ({fps}) detected for {video_path}. Using default 30.")
                fps = 30.0
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

            start_frame = int(start_time * fps)
            end_frame = int(end_time * fps)
            cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

            for current_frame in range(start_frame, end_frame):
                ret, frame = cap.read()
                if not ret:
                    print(f"Warning: Could not read frame {current_frame} for chunk {chunk_id}. Reached end of video early?")
                    break
                out.write(frame)
        finally:
            # Release both handles on every path, so the writer is closed
            # before the output file is inspected or deleted.
            if cap is not None:
                cap.release()
            if out is not None:
                out.release()

        if _is_nonempty_file(output_path):
            print(f"Successfully created chunk {chunk_id} using OpenCV")
            return True
        print(f"OpenCV method ran but did not create a valid file for chunk {chunk_id}")
    except Exception as e:
        print(f"OpenCV method failed for chunk {chunk_id}: {str(e)}, trying subprocess method")

    # Do not leave an empty or partial file behind for the next method.
    _remove_quietly(output_path)
    return False


def _split_with_ffmpeg_cli(video_path, start_time, end_time, output_path, chunk_id):
    """Cut one chunk with the ffmpeg executable: stream copy first, then re-encode."""
    # -y overwrites leftovers from earlier attempts and -nostdin keeps ffmpeg
    # from waiting on an interactive prompt.
    head = [
        'ffmpeg', '-y', '-nostdin',
        '-ss', str(start_time),
        '-to', str(end_time),
        '-i', video_path,
    ]
    tail = ['-loglevel', 'error', output_path]
    try:
        process = subprocess.run(head + ['-c', 'copy'] + tail, capture_output=True, text=True, check=False)
        if process.returncode != 0 or not _is_nonempty_file(output_path):
            print(f"Subprocess ffmpeg copy failed for chunk {chunk_id}. Stderr: {process.stderr}. Trying re-encoding.")
            # Without "-c copy" ffmpeg re-encodes using its default codecs.
            process_reencode = subprocess.run(head + tail, capture_output=True, text=True, check=False)
            if process_reencode.returncode != 0:
                raise RuntimeError(f"Subprocess ffmpeg re-encode also failed. Stderr: {process_reencode.stderr}")

        if not _is_nonempty_file(output_path):
            raise RuntimeError("Subprocess ffmpeg failed to create a valid file.")
        print(f"Successfully created chunk {chunk_id} using subprocess ffmpeg")
        return True
    except FileNotFoundError:
        print(f"Subprocess failed for chunk {chunk_id}: 'ffmpeg' command not found. Ensure ffmpeg is installed and in PATH.")
    except Exception as e:
        print(f"Subprocess method failed for chunk {chunk_id}: {str(e)}")
        _remove_quietly(output_path)
    return False


def split_original_video(video_path, chunk_info):
    """Cut the original video into one file per chunk, trying three methods in turn.

    For every entry of *chunk_info* the span ``start_time``..``end_time`` is
    written to ``./tmp/original_chunk_<chunk_id>.mp4``. The methods are tried
    in this order until one produces a non-empty file: ffmpeg-python (stream
    copy), OpenCV (re-encode), then the ``ffmpeg`` executable.

    Only this function's own earlier outputs (``original_chunk_*.mp4``) are
    removed beforehand. ``./tmp`` is also where the sampled chunk videos and
    thumbnails of the same run are stored, so the directory itself is kept.

    Args:
        video_path: Path of the source video.
        chunk_info: Iterable of dicts with ``chunk_id``, ``start_time`` and
            ``end_time`` keys.

    Returns:
        List of paths of the chunk files that were created. Chunks for which
        every method failed are skipped with a warning.
    """
    tmp_dir = os.path.join('.', 'tmp')
    os.makedirs(tmp_dir, exist_ok=True)
    try:
        for name in os.listdir(tmp_dir):
            if name.startswith("original_chunk_") and name.endswith(".mp4"):
                _remove_quietly(os.path.join(tmp_dir, name))
    except OSError as e:
        print(f"Error cleaning temporary directory {tmp_dir}: {e}")

    original_chunks = []
    for chunk in chunk_info:
        chunk_id = chunk['chunk_id']
        start_time = chunk['start_time']
        end_time = chunk['end_time']
        output_path = os.path.join(tmp_dir, f"original_chunk_{chunk_id}.mp4")
        args = (video_path, start_time, end_time, output_path, chunk_id)

        # `or` short-circuits, so later methods run only when earlier ones fail.
        chunk_created = (
            _split_with_ffmpeg_python(*args)
            or _split_with_opencv(*args)
            or _split_with_ffmpeg_cli(*args)
        )

        if chunk_created and os.path.exists(output_path):
            original_chunks.append(output_path)
        else:
            print(f"Warning: Failed to create chunk {chunk_id} using all methods, skipping.")
    return original_chunks
def encode_video_in_chunks(video_path):
    """Sample a video at about one frame per second and yield it chunk by chunk.

    The sampled frame indices are grouped into chunks of at most
    ``MAX_NUM_FRAMES``. Each chunk is also written to ``./tmp/chunk_<n>.mp4``.
    The global ``TOTAL_CHUNKS`` is set to the number of chunks before the first
    one is yielded.

    This is a generator: ``split_original_video`` is called only after the last
    chunk has been yielded, i.e. when the consumer exhausts the generator.

    Args:
        video_path: Path of the video to read.

    Yields:
        ``(chunk_idx, frames_pil, info)`` where ``frames_pil`` is the list of
        PIL images of the chunk and ``info`` is a dict with ``chunk_id``,
        ``path``, ``start_time``, ``end_time``, ``start_frame``, ``end_frame``
        and ``original_fps``.
    """
    global TOTAL_CHUNKS

    vr = VideoReader(video_path, ctx=cpu(0))
    fps = vr.get_avg_fps()
    # One sampled frame per second. round() of a frame rate below 0.5 is 0 and
    # range() rejects a zero step, so the stride is clamped to at least 1.
    frame_step = max(1, round(fps))
    frame_idx = list(range(0, len(vr), frame_step))

    tmp_dir = os.path.join('.', 'tmp')
    os.makedirs(tmp_dir, exist_ok=True)

    # Slicing a non-empty list this way never produces an empty chunk.
    chunks = [
        frame_idx[i:i + MAX_NUM_FRAMES]
        for i in range(0, len(frame_idx), MAX_NUM_FRAMES)
    ]
    TOTAL_CHUNKS = len(chunks)
    print(f"Total chunks: {TOTAL_CHUNKS}")

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    chunk_info = []
    for chunk_idx, chunk in enumerate(chunks):
        frames = vr.get_batch(chunk).asnumpy()
        frames_pil = [Image.fromarray(v.astype('uint8')) for v in frames]
        chunk_path = os.path.join(tmp_dir, f"chunk_{chunk_idx}.mp4")

        start_frame, end_frame = chunk[0], chunk[-1]
        info = {
            'chunk_id': chunk_idx,
            'path': chunk_path,
            'start_time': start_frame / fps,
            'end_time': end_frame / fps,
            'start_frame': start_frame,
            'end_frame': end_frame,
            'original_fps': fps,
        }
        chunk_info.append(info)

        height, width, _ = frames[0].shape
        out = cv2.VideoWriter(chunk_path, fourcc, fps, (width, height))
        try:
            for frame in frames:
                # Frames are converted from RGB to BGR before being written.
                out.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
        finally:
            # Close the writer even if a frame fails to convert or write.
            out.release()
        print(f"Saved chunk {chunk_idx} to {chunk_path}")

        yield chunk_idx, frames_pil, info

    # Split original video after processing all chunks
    split_original_video(video_path, chunk_info)
def analyze_image_activities(image_path):
    """Return placeholder activity records for a construction-site image.

    The image is not inspected yet: two hard-coded sample records are
    returned, stamped with the current time and the time 30 minutes earlier.

    Args:
        image_path: Path of the image (currently unused).

    Returns:
        A list of dicts with ``time``, ``summary`` and ``objects`` keys, or an
        empty list if building the records fails.
    """
    from datetime import datetime, timedelta

    clock_format = "%I:%M %p"
    try:
        # Sample structured response - to be replaced by real model output.
        excavation = {
            'time': datetime.now().strftime(clock_format),
            'summary': 'Excavation work in progress',
            'objects': ['excavator', 'worker', 'dump-truck'],
        }
        half_hour_ago = datetime.now() - timedelta(minutes=30)
        loading = {
            'time': half_hour_ago.strftime(clock_format),
            'summary': 'Material loading operation',
            'objects': ['loader', 'worker', 'gravel'],
        }
        return [excavation, loading]
    except Exception as e:
        print(f"Error analyzing image: {str(e)}")
        return []
def generate_thumbnails(video_path, num_chunks):
    """Save evenly spaced thumbnails of a video, one per chunk.

    ``./tmp`` is deleted and recreated first, so anything stored there by an
    earlier run is lost. Thumbnail ``k`` is the frame at index
    ``k * (total_frames // num_chunks)`` and is saved as
    ``./tmp/thumbnail_<k>.jpg``.

    Args:
        video_path: Path to video file.
        num_chunks: Number of chunks to generate thumbnails for. Values of
            zero or less produce no thumbnails.

    Returns:
        List of dicts with ``path`` (the saved JPEG) and ``time`` (frame index
        divided by the average frame rate).
    """
    vr = VideoReader(video_path, ctx=cpu(0))
    total_frames = len(vr)

    # Start from an empty ./tmp directory.
    tmp_dir = os.path.join('.', 'tmp')
    if os.path.exists(tmp_dir):
        shutil.rmtree(tmp_dir)
    os.makedirs(tmp_dir, exist_ok=True)

    # Nothing to extract; this also avoids dividing by zero below.
    if num_chunks <= 0:
        return []

    fps = vr.get_avg_fps()
    frame_step = total_frames // num_chunks
    thumbnails = []
    for chunk_idx in range(num_chunks):
        frame_idx = chunk_idx * frame_step
        if frame_idx >= total_frames:
            continue
        img = Image.fromarray(vr[frame_idx].asnumpy())
        temp_path = os.path.join(tmp_dir, f"thumbnail_{chunk_idx}.jpg")
        img.save(temp_path)
        thumbnails.append({
            "path": temp_path,
            "time": frame_idx / fps
        })
    return thumbnails
def analyze_video_activities(video_path, model, tokenizer, processor):
    """Describe the activities in a video chunk by chunk with the mPLUG model.

    The video is sampled at about one frame per second and grouped into chunks
    of at most ``MAX_NUM_FRAMES`` frames. Each chunk is sent to the model once
    and produces exactly one activity record. The record is paired with the
    thumbnail of the same index, so no chunk is dropped or reported twice.

    Args:
        video_path: Path of the video to analyse.
        model: Model passed through to ``process_video_chunk``.
        tokenizer: Tokenizer passed through to ``process_video_chunk``.
        processor: Processor passed through to ``process_video_chunk``.

    Returns:
        List of activity dicts sorted by ``timestamp_seconds``. Keys: ``time``
        (``MM:SS``), ``timestamp_seconds``, ``summary`` (the model response),
        ``objects`` (known equipment words found in the response),
        ``thumbnail``, ``chunk_id`` and ``chunk_path``.

    Errors from video reading or from the model propagate to the caller.
    """
    global TOTAL_CHUNKS

    # Count the chunks up front so the thumbnails can be generated first.
    vr = VideoReader(video_path, ctx=cpu(0))
    # round() of a frame rate below 0.5 is 0, which range() rejects as a step.
    frame_step = max(1, round(vr.get_avg_fps()))
    sampled_frames = len(range(0, len(vr), frame_step))
    # Ceiling division: a partially filled last chunk still counts.
    TOTAL_CHUNKS = (sampled_frames + MAX_NUM_FRAMES - 1) // MAX_NUM_FRAMES

    thumbnails = generate_thumbnails(video_path, num_chunks=TOTAL_CHUNKS) if TOTAL_CHUNKS else []

    prompt = "Analyze this construction site video chunk and describe the activities happening. Focus on construction activities, machinery usage, and worker actions. Include any construction equipment or machinery you can identify."
    # Basic keyword spotting; a production system might use proper NLP instead.
    possible_objects = ("excavator", "bulldozer", "crane", "truck", "loader",
                        "worker", "concrete", "scaffold", "beam", "pipe",
                        "rebar", "formwork", "drill", "grader", "roller")

    all_activities = []
    for chunk_idx, video_frames, chunk_info in encode_video_in_chunks(video_path):
        response = process_video_chunk(video_frames, model, tokenizer, processor, prompt)
        print(f"Chunk {chunk_idx}: {response}")

        # Thumbnails are spaced evenly over the video, so their times do not
        # line up with fixed-length chunk windows; pair them by index instead.
        thumbnail = thumbnails[chunk_idx] if chunk_idx < len(thumbnails) else None
        if thumbnail is not None:
            position = thumbnail['time']
        elif chunk_info:
            position = chunk_info['start_time']
        else:
            position = 0

        minutes, seconds = divmod(int(position), 60)
        lower_response = response.lower()

        all_activities.append({
            'time': f"{minutes:02d}:{seconds:02d}",
            'timestamp_seconds': position,  # Raw seconds, used for sorting
            'summary': response,
            'objects': [obj for obj in possible_objects if obj in lower_response],
            'thumbnail': thumbnail["path"] if thumbnail is not None else None,
            'chunk_id': chunk_idx,
            'chunk_path': chunk_info['path'] if chunk_info else None
        })

    all_activities.sort(key=lambda x: x['timestamp_seconds'])
    return all_activities