Spaces:
Sleeping
Sleeping
import streamlit as st | |
import pandas as pd | |
from datetime import datetime | |
import os | |
import threading | |
import time | |
from PIL import Image | |
from io import BytesIO | |
import base64 | |
import cv2 | |
import queue | |
# ππ₯ Initialize session state like a galactic DJ spinning tracks! | |
if 'file_history' not in st.session_state: | |
st.session_state['file_history'] = [] | |
if 'auto_capture_running' not in st.session_state: | |
st.session_state['auto_capture_running'] = False | |
# πΈπ· Camera thread class for OpenCV capture | |
class CamThread(threading.Thread): | |
def __init__(self, preview_name, cam_id, frame_queue): | |
threading.Thread.__init__(self) | |
self.preview_name = preview_name | |
self.cam_id = cam_id | |
self.frame_queue = frame_queue | |
self.running = True | |
def run(self): | |
print(f"Starting {self.preview_name}") | |
self.cam_preview() | |
def cam_preview(self): | |
cam = cv2.VideoCapture(self.cam_id) | |
cam.set(cv2.CAP_PROP_FRAME_WIDTH, 640) # Set reasonable resolution | |
cam.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) | |
cam.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')) # Use MJPG for better USB compatibility | |
if not cam.isOpened(): | |
st.error(f"π¨ Failed to open {self.preview_name}") | |
return | |
while self.running: | |
ret, frame = cam.read() | |
if ret: | |
# Convert BGR to RGB for display | |
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
try: | |
self.frame_queue.put_nowait(frame_rgb) | |
except queue.Full: | |
pass # Skip if queue is full | |
time.sleep(0.03) # ~30 FPS | |
cam.release() | |
def stop(self): | |
self.running = False | |
# ππΎ Save to history like a time-traveling scribe! | π β¨ save_to_history("πΌοΈ Image", "pic.jpg", img_data) - Stamps a pic in the history books like a boss! | |
def save_to_history(file_type, file_path, img_data): | |
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
# Convert RGB to BGR for OpenCV saving | |
img_bgr = cv2.cvtColor(img_data, cv2.COLOR_RGB2BGR) | |
cv2.imwrite(file_path, img_bgr) | |
st.session_state['file_history'].append({ | |
"Timestamp": timestamp, | |
"Type": file_type, | |
"Path": file_path | |
}) | |
# πΈβ° Auto-capture every 10 secs like a sneaky shutterbug! | |
def auto_capture(frame_queues): | |
while st.session_state['auto_capture_running']: | |
for i, q in enumerate(frame_queues): | |
try: | |
frame = q.get_nowait() | |
filename = f"cam{i}_auto_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg" | |
save_to_history("πΌοΈ Image", filename, frame) | |
except queue.Empty: | |
pass # Skip if no frame available | |
time.sleep(10) | |
# ποΈ Sidebar config like a spaceship control panel! | |
with st.sidebar: | |
st.header("ποΈπΈ Snap Shack") | |
if st.button("β° Start Auto-Snap"): | |
st.session_state['auto_capture_running'] = True | |
threading.Thread(target=auto_capture, args=(frame_queues,), daemon=True).start() | |
if st.button("βΉοΈ Stop Auto-Snap"): | |
st.session_state['auto_capture_running'] = False | |
# π Sidebar file outline with emoji flair! | |
st.subheader("π Snap Stash") | |
if st.session_state['file_history']: | |
images = [f for f in st.session_state['file_history'] if f['Type'] == "πΌοΈ Image"] | |
if images: | |
st.write("πΌοΈ Images") | |
for img in images: | |
st.write(f"- {img['Path']} @ {img['Timestamp']}") | |
else: | |
st.write("π³οΈ Empty Stash!") | |
# ππ¨ Main UI kicks off like a cosmic art show! | |
st.title("πΈ OpenCV Snap Craze") | |
# πΈπ· Start camera threads | |
frame_queues = [queue.Queue(maxsize=1), queue.Queue(maxsize=1)] | |
threads = [ | |
CamThread("Camera 0", 0, frame_queues[0]), | |
CamThread("Camera 1", 1, frame_queues[1]) | |
] | |
for thread in threads: | |
thread.start() | |
# πΈπ· Camera snap zone! | |
st.header("πΈπ₯ Snap Zone") | |
cols = st.columns(2) | |
placeholders = [cols[0].empty(), cols[1].empty()] | |
for i, (placeholder, q, thread) in enumerate(zip(placeholders, frame_queues, threads)): | |
if st.button(f"πΈ Snap Cam {i}", key=f"snap{i}"): | |
try: | |
frame = q.get_nowait() | |
filename = f"cam{i}_snap_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg" | |
save_to_history("πΌοΈ Image", filename, frame) | |
except queue.Empty: | |
st.error(f"π¨ No frame from Cam {i} yet!") | |
try: | |
frame = q.get_nowait() | |
placeholder.image(frame, caption=f"Live Cam {i}", use_container_width=True) | |
except queue.Empty: | |
placeholder.write(f"π· Waiting for Cam {i}...") | |
# π Upload zone like a media drop party! | |
st.header("π₯π Drop Zone") | |
uploaded_files = st.file_uploader("πΈ Toss Pics", accept_multiple_files=True, type=['jpg', 'png']) | |
if uploaded_files: | |
for uploaded_file in uploaded_files: | |
file_path = f"uploaded_{uploaded_file.name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg" | |
img = Image.open(uploaded_file) | |
img_array = np.array(img) | |
save_to_history("πΌοΈ Image", file_path, img_array) | |
st.image(img, caption=uploaded_file.name, use_container_width=True) | |
# πΌοΈ Gallery like a media circus! | |
st.header("πͺ Snap Show") | |
if st.session_state['file_history']: | |
images = [f for f in st.session_state['file_history'] if f['Type'] == "πΌοΈ Image"] | |
if images: | |
st.subheader("πΌοΈ Pic Parade") | |
cols = st.columns(3) | |
for i, img in enumerate(images): | |
with cols[i % 3]: | |
if os.path.exists(img['Path']): | |
st.image(img['Path'], caption=img['Path'], use_container_width=True) | |
with open(img['Path'], "rb") as f: | |
img_data = f.read() | |
st.markdown(f'<a href="data:image/jpeg;base64,{base64.b64encode(img_data).decode()}" download="{os.path.basename(img["Path"])}">π₯ Snag It!</a>', unsafe_allow_html=True) | |
else: | |
st.warning(f"π¨ Missing file: {img['Path']}") | |
else: | |
st.write("π« No snaps yet!") | |
# π History log like a time machine! | |
st.header("β³ Snap Saga") | |
if st.session_state['file_history']: | |
df = pd.DataFrame(st.session_state['file_history']) | |
st.dataframe(df) | |
else: | |
st.write("π³οΈ Nothing snapped yet!") | |
# Cleanup on app exit (not typically needed in Streamlit, but good practice) | |
def cleanup(): | |
for thread in threads: | |
thread.stop() | |
for thread in threads: | |
thread.join() | |
import atexit | |
atexit.register(cleanup) |