Spaces:
Runtime error
Runtime error
import torch | |
from transformers import AutoModel, AutoTokenizer | |
from modelscope.hub.snapshot_download import snapshot_download | |
from PIL import Image | |
from decord import VideoReader, cpu | |
import os | |
import gc | |
import cv2 | |
import tempfile | |
import shutil | |
import subprocess | |
from yolo_detection import is_image, is_video | |
# Constants for video processing
# Maximum number of sampled frames passed to the model per video chunk.
MAX_NUM_FRAMES = 32

# Check if CUDA is available
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Chunk count of the most recently processed video; reassigned (via
# `global`) inside encode_video_in_chunks() and analyze_video_activities().
# A `global` statement at module scope is a no-op, so none is used here.
TOTAL_CHUNKS = 1

# Initialize GPU if available: creating a tensor on the device forces CUDA
# initialization at import time (presumably so the first model call does
# not pay that cost).
if DEVICE == "cuda":
    def debug():
        """Allocate a small random tensor on the GPU."""
        torch.randn(10).cuda()

    debug()
# Model configuration
MODEL_NAME = 'iic/mPLUG-Owl3-7B-240728'  # ModelScope model id
MODEL_CACHE_DIR = "/data/models"
os.makedirs(MODEL_CACHE_DIR, exist_ok=True)

# Resolve the local model directory: snapshot_download places the snapshot
# under MODEL_CACHE_DIR. If that call fails, fall back to the location where
# a previously cached copy is assumed to live.
# NOTE(review): confirm the fallback matches ModelScope's cache layout.
try:
    model_path = snapshot_download(MODEL_NAME, cache_dir=MODEL_CACHE_DIR)
except Exception as download_error:
    print(f"Error downloading model: {str(download_error)}")
    model_path = os.path.join(MODEL_CACHE_DIR, MODEL_NAME)
def load_model_and_tokenizer():
    """Load a fresh instance of the model, tokenizer and processor.

    Unreferenced objects are garbage-collected and, on CUDA, PyTorch's cached
    GPU memory is released before the new model is instantiated from
    ``model_path``.

    Returns:
        tuple: ``(model, tokenizer, processor)``; ``processor`` is built by
        ``model.init_processor(tokenizer)``.

    Raises:
        Exception: any error raised while loading is printed and re-raised.
    """
    try:
        # Collect first, *then* empty the CUDA cache: tensors freed by the
        # collector only become releasable to the driver after collection.
        gc.collect()
        if DEVICE == "cuda":
            torch.cuda.empty_cache()

        # Half precision on GPU; many CPU kernels lack float16
        # implementations, so use float32 on the CPU fallback.
        dtype = torch.half if DEVICE == "cuda" else torch.float32

        model = AutoModel.from_pretrained(
            model_path,
            attn_implementation='sdpa',
            trust_remote_code=True,
            torch_dtype=dtype,
            device_map='auto',
        )
        tokenizer = AutoTokenizer.from_pretrained(
            model_path,
            trust_remote_code=True,
        )
        model.eval()
        processor = model.init_processor(tokenizer)
        return model, tokenizer, processor
    except Exception as e:
        print(f"Error loading model: {str(e)}")
        raise
def process_image(image_path, model, tokenizer, processor, prompt):
    """Describe a single image with the mPLUG model.

    Args:
        image_path: Path of the image file to analyze.
        model: Loaded model (e.g. from ``load_model_and_tokenizer``).
        tokenizer: Tokenizer forwarded to ``model.generate``.
        processor: Processor built by ``model.init_processor(tokenizer)``.
        prompt: User instruction; an ``<|image|>`` placeholder is appended.

    Returns:
        The first element of ``model.generate``'s output, or the literal
        string ``"Error processing image"`` if anything fails (the error is
        printed).
    """
    try:
        # Read the pixels eagerly so the file handle is closed, and normalize
        # palette/grayscale/RGBA files to 3-channel RGB for the processor.
        with Image.open(image_path) as img:
            image = img.convert("RGB")

        model_messages = [
            {"role": "user", "content": prompt + "<|image|>"},
            # Empty assistant turn, as in the original message format.
            {"role": "assistant", "content": ""},
        ]
        inputs = processor(
            model_messages,
            images=[image],
            videos=None,
        )
        # Follow the model's placement instead of assuming 'cuda', so the
        # CPU fallback works too.
        inputs.to(model.device)
        inputs.update({
            'tokenizer': tokenizer,
            'max_new_tokens': 100,
            'decode_text': True,
        })
        response = model.generate(**inputs)
        return response[0]
    except Exception as e:
        print(f"Error processing image: {str(e)}")
        return "Error processing image"
def process_video_chunk(video_frames, model, tokenizer, processor, prompt):
    """Describe one chunk of video frames with the mPLUG model.

    Args:
        video_frames: Frames of the chunk (a list of PIL images when called
            from ``analyze_video_activities``). If empty, the prompt is sent
            without a video.
        model: Loaded model; inputs are moved to ``model.device``.
        tokenizer: Tokenizer forwarded to ``model.generate``.
        processor: Processor built by ``model.init_processor(tokenizer)``.
        prompt: User instruction; a ``<|video|>`` placeholder is appended
            when frames are present.

    Returns:
        The first element of ``model.generate``'s output. Errors propagate.
    """
    has_video = bool(video_frames)
    model_messages = [
        {"role": "user", "content": prompt + ("<|video|>" if has_video else "")},
        # Empty assistant turn, as in the original message format.
        {"role": "assistant", "content": ""},
    ]
    inputs = processor(
        model_messages,
        images=None,
        videos=[video_frames] if has_video else None,
    )
    # Follow the model's placement instead of assuming 'cuda', so the CPU
    # fallback works too.
    inputs.to(model.device)
    inputs.update({
        'tokenizer': tokenizer,
        'max_new_tokens': 100,
        'decode_text': True,
    })
    response = model.generate(**inputs)
    return response[0]
def split_original_video(video_path, chunk_info):
    """Cut the original video into one file per chunk with ffmpeg stream copy.

    Streams are copied, not re-encoded, so ffmpeg cannot cut exactly at the
    requested timestamps: each piece begins at the nearest seek point
    (keyframe) at or before ``start_time``.

    Args:
        video_path: Path to the original video.
        chunk_info: Iterable of dicts with ``chunk_id``, ``start_time`` and
            ``end_time`` (seconds), as built by ``encode_video_in_chunks``.

    Returns:
        list[str]: Paths of the ``./tmp/original_chunk_<id>.mp4`` files, in
        the order of ``chunk_info``.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    tmp_dir = os.path.join('.', 'tmp')
    # Don't rely on a previous step having created the output directory.
    os.makedirs(tmp_dir, exist_ok=True)

    original_chunks = []
    for chunk in chunk_info:
        output_path = os.path.join(tmp_dir, f"original_chunk_{chunk['chunk_id']}.mp4")
        cmd = [
            'ffmpeg',
            # Overwrite leftovers from an earlier run instead of prompting
            # on stdin (which fails or hangs when non-interactive).
            '-y',
            '-ss', str(chunk['start_time']),
            '-to', str(chunk['end_time']),
            '-i', video_path,
            '-c', 'copy',
            output_path,
        ]
        subprocess.run(cmd, check=True)
        original_chunks.append(output_path)
    return original_chunks
def _write_frames_to_mp4(path, frames, fps):
    """Write RGB frames to ``path`` as an mp4v video, always releasing the writer.

    ``frames`` is expected to be a non-empty sequence of ``(height, width, 3)``
    RGB uint8 arrays (as produced by decord's ``get_batch().asnumpy()``).
    """
    height, width, _ = frames[0].shape
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(path, fourcc, fps, (width, height))
    try:
        for frame in frames:
            # Convert RGB to BGR (OpenCV's channel order).
            writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
    finally:
        writer.release()


def encode_video_in_chunks(video_path):
    """Sample a video at ~1 FPS, yield it in chunks, and save each chunk to disk.

    One source frame is sampled every ``round(fps)`` frames, and the samples
    are grouped into chunks of at most ``MAX_NUM_FRAMES``. Each chunk's
    sampled frames are written to ``./tmp/chunk_<idx>.mp4``. After the last
    chunk has been consumed, the original video is cut into
    ``./tmp/original_chunk_<idx>.mp4`` files by ``split_original_video``.
    The module-level ``TOTAL_CHUNKS`` is set before the first chunk is
    yielded.

    Args:
        video_path: Path to the input video.

    Yields:
        tuple: ``(chunk_idx, frames_pil, info)``. ``frames_pil`` holds the
        sampled frames as PIL images. ``info`` is the chunk metadata dict with
        keys ``chunk_id``, ``path``, ``start_time``, ``end_time`` (seconds),
        ``start_frame``, ``end_frame`` and ``original_fps``.
    """
    global TOTAL_CHUNKS
    vr = VideoReader(video_path, ctx=cpu(0))
    fps = vr.get_avg_fps()
    # Sample roughly one frame per second. round() of an FPS below 0.5 is 0,
    # which would make range() raise, so never step by less than one frame.
    sample_step = max(1, round(fps))
    frame_idx = list(range(0, len(vr), sample_step))

    # Create tmp directory if it doesn't exist
    tmp_dir = os.path.join('.', 'tmp')
    os.makedirs(tmp_dir, exist_ok=True)

    # Split sampled frame indices into chunks of at most MAX_NUM_FRAMES.
    chunks = [
        frame_idx[i:i + MAX_NUM_FRAMES]
        for i in range(0, len(frame_idx), MAX_NUM_FRAMES)
    ]
    # Publish the chunk count before the first chunk is yielded.
    TOTAL_CHUNKS = len(chunks)
    print(f"Total chunks: {TOTAL_CHUNKS}")

    chunk_info = []
    for chunk_idx, chunk in enumerate(chunks):
        frames = vr.get_batch(chunk).asnumpy()
        frames_pil = [Image.fromarray(v.astype('uint8')) for v in frames]
        chunk_path = os.path.join(tmp_dir, f"chunk_{chunk_idx}.mp4")

        # Slices of a non-empty index list are never empty, so chunk[0] and
        # chunk[-1] always exist.
        start_frame, end_frame = chunk[0], chunk[-1]
        info = {
            'chunk_id': chunk_idx,
            'path': chunk_path,
            'start_time': start_frame / fps,
            # Time of the last *sampled* frame, not the end of the chunk window.
            'end_time': end_frame / fps,
            'start_frame': start_frame,
            'end_frame': end_frame,
            'original_fps': fps,
        }
        chunk_info.append(info)

        # NOTE(review): frames are sampled ~1 per second but written at the
        # source FPS, so the saved chunk plays back faster than real time.
        # Confirm whether that is intended.
        _write_frames_to_mp4(chunk_path, frames, fps)
        print(f"Saved chunk {chunk_idx} to {chunk_path}")
        yield chunk_idx, frames_pil, info

    # Only reached once the consumer has exhausted the generator.
    split_original_video(video_path, chunk_info)
def analyze_image_activities(image_path):
    """Return placeholder activity entries for a construction-site image.

    NOTE: this is a stub. ``image_path`` is never read. The two entries are
    fixed sample data stamped with the current local time and with the time
    30 minutes earlier (12-hour ``"%I:%M %p"`` format).

    Returns:
        list[dict]: Entries with ``time``, ``summary`` and ``objects`` keys,
        or an empty list if building them raises (the error is printed).
    """
    from datetime import datetime, timedelta
    try:
        current_entry = {
            'time': datetime.now().strftime("%I:%M %p"),
            'summary': 'Excavation work in progress',
            'objects': ['excavator', 'worker', 'dump-truck'],
        }
        half_hour_ago = datetime.now() - timedelta(minutes=30)
        earlier_entry = {
            'time': half_hour_ago.strftime("%I:%M %p"),
            'summary': 'Material loading operation',
            'objects': ['loader', 'worker', 'gravel'],
        }
        return [current_entry, earlier_entry]
    except Exception as e:
        print(f"Error analyzing image: {str(e)}")
        return []  # Return empty list on error
def generate_thumbnails(video_path, num_chunks):
    """Save the first frame of each chunk as a thumbnail in a fresh ./tmp.

    Chunks follow the sampling rule of ``encode_video_in_chunks``. One source
    frame is sampled every ``round(fps)`` frames (~1 per second) and
    ``MAX_NUM_FRAMES`` samples form a chunk, so chunk ``k`` starts at source
    frame ``k * MAX_NUM_FRAMES * round(fps)``.

    Note: ``./tmp`` is deleted and recreated first, which removes any files
    left there by earlier runs.

    Args:
        video_path: Path to video file.
        num_chunks: Number of chunks to generate thumbnails for (``<= 0``
            yields no thumbnails).

    Returns:
        list[dict]: One ``{"path": ..., "time": ...}`` per chunk whose start
        lies inside the video, in chunk order. ``time`` is in seconds.
    """
    vr = VideoReader(video_path, ctx=cpu(0))
    total_frames = len(vr)
    fps = vr.get_avg_fps()

    # Create/clear tmp directory in current working directory
    tmp_dir = os.path.join('.', 'tmp')
    if os.path.exists(tmp_dir):
        shutil.rmtree(tmp_dir)
    os.makedirs(tmp_dir, exist_ok=True)

    # Step by one chunk's length in source frames, so each thumbnail is a
    # chunk's first frame rather than an evenly spaced sample of the whole
    # video. Clamp the sampling step like encode_video_in_chunks needs to.
    frames_per_chunk = MAX_NUM_FRAMES * max(1, round(fps))

    thumbnails = []
    for chunk_idx in range(num_chunks):
        frame_idx = chunk_idx * frames_per_chunk
        if frame_idx >= total_frames:
            break  # every later chunk starts even further past the end
        img = Image.fromarray(vr[frame_idx].asnumpy())
        temp_path = os.path.join(tmp_dir, f"thumbnail_{chunk_idx}.jpg")
        img.save(temp_path)
        thumbnails.append({
            "path": temp_path,
            "time": frame_idx / fps,
        })
    return thumbnails
def analyze_video_activities(video_path):
    """Analyze a construction-site video with the mPLUG model, one call per chunk.

    The video is sampled at ~1 FPS and split into chunks of
    ``MAX_NUM_FRAMES`` sampled frames, using the same rule as
    ``encode_video_in_chunks``. One thumbnail is generated per chunk, each
    chunk is described by the model, and the description is attached to
    that chunk's thumbnail.

    Args:
        video_path: Path to the input video.

    Returns:
        list[dict]: Activities sorted by ``timestamp_seconds``. Each has these
        keys:

        - ``time`` (``"MM:SS"``) and ``timestamp_seconds``
        - ``summary`` (the raw model response)
        - ``objects`` (known equipment/material keywords found in the response)
        - ``thumbnail``, ``chunk_id`` and ``chunk_path``

        An empty list is returned if anything fails; the error is printed.
    """
    global TOTAL_CHUNKS
    prompt = "Analyze this construction site video chunk and describe the activities happening. Focus on construction activities, machinery usage, and worker actions. Include any construction equipment or machinery you can identify."
    # Keywords searched for in each response (basic text matching; a
    # production system might use more sophisticated NLP).
    possible_objects = ("excavator", "bulldozer", "crane", "truck", "loader",
                        "worker", "concrete", "scaffold", "beam", "pipe",
                        "rebar", "formwork", "drill", "grader", "roller")
    try:
        # Count chunks up front (same sampling rule as encode_video_in_chunks)
        # so thumbnails can be generated before any chunk is processed.
        vr = VideoReader(video_path, ctx=cpu(0))
        sample_step = max(1, round(vr.get_avg_fps()))
        num_sampled = len(range(0, len(vr), sample_step))
        TOTAL_CHUNKS = (num_sampled + MAX_NUM_FRAMES - 1) // MAX_NUM_FRAMES
        if TOTAL_CHUNKS == 0:
            # Empty video: nothing to describe, so skip loading the model.
            return []

        thumbnails = generate_thumbnails(video_path, num_chunks=TOTAL_CHUNKS)
        chunk_generator = encode_video_in_chunks(video_path)
        model, tokenizer, processor = load_model_and_tokenizer()

        all_activities = []
        # NOTE(review): the generator runs split_original_video (ffmpeg) after
        # its last chunk. A failure there propagates out of this loop and
        # discards every activity collected so far.
        for chunk_idx, video_frames, chunk_info in chunk_generator:
            response = process_video_chunk(video_frames, model, tokenizer, processor, prompt)
            print(f"Chunk {chunk_idx}: {response}")

            # generate_thumbnails emits thumbnails in chunk order, one per
            # chunk, so pair them by index. Matching thumbnails to a fixed
            # [chunk_idx * MAX_NUM_FRAMES, +MAX_NUM_FRAMES) seconds window
            # depends on where thumbnails fall. A chunk could then get several
            # thumbnails or none at all, which silently drops its description.
            if chunk_idx >= len(thumbnails):
                continue
            thumbnail = thumbnails[chunk_idx]

            lower_response = response.lower()
            objects = [obj for obj in possible_objects if obj in lower_response]

            minutes, seconds = divmod(int(thumbnail['time']), 60)
            all_activities.append({
                'time': f"{minutes:02d}:{seconds:02d}",
                'timestamp_seconds': thumbnail['time'],  # raw seconds for sorting
                'summary': response,
                'objects': objects,
                'thumbnail': thumbnail["path"],
                'chunk_id': chunk_idx,
                'chunk_path': chunk_info['path'] if chunk_info else None,
            })

        all_activities.sort(key=lambda activity: activity['timestamp_seconds'])
        return all_activities
    except Exception as e:
        print(f"Error analyzing video: {str(e)}")
        return []  # Maintain consistent return type