Spaces:
Sleeping
Sleeping
File size: 6,708 Bytes
5779747 c8ba898 19de063 82d8aa2 e50e82a 5779747 82d8aa2 5779747 e50e82a 19de063 5779747 82d8aa2 5779747 82d8aa2 5779747 8e725cb 5779747 82d8aa2 5779747 82d8aa2 5779747 8e725cb 5779747 8e725cb 5779747 82d8aa2 8e725cb c8ba898 8e725cb c8ba898 82d8aa2 c8ba898 e50e82a 82d8aa2 5779747 8e725cb 5779747 19de063 5779747 8e725cb 5779747 82d8aa2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
import atexit
import base64
import os
import queue
import threading
import time
from datetime import datetime
from io import BytesIO

import cv2
import numpy as np
import pandas as pd
import streamlit as st
from PIL import Image
from streamlit.runtime.scriptrunner import add_script_run_ctx
# Initialize session state like a galactic DJ spinning tracks!
# Seed each session-state key once; values already stored for this session
# are left untouched.
for _state_key, _make_default in (
    ('file_history', list),          # list() -> [] : saved-image records
    ('auto_capture_running', bool),  # bool() -> False : auto-snap switch
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _make_default()
# Camera thread class for OpenCV capture
class CamThread(threading.Thread):
    """Background thread that keeps the newest RGB frame of one camera in a queue.

    Args:
        preview_name: Human-readable camera label used in console messages.
        cam_id: Device index (or source) handed to ``cv2.VideoCapture``.
        frame_queue: ``queue.Queue`` that receives the captured frames;
            consumers read from it with ``get_nowait()``.

    Attributes:
        running: Loop flag; ``stop()`` sets it to ``False`` to end capture.
    """

    def __init__(self, preview_name, cam_id, frame_queue):
        # Daemon thread: the capture loop only ends when stop() is called,
        # and a non-daemon thread that never ends keeps the Python process
        # alive at shutdown.
        super().__init__(daemon=True)
        self.preview_name = preview_name
        self.cam_id = cam_id
        self.frame_queue = frame_queue
        self.running = True

    def run(self):
        """Thread entry point: announce the camera and run the capture loop."""
        print(f"Starting {self.preview_name}")
        self.cam_preview()

    def cam_preview(self):
        """Open the camera and publish frames until ``stop()`` is called.

        The device is released on every exit path, including a failed open
        and an exception raised inside the loop.
        """
        cam = cv2.VideoCapture(self.cam_id)
        try:
            if not cam.isOpened():
                # Reported on the console (like the start message) because
                # this runs on a worker thread, outside Streamlit's
                # script-run context, where st.* calls are not rendered.
                print(f"π¨ Failed to open {self.preview_name}")
                return
            cam.set(cv2.CAP_PROP_FRAME_WIDTH, 640)  # Set reasonable resolution
            cam.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
            # Use MJPG for better USB compatibility
            cam.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
            while self.running:
                ret, frame = cam.read()
                if ret:
                    # OpenCV delivers BGR; consumers of the queue expect RGB.
                    self._publish(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                time.sleep(0.03)  # caps polling at roughly 30 FPS
        finally:
            cam.release()

    def _publish(self, frame):
        """Put *frame* in the queue, replacing an older frame nobody has read."""
        try:
            self.frame_queue.put_nowait(frame)
            return
        except queue.Full:
            pass
        # Queue is full: discard the stale frame so readers get the newest one.
        try:
            self.frame_queue.get_nowait()
        except queue.Empty:
            pass  # a consumer took it in the meantime
        try:
            self.frame_queue.put_nowait(frame)
        except queue.Full:
            pass  # something else refilled the queue; drop this frame

    def stop(self):
        """Ask the capture loop to finish after its current iteration."""
        self.running = False
# Save to history like a time-traveling scribe! | save_to_history("πΌοΈ Image", "pic.jpg", img_data) - Stamps a pic in the history books like a boss!
def save_to_history(file_type, file_path, img_data):
    """Write an image to disk and record it in the session's file history.

    Args:
        file_type: Label stored under ``"Type"`` in the history entry.
        file_path: Destination path handed to ``cv2.imwrite``.
        img_data: Image array in RGB channel order. A 2-D (single-channel)
            array is written unchanged, since it has no channels to swap.

    Returns:
        ``True`` when the file was written and the entry recorded, ``False``
        when ``cv2.imwrite`` reported failure (no entry is recorded then, so
        the history never points at a file that was not written).
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    if img_data.ndim == 2:
        img_out = img_data
    else:
        # Convert RGB to BGR for OpenCV saving
        img_out = cv2.cvtColor(img_data, cv2.COLOR_RGB2BGR)
    if not cv2.imwrite(file_path, img_out):
        return False
    st.session_state['file_history'].append({
        "Timestamp": timestamp,
        "Type": file_type,
        "Path": file_path,
    })
    return True
# Auto-capture every 10 secs like a sneaky shutterbug!
def auto_capture(frame_queues, interval=10, poll=0.5):
    """Save one frame per camera every *interval* seconds while enabled.

    Runs until ``st.session_state['auto_capture_running']`` becomes false.
    Intended as a thread target.

    Args:
        frame_queues: Queues to read frames from; the position of a queue in
            this sequence is used as the camera number in the file name.
        interval: Seconds between capture rounds.
        poll: Longest single sleep, in seconds. The wait between rounds is
            split into slices of this size so that switching the flag off
            takes effect within about one slice, not a full interval.
    """
    while st.session_state['auto_capture_running']:
        for i, q in enumerate(frame_queues):
            try:
                frame = q.get_nowait()
            except queue.Empty:
                continue  # Skip if no frame available
            filename = f"cam{i}_auto_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg"
            save_to_history("πΌοΈ Image", filename, frame)
        waited = 0.0
        while waited < interval and st.session_state['auto_capture_running']:
            step = min(poll, interval - waited)
            time.sleep(step)
            waited += step
# Sidebar config like a spaceship control panel!
# Camera setup comes first: the sidebar's Start button needs frame_queues,
# so the queues must exist before the sidebar code runs.
@st.cache_resource
def _start_camera_threads():
    """Create the frame queues and start one capture thread per camera.

    Cached with ``st.cache_resource`` so this happens once per server
    process. Streamlit re-executes the script on every interaction, and
    without the cache each rerun would start another pair of threads on the
    same camera devices.

    Returns:
        ``(queues, cams)``: the per-camera frame queues and the started
        ``CamThread`` objects, index-aligned.
    """
    queues = [queue.Queue(maxsize=1), queue.Queue(maxsize=1)]
    cams = [
        CamThread("Camera 0", 0, queues[0]),
        CamThread("Camera 1", 1, queues[1]),
    ]
    for cam in cams:
        cam.start()
    return queues, cams


frame_queues, threads = _start_camera_threads()

# Sidebar: auto-snap controls and the list of saved files.
with st.sidebar:
    st.header("ποΈπΈ Snap Shack")
    if st.button("β° Start Auto-Snap"):
        st.session_state['auto_capture_running'] = True
        # Reuse a worker that is still alive so repeated clicks (or a quick
        # Stop -> Start) never leave two capture loops running.
        worker = st.session_state.get('auto_capture_thread')
        if worker is None or not worker.is_alive():
            worker = threading.Thread(target=auto_capture, args=(frame_queues,), daemon=True)
            # The worker reads and writes st.session_state; attaching the
            # script-run context is what makes that possible off the
            # script thread.
            add_script_run_ctx(worker)
            worker.start()
            st.session_state['auto_capture_thread'] = worker
    if st.button("βΉοΈ Stop Auto-Snap"):
        st.session_state['auto_capture_running'] = False

    # Sidebar file outline
    st.subheader("π Snap Stash")
    if st.session_state['file_history']:
        images = [f for f in st.session_state['file_history'] if f['Type'] == "πΌοΈ Image"]
        if images:
            st.write("πΌοΈ Images")
            for img in images:
                st.write(f"- {img['Path']} @ {img['Timestamp']}")
    else:
        st.write("π³οΈ Empty Stash!")

# Main UI
st.title("πΈ OpenCV Snap Craze")
# Camera snap zone!
st.header("πΈπ₯ Snap Zone")
cols = st.columns(2)
placeholders = [cols[0].empty(), cols[1].empty()]
for i, (placeholder, q) in enumerate(zip(placeholders, frame_queues)):
    snap_clicked = st.button(f"πΈ Snap Cam {i}", key=f"snap{i}")
    # Take at most one frame per camera per run and use it for both the
    # snapshot and the preview. Each queue holds a single frame, so reading
    # it twice would leave the preview empty whenever Snap was clicked.
    try:
        frame = q.get_nowait()
    except queue.Empty:
        frame = None
    if snap_clicked:
        if frame is None:
            st.error(f"π¨ No frame from Cam {i} yet!")
        else:
            filename = f"cam{i}_snap_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg"
            save_to_history("πΌοΈ Image", filename, frame)
    if frame is None:
        placeholder.write(f"π· Waiting for Cam {i}...")
    else:
        placeholder.image(frame, caption=f"Live Cam {i}", use_container_width=True)
# Upload zone like a media drop party!
st.header("π₯π Drop Zone")
uploaded_files = st.file_uploader("πΈ Toss Pics", accept_multiple_files=True, type=['jpg', 'png'])
if uploaded_files:
    # The uploader keeps returning the same files on every rerun; remember
    # which ones were already written so each upload is saved only once.
    if 'saved_upload_ids' not in st.session_state:
        st.session_state['saved_upload_ids'] = set()
    saved_upload_ids = st.session_state['saved_upload_ids']
    for uploaded_file in uploaded_files:
        img = Image.open(uploaded_file)
        # Fall back to (name, size) if the object exposes no file_id.
        upload_id = getattr(uploaded_file, 'file_id', None) or (uploaded_file.name, uploaded_file.size)
        if upload_id not in saved_upload_ids:
            # The name is supplied by the client: keep only its final path
            # component before building the destination path.
            safe_name = os.path.basename(uploaded_file.name)
            file_path = f"uploaded_{safe_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg"
            # Normalise palette/grayscale/alpha images to 3-channel RGB,
            # the layout save_to_history converts from.
            img_array = np.array(img.convert("RGB"))
            save_to_history("πΌοΈ Image", file_path, img_array)
            saved_upload_ids.add(upload_id)
        st.image(img, caption=uploaded_file.name, use_container_width=True)
# Gallery like a media circus!
st.header("πͺ Snap Show")
gallery_history = st.session_state['file_history']
if not gallery_history:
    st.write("π« No snaps yet!")
else:
    pictures = [entry for entry in gallery_history if entry['Type'] == "πΌοΈ Image"]
    if pictures:
        st.subheader("πΌοΈ Pic Parade")
        grid = st.columns(3)
        # Fill the three columns round-robin, one picture per cell.
        for idx, entry in enumerate(pictures):
            pic_path = entry['Path']
            with grid[idx % 3]:
                if not os.path.exists(pic_path):
                    st.warning(f"π¨ Missing file: {pic_path}")
                    continue
                st.image(pic_path, caption=pic_path, use_container_width=True)
                with open(pic_path, "rb") as fh:
                    encoded = base64.b64encode(fh.read()).decode()
                # Download link: the file content is embedded as a data URI.
                link_html = f'<a href="data:image/jpeg;base64,{encoded}" download="{os.path.basename(pic_path)}">π₯ Snag It!</a>'
                st.markdown(link_html, unsafe_allow_html=True)
# History log like a time machine!
st.header("β³ Snap Saga")
history_rows = st.session_state['file_history']
if not history_rows:
    st.write("π³οΈ Nothing snapped yet!")
else:
    # One table row per recorded file (Timestamp / Type / Path).
    st.dataframe(pd.DataFrame(history_rows))
# Cleanup on app exit (not typically needed in Streamlit, but good practice)
def cleanup():
    """Ask every camera thread to stop, then wait briefly for each to end.

    All threads are signalled before any is joined so they wind down in
    parallel. Threads that are not running are skipped, because ``join()``
    raises ``RuntimeError`` on a thread that was never started. The join is
    bounded so a thread stuck in a blocking call cannot hang shutdown.
    """
    for thread in threads:
        thread.stop()
    for thread in threads:
        if thread.is_alive():
            thread.join(timeout=2)


# Run cleanup at interpreter exit.
atexit.register(cleanup)