Spaces:
Sleeping
Sleeping
File size: 4,210 Bytes
ecf06e7 19a6cbe ecf06e7 19a6cbe f8a47c9 19a6cbe ecf06e7 19a6cbe ecf06e7 19a6cbe ecf06e7 f8a47c9 ecf06e7 19a6cbe ecf06e7 19a6cbe ecf06e7 19a6cbe ecf06e7 f8a47c9 ecf06e7 19a6cbe ecf06e7 19a6cbe ecf06e7 19a6cbe ecf06e7 19a6cbe ecf06e7 19a6cbe ecf06e7 19a6cbe ecf06e7 19a6cbe ecf06e7 19a6cbe ecf06e7 19a6cbe ecf06e7 19a6cbe ecf06e7 19a6cbe ecf06e7 f8a47c9 ecf06e7 f8a47c9 ecf06e7 19a6cbe ecf06e7 19a6cbe ecf06e7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
import os

import pymongo
from bson import ObjectId
from fastapi import FastAPI, HTTPException, Request, Form, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
app = FastAPI()

# Enable CORS for every origin, method and header.
# NOTE(review): wildcard origins together with allow_credentials=True is
# very permissive; confirm this is intended before exposing the service.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
# MongoDB connection
# Connection string for the MongoDB deployment. It can be supplied through the
# MONGO_URL environment variable so credentials do not have to live in source
# control; the original literal is kept as a fallback for backward
# compatibility.
# NOTE(review): the fallback embeds credentials and looks like it was mangled
# by e-mail obfuscation ("[email protected]") -- confirm the real value, move it
# to the environment, and rotate the password.
mongo_url = os.environ.get(
    "MONGO_URL",
    "mongodb+srv://ip6ofme:[email protected]/",
)
client = pymongo.MongoClient(mongo_url)

# Database and the two collections used by the endpoints in this file.
db = client["test"]
pdf_collection = db["PdfDetails"]
voter_collection = db["Voters"]
# Models
class VoteRequest(BaseModel):
    # Request body for POST /vote.
    # `id` is the file document id as a string; the handler converts it
    # with ObjectId(...) before querying pdf_collection.
    id: str
class VoterVoteRequest(BaseModel):
    # Request body for POST /vote-by-voter.
    # Both ids arrive as strings and are converted with ObjectId(...) by the
    # handler: `voter_id` is looked up in voter_collection, `file_id` in
    # pdf_collection.
    voter_id: str
    file_id: str
class RegisterVoterRequest(BaseModel):
    # Request body for POST /register-voter; all three fields are stored
    # verbatim on the new voter document.
    name: str
    group: str
    # The vote-by-voter handler allows 5 votes when role == "judge" and 2
    # votes for any other value.
    role: str
# API to upload file information to MongoDB
@app.post("/upload-files")
async def upload_file(title: str = Form(...), group: str = Form(...), url: str = Form(...)):
    # Persist the submitted form fields as a new document in pdf_collection.
    # Every new file starts with a vote counter of zero.
    record = dict(title=title, group=group, url=url, votes=0)
    inserted = pdf_collection.insert_one(record)
    # The generated ObjectId is returned as a string so it is JSON-serialisable.
    return {"status": "ok", "id": str(inserted.inserted_id)}
# API to increment file vote count by ID
@app.post("/vote")
async def vote(request: VoteRequest):
    """Increment the vote counter of the file identified by ``request.id``.

    Returns:
        ``{"status": "ok"}`` when a file document was matched and updated.

    Raises:
        HTTPException: 400 if ``request.id`` is not a valid ObjectId string,
            404 if no file has that id, 500 if the database call fails.
    """
    file_id = request.id
    # Reject malformed ids up front so they are reported as a client error
    # instead of surfacing as a 500 from ObjectId(...).
    if not ObjectId.is_valid(file_id):
        raise HTTPException(status_code=400, detail="Invalid file id")

    # Only the database call lives inside the try block; previously the 404
    # raised inside it was caught by ``except Exception`` and turned into 500.
    try:
        result = pdf_collection.update_one(
            {"_id": ObjectId(file_id)}, {"$inc": {"votes": 1}}
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    # matched_count tells us whether the document exists, which removes the
    # separate find_one round trip.
    if result.matched_count == 0:
        raise HTTPException(status_code=404, detail="File not found")
    return {"status": "ok"}
# API to get vote count by file ID
@app.get("/get-votes")
async def get_votes(id: str):
    """Return the current vote count of the file identified by ``id``.

    Args:
        id: File document id as a string (query parameter; the name is part
            of the public API and therefore kept despite shadowing the
            builtin).

    Returns:
        ``{"status": "ok", "votes": <int>}``; ``votes`` defaults to 0 when the
        document has no ``votes`` field.

    Raises:
        HTTPException: 400 if ``id`` is not a valid ObjectId string, 404 if no
            file has that id, 500 if the database call fails.
    """
    # Malformed ids are a client error, not a server failure.
    if not ObjectId.is_valid(id):
        raise HTTPException(status_code=400, detail="Invalid file id")

    # Keep the try body minimal so the 404 below is not swallowed and
    # re-raised as a 500 by the generic handler.
    try:
        file = pdf_collection.find_one({"_id": ObjectId(id)})
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    if not file:
        raise HTTPException(status_code=404, detail="File not found")
    return {"status": "ok", "votes": file.get("votes", 0)}
# API to retrieve list of all files
@app.get("/get-files")
async def get_files():
    # List every document in pdf_collection as a plain JSON-friendly dict.
    # Any failure while reading or shaping the documents is reported as a 500
    # carrying the exception text.
    try:
        listing = []
        for doc in pdf_collection.find({}):
            listing.append(
                {
                    "id": str(doc["_id"]),
                    "title": doc["title"],
                    "group": doc["group"],
                    "url": doc["url"],
                    # Documents without a counter are reported with 0 votes.
                    "votes": doc.get("votes", 0),
                }
            )
        return {"status": "ok", "data": listing}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
# API to register a voter
@app.post("/register-voter")
async def register_voter(request: RegisterVoterRequest):
    # Create a voter document from the request body; a fresh voter has cast
    # no votes yet.
    voter_doc = dict(
        name=request.name,
        group=request.group,
        role=request.role,
        number_of_votes=0,
    )
    outcome = voter_collection.insert_one(voter_doc)
    # Hand the new id back as a string so the client can use it in later calls.
    return {"status": "ok", "id": str(outcome.inserted_id)}
# API for voting by voter
@app.post("/vote-by-voter")
async def vote_by_voter(request: VoterVoteRequest):
    """Record one vote by a registered voter for a file.

    A voter whose role is ``"judge"`` may cast up to 5 votes; any other role
    may cast up to 2.

    Returns:
        ``{"status": "ok", "message": "Vote recorded successfully"}``.

    Raises:
        HTTPException: 400 if either id is not a valid ObjectId string or the
            voter has used all of their votes; 404 if the voter or the file
            does not exist.
    """
    # Validate both ids before touching the database; ObjectId(...) would
    # otherwise raise an unhandled InvalidId.
    if not ObjectId.is_valid(request.voter_id):
        raise HTTPException(status_code=400, detail="Invalid voter id")
    if not ObjectId.is_valid(request.file_id):
        raise HTTPException(status_code=400, detail="Invalid file id")
    voter_oid = ObjectId(request.voter_id)
    file_oid = ObjectId(request.file_id)

    voter = voter_collection.find_one({"_id": voter_oid})
    if voter is None:
        raise HTTPException(status_code=404, detail="Voter not found")

    # Make sure the target exists before spending one of the voter's votes;
    # previously a vote for an unknown file still consumed quota.
    if pdf_collection.find_one({"_id": file_oid}, {"_id": 1}) is None:
        raise HTTPException(status_code=404, detail="File not found")

    max_votes = 5 if voter["role"] == "judge" else 2

    # Check the quota and increment in a single conditional update so that
    # concurrent requests cannot push the voter past max_votes.
    claimed = voter_collection.update_one(
        {"_id": voter_oid, "number_of_votes": {"$lt": max_votes}},
        {"$inc": {"number_of_votes": 1}},
    )
    if claimed.matched_count == 0:
        raise HTTPException(status_code=400, detail="Maximum votes reached")

    pdf_collection.update_one({"_id": file_oid}, {"$inc": {"votes": 1}})
    return {"status": "ok", "message": "Vote recorded successfully"}
# API to get voter information
@app.get("/get-voter")
async def get_voter(id: str):
    """Return the stored details of the voter identified by ``id``.

    Args:
        id: Voter document id as a string (query parameter; the name is part
            of the public API and therefore kept despite shadowing the
            builtin).

    Returns:
        A dict with ``status`` plus the voter's ``name``, ``group``, ``role``
        and ``number_of_votes``.

    Raises:
        HTTPException: 400 if ``id`` is not a valid ObjectId string, 404 if no
            voter has that id.
    """
    # ObjectId(...) raises on malformed input; report that as a client error
    # instead of letting it escape as an unhandled exception.
    if not ObjectId.is_valid(id):
        raise HTTPException(status_code=400, detail="Invalid voter id")

    voter = voter_collection.find_one({"_id": ObjectId(id)})
    if not voter:
        raise HTTPException(status_code=404, detail="Voter not found")
    return {
        "status": "ok",
        "name": voter["name"],
        "group": voter["group"],
        "role": voter["role"],
        "number_of_votes": voter["number_of_votes"],
    }
|