Spaces:
				
			
			
	
			
			
		Runtime error
		
	
	
	
			
			
	
	
	
	
		
		
		Runtime error
		
	| import os | |
| from fastapi import Depends, FastAPI | |
| import sqlite3 | |
| import requests | |
| import uvicorn | |
| from pathlib import Path | |
| import json | |
| app = FastAPI() | |
| LIVEBLOCKS_SECRET = os.environ.get("LIVEBLOCKS_SECRET") | |
| DB_PATH = Path("rooms.db") | |
| if not DB_PATH.exists(): | |
| print("Creating database") | |
| print("DB_PATH", DB_PATH) | |
| db = sqlite3.connect(DB_PATH) | |
| with open(Path("schema.sql"), "r") as f: | |
| db.executescript(f.read()) | |
| db.commit() | |
| db.close() | |
| def get_db(): | |
| db = sqlite3.connect(Path("./rooms.db"), check_same_thread=False) | |
| print("Connected to database") | |
| db.row_factory = sqlite3.Row | |
| try: | |
| yield db | |
| except Exception: | |
| db.rollback() | |
| finally: | |
| db.close() | |
| app = FastAPI() | |
| rooms = ["room-" + str(i) for i in range(0, 41)] | |
| async def read_root(db: sqlite3.Connection = Depends(get_db)): | |
| out = db.execute("SELECT * FROM rooms").fetchall() | |
| print(out) | |
| return out | |
| async def create_room(db: sqlite3.Connection = Depends(get_db)): | |
| for room_id in rooms: | |
| print(room_id) | |
| createRoom(room_id, db) | |
| all = db.execute("SELECT * FROM rooms").fetchall() | |
| return all | |
| async def create_one_room(room_id: str): | |
| payload = {"id": room_id, "defaultAccesses": ["room:write"]} | |
| response = requests.post(f"https://api.liveblocks.io/v2/rooms", | |
| headers={"Authorization": f"Bearer {LIVEBLOCKS_SECRET}"}, json=payload) | |
| data = response.json() | |
| return data | |
| def createRoom(room_id, db): | |
| payload = {"id": room_id, "defaultAccesses": ["room:write"]} | |
| response = requests.post(f"https://api.liveblocks.io/v2/rooms", | |
| headers={"Authorization": f"Bearer {LIVEBLOCKS_SECRET}"}, json=payload) | |
| # if response.status_code == 200: | |
| data = response.json() | |
| print(data) | |
| if "error" in data and data["error"] == "ROOM_ALREADY_EXISTS": | |
| print("Room already exists") | |
| cursor = db.cursor() | |
| cursor.execute("INSERT INTO rooms (room_id) VALUES (?)", (room_id,)) | |
| db.commit() | |
| print("Room created") | |
| print("Created room", room_id) | |
| return True | |
| def generateAuthToken(): | |
| response = requests.get(f"https://liveblocks.io/api/authorize", | |
| headers={"Authorization": f"Bearer {LIVEBLOCKS_SECRET}"}) | |
| if response.status_code == 200: | |
| data = response.json() | |
| return data["token"] | |
| else: | |
| raise Exception(response.status_code, response.text) | |
| def get_room_count(room_id: str, jwtToken: str = ''): | |
| print("Getting room count" + room_id) | |
| response = requests.get( | |
| f"https://liveblocks.net/api/v1/room/{room_id}/users", headers={"Authorization": f"Bearer {jwtToken}", "Content-Type": "application/json"}) | |
| if response.status_code == 200: | |
| res = response.json() | |
| if "data" in res: | |
| return len(res["data"]) | |
| else: | |
| return 0 | |
| raise Exception("Error getting room count") | |
| async def sync_rooms(db: sqlite3.Connection = Depends(get_db)): | |
| try: | |
| jwtToken = generateAuthToken() | |
| rooms = db.execute("SELECT * FROM rooms").fetchall() | |
| for row in rooms: | |
| room_id = row["room_id"] | |
| users_count = get_room_count(room_id, jwtToken) | |
| print("Updating room", room_id, "with", users_count, "users") | |
| cursor = db.cursor() | |
| cursor.execute( | |
| "UPDATE rooms SET users_count = ? WHERE room_id = ?", (users_count, room_id)) | |
| db.commit() | |
| data = db.execute("SELECT * FROM rooms").fetchall() | |
| return data | |
| except Exception as e: | |
| print(e) | |
| return {"error": str(e)} | |
| if __name__ == "__main__": | |
| uvicorn.run("createRooms:app", host="0.0.0.0", | |
| log_level="debug", reload=True) | |