Spaces:
Sleeping
Sleeping
| import os | |
| import tempfile | |
| import cv2 | |
| import streamlit as st | |
| import PIL | |
| import requests | |
| from ultralytics import YOLO | |
| import time | |
| import numpy as np | |
| import imageio_ffmpeg as ffmpeg | |
| import base64 | |
| # Page config first | |
| st.set_page_config( | |
| page_title="Fire Watch: Fire and Smoke Detection with an AI Vision Model", | |
| page_icon="🔥", | |
| layout="wide", | |
| initial_sidebar_state="expanded" | |
| ) | |
| # Model path | |
| model_path = 'https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/best.pt' | |
| # Session state initialization | |
| for key in ["processed_frames", "slider_value", "processed_video", "start_time"]: | |
| if key not in st.session_state: | |
| st.session_state[key] = [] if key == "processed_frames" else 0 if key == "slider_value" else None | |
| # Sidebar | |
| with st.sidebar: | |
| st.header("Upload & Settings") | |
| source_file = st.file_uploader("Upload image or video to be analyzed:", type=["jpg", "jpeg", "png", "bmp", "webp", "mp4"]) | |
| confidence = float(st.slider("Confidence Threshold", 10, 100, 20)) / 100 | |
| fps_options = { | |
| "Original FPS": None, | |
| "3 FPS": 3, | |
| "1 FPS": 1, | |
| "1 frame/4s": 0.25, | |
| "1 frame/10s": 0.1, | |
| "1 frame/15s": 0.0667, | |
| "1 frame/30s": 0.0333 | |
| } | |
| video_option = st.selectbox("Output Frame Rate", list(fps_options.keys())) | |
| process_button = st.button("Detect fire") | |
| progress_bar = st.progress(0) | |
| progress_text = st.empty() | |
| download_slot = st.empty() | |
| # Main page | |
| st.title("Fire Watch: AI-Powered Fire and Smoke Detection") | |
| # Display result images directly | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| fire_4a_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/Fire_4a.jpg" | |
| st.image(fire_4a_url, use_column_width=True) | |
| with col2: | |
| fire_3a_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/Fire_3a.jpg" | |
| st.image(fire_3a_url, use_column_width=True) | |
| st.markdown(""" | |
| Early wildfire detection using YOLOv8 AI vision model. See detected results above and video examples below, or upload your own content! | |
| Click on video frames to load and play examples. | |
| """) | |
| # Function to create simple video pair HTML | |
| if not source_file: | |
| st.info("Please upload a file to begin.") | |
| st.header("Your Results") | |
| result_cols = st.columns(2) | |
| viewer_slot = st.empty() | |
| # Example videos (LA before T) | |
| #st.header("Example Results") | |
| #examples = [ | |
| # ("LA Example", "LA1.mp4", "LA2.mp4"), | |
| # ("T Example", "T1.mp4", "T2.mp4") | |
| #] | |
| #for title, orig_file, proc_file in examples: | |
| # st.subheader(title) | |
| # orig_url = f"https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/{orig_file}" | |
| # proc_url = f"https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/{proc_file}" | |
| # video_html = create_video_pair(orig_url, proc_url) | |
| # st.markdown(video_html, unsafe_allow_html=True) | |
| # Load model | |
| try: | |
| model = YOLO(model_path) | |
| except Exception as ex: | |
| st.error(f"Model loading failed: {str(ex)}") | |
| model = None | |
| # Processing | |
| if process_button and source_file and model: | |
| st.session_state.processed_frames = [] | |
| if source_file.type.split('/')[0] == 'image': | |
| image = PIL.Image.open(source_file) | |
| res = model.predict(image, conf=confidence) | |
| result = res[0].plot()[:, :, ::-1] | |
| with result_cols[0]: | |
| st.image(image, caption="Original", use_column_width=True) | |
| with result_cols[1]: | |
| st.image(result, caption="Detected", use_column_width=True) | |
| else: | |
| # Video processing | |
| with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp: | |
| tmp.write(source_file.read()) | |
| vidcap = cv2.VideoCapture(tmp.name) | |
| orig_fps = vidcap.get(cv2.CAP_PROP_FPS) | |
| total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| width = int(vidcap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(vidcap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| output_fps = fps_options[video_option] if fps_options[video_option] else orig_fps | |
| sample_interval = max(1, int(orig_fps / output_fps)) if output_fps else 1 | |
| # Set fixed output FPS to 2 (500ms per frame = 2 FPS) | |
| fixed_output_fps = 2 | |
| st.session_state.start_time = time.time() | |
| frame_count = 0 | |
| processed_count = 0 | |
| success, frame = vidcap.read() | |
| while success: | |
| if frame_count % sample_interval == 0: | |
| res = model.predict(frame, conf=confidence) | |
| processed_frame = res[0].plot()[:, :, ::-1] | |
| if not processed_frame.flags['C_CONTIGUOUS']: | |
| processed_frame = np.ascontiguousarray(processed_frame) | |
| st.session_state.processed_frames.append(processed_frame) | |
| processed_count += 1 | |
| elapsed = time.time() - st.session_state.start_time | |
| progress = frame_count / total_frames | |
| if elapsed > 0 and progress > 0: | |
| total_estimated_time = elapsed / progress | |
| eta = total_estimated_time - elapsed | |
| elapsed_str = f"{int(elapsed // 60)}m {int(elapsed % 60)}s" | |
| eta_str = f"{int(eta // 60)}m {int(eta % 60)}s" if eta > 0 else "Almost done" | |
| else: | |
| elapsed_str = "0s" | |
| eta_str = "Calculating..." | |
| progress_bar.progress(min(progress, 1.0)) | |
| progress_text.text(f"Progress: {progress:.1%}\nElapsed: {elapsed_str}\nETA: {eta_str}") | |
| frame_count += 1 | |
| success, frame = vidcap.read() | |
| vidcap.release() | |
| os.unlink(tmp.name) | |
| if st.session_state.processed_frames: | |
| out_path = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False).name | |
| writer = ffmpeg.write_frames( | |
| out_path, | |
| (width, height), | |
| fps=fixed_output_fps, # Fixed at 2 FPS (500ms per frame) | |
| codec='libx264', | |
| pix_fmt_in='bgr24', | |
| pix_fmt_out='yuv420p' | |
| ) | |
| writer.send(None) # Initialize writer | |
| for frame in st.session_state.processed_frames: | |
| writer.send(frame) | |
| writer.close() | |
| with open(out_path, 'rb') as f: | |
| st.session_state.processed_video = f.read() | |
| os.unlink(out_path) | |
| elapsed_final = time.time() - st.session_state.start_time | |
| elapsed_final_str = f"{int(elapsed_final // 60)}m {int(elapsed_final % 60)}s" | |
| progress_bar.progress(1.0) | |
| progress_text.text(f"Progress: 100%\nElapsed: {elapsed_final_str}\nETA: 0m 0s") | |
| with result_cols[0]: | |
| st.video(source_file) | |
| with result_cols[1]: | |
| st.video(st.session_state.processed_video) | |
| download_slot.download_button( | |
| label="Download Processed Video", | |
| data=st.session_state.processed_video, | |
| file_name="results_fire_analysis.mp4", | |
| mime="video/mp4" | |
| ) | |
| if not source_file: | |
| st.info("Please upload a file to begin.") |