Spaces:
Sleeping
Sleeping
from fastapi import FastAPI, HTTPException, Request, Form, Depends | |
from fastapi.responses import JSONResponse | |
from fastapi.middleware.cors import CORSMiddleware | |
from pydantic import BaseModel | |
from bson import ObjectId | |
import pymongo | |
app = FastAPI() | |
# Enable CORS | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
# MongoDB connection | |
mongo_url = "mongodb+srv://ip6ofme:[email protected]/" | |
client = pymongo.MongoClient(mongo_url) | |
db = client["test"] | |
pdf_collection = db["PdfDetails"] | |
voter_collection = db["Voters"] | |
# Models | |
class VoteRequest(BaseModel): | |
id: str | |
class VoterVoteRequest(BaseModel): | |
voter_id: str | |
file_id: str | |
class RegisterVoterRequest(BaseModel): | |
name: str | |
group: str | |
role: str | |
# API to upload file information to MongoDB | |
async def upload_file(title: str = Form(...), group: str = Form(...), url: str = Form(...)): | |
new_pdf = { | |
"title": title, | |
"group": group, | |
"url": url, | |
"votes": 0, | |
} | |
result = pdf_collection.insert_one(new_pdf) | |
return {"status": "ok", "id": str(result.inserted_id)} | |
# API to increment file vote count by ID | |
async def vote(request: VoteRequest): | |
file_id = request.id | |
try: | |
file = pdf_collection.find_one({"_id": ObjectId(file_id)}) | |
if not file: | |
raise HTTPException(status_code=404, detail="File not found") | |
pdf_collection.update_one({"_id": ObjectId(file_id)}, {"$inc": {"votes": 1}}) | |
return {"status": "ok"} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
# API to get vote count by file ID | |
async def get_votes(id: str): | |
try: | |
file = pdf_collection.find_one({"_id": ObjectId(id)}) | |
if not file: | |
raise HTTPException(status_code=404, detail="File not found") | |
return {"status": "ok", "votes": file.get("votes", 0)} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
# API to retrieve list of all files | |
async def get_files(): | |
try: | |
files = pdf_collection.find({}) | |
file_list = [ | |
{ | |
"id": str(file["_id"]), | |
"title": file["title"], | |
"group": file["group"], | |
"url": file["url"], | |
"votes": file.get("votes", 0), | |
} | |
for file in files | |
] | |
return {"status": "ok", "data": file_list} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
# API to register a voter | |
async def register_voter(request: RegisterVoterRequest): | |
new_voter = { | |
"name": request.name, | |
"group": request.group, | |
"role": request.role, | |
"number_of_votes": 0, | |
} | |
result = voter_collection.insert_one(new_voter) | |
return {"status": "ok", "id": str(result.inserted_id)} | |
# API for voting by voter | |
async def vote_by_voter(request: VoterVoteRequest): | |
voter = voter_collection.find_one({"_id": ObjectId(request.voter_id)}) | |
if not voter: | |
raise HTTPException(status_code=404, detail="Voter not found") | |
max_votes = 5 if voter["role"] == "judge" else 2 | |
if voter["number_of_votes"] >= max_votes: | |
raise HTTPException(status_code=400, detail="Maximum votes reached") | |
voter_collection.update_one({"_id": ObjectId(request.voter_id)}, {"$inc": {"number_of_votes": 1}}) | |
pdf_collection.update_one({"_id": ObjectId(request.file_id)}, {"$inc": {"votes": 1}}) | |
return {"status": "ok", "message": "Vote recorded successfully"} | |
# API to get voter information | |
async def get_voter(id: str): | |
voter = voter_collection.find_one({"_id": ObjectId(id)}) | |
if not voter: | |
raise HTTPException(status_code=404, detail="Voter not found") | |
return { | |
"status": "ok", | |
"name": voter["name"], | |
"group": voter["group"], | |
"role": voter["role"], | |
"number_of_votes": voter["number_of_votes"], | |
} | |