# main.py
"""EduScope AI backend (FastAPI): Google OAuth login, chats, documents, analysis."""

import csv
import io
import json
import os
import re
import uuid
from datetime import datetime
from typing import Dict, List

# File format libraries
import docx
import google.generativeai as genai
import openpyxl
import pptx
import PyPDF2
import requests
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, File, Header, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from jose.exceptions import ExpiredSignatureError, JWTError
from pydantic import BaseModel

from db import Chat, ChatMessage, SessionLocal, User, get_db

# Load .env before any configuration is read so environment overrides apply.
load_dotenv()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

DOMAIN = "http://localhost:8000"

# OAuth client settings from the Google Developer Console.
# SECURITY: these values were committed to source control. Provide them via
# the environment / .env file; the literals below remain only as a
# backward-compatible fallback and the secret should be rotated and removed.
GOOGLE_CLIENT_ID = os.getenv(
    "GOOGLE_CLIENT_ID",
    "862058885628-e6mjev28p8e112qrp9gnn4q8mlif3bbf.apps.googleusercontent.com",
)
GOOGLE_CLIENT_SECRET = os.getenv(
    "GOOGLE_CLIENT_SECRET", "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"
)
GOOGLE_REDIRECT_URI = os.getenv(
    "GOOGLE_REDIRECT_URI", "http://localhost:5173/callback"
)

# Upper bound (seconds) for outbound calls to Google so a stalled connection
# cannot hang a request handler indefinitely.
_HTTP_TIMEOUT_SECONDS = 10

# Matches the body of a Markdown code fence tagged ``json``.
_JSON_FENCE_RE = re.compile(r"```json\s*(.*?)\s*```", re.DOTALL)


def parse_json_from_gemini(json_str: str):
    """Parse a JSON payload out of a model reply.

    If the reply contains a fenced ``json`` code block, only the contents of
    the first such block are parsed; otherwise the whole (stripped) string is
    parsed.

    Args:
        json_str: Raw text returned by the model.

    Returns:
        The decoded JSON value, or ``None`` when the input is not a string or
        does not contain valid JSON.
    """
    try:
        json_str = json_str.strip()
        fenced = _JSON_FENCE_RE.search(json_str)
        if fenced:
            json_str = fenced.group(1)
        return json.loads(json_str)
    except (json.JSONDecodeError, AttributeError, TypeError):
        # AttributeError/TypeError cover non-string input (e.g. None, bytes).
        return None


app = FastAPI(title="EduScope AI")

# Configure CORS
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive; restrict allow_origins to the real frontend before production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/login/google")
async def login_google():
    """Return the Google OAuth consent URL the frontend should redirect to."""
    return {
        "url": f"https://accounts.google.com/o/oauth2/auth?response_type=code&client_id={GOOGLE_CLIENT_ID}&redirect_uri={GOOGLE_REDIRECT_URI}&scope=openid%20profile%20email&access_type=offline"
    }


@app.get("/auth/google")
async def auth_google(code: str, db: SessionLocal = Depends(get_db)):
    """Exchange a Google authorization code for an application token.

    Trades ``code`` for a Google access token, fetches the user's profile,
    stores a ``User`` row on first login and returns an HS256-signed JWT whose
    payload is the Google profile.

    Args:
        code: Authorization code Google appended to the redirect URI.
        db: Database session supplied by ``get_db``.

    Returns:
        ``{"token": <jwt>}``.

    Raises:
        HTTPException: 400 when Google does not issue an access token for the
            code; 502 when Google cannot be reached or returns an unusable
            response.
    """
    # NOTE(review): `requests` is blocking and runs on the event loop here.
    token_url = "https://accounts.google.com/o/oauth2/token"
    payload = {
        "code": code,
        "client_id": GOOGLE_CLIENT_ID,
        "client_secret": GOOGLE_CLIENT_SECRET,
        "redirect_uri": GOOGLE_REDIRECT_URI,
        "grant_type": "authorization_code",
    }
    try:
        token_body = requests.post(
            token_url, data=payload, timeout=_HTTP_TIMEOUT_SECONDS
        ).json()
    except (requests.RequestException, ValueError) as exc:
        raise HTTPException(
            status_code=502, detail="Could not obtain a token from Google"
        ) from exc

    access_token = (
        token_body.get("access_token") if isinstance(token_body, dict) else None
    )
    if not access_token:
        # Invalid, expired or already-used authorization code.
        raise HTTPException(
            status_code=400, detail="Google rejected the authorization code"
        )

    try:
        user_info = requests.get(
            "https://www.googleapis.com/oauth2/v1/userinfo",
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=_HTTP_TIMEOUT_SECONDS,
        ).json()
    except (requests.RequestException, ValueError) as exc:
        raise HTTPException(
            status_code=502, detail="Could not fetch the Google user profile"
        ) from exc

    required_fields = ("id", "email", "name")
    if not isinstance(user_info, dict) or any(
        field not in user_info for field in required_fields
    ):
        raise HTTPException(
            status_code=502, detail="Google returned an incomplete user profile"
        )

    user = db.query(User).filter(User.id == user_info["id"]).first()
    if not user:
        user = User(
            id=user_info["id"], email=user_info["email"], name=user_info["name"]
        )
        db.add(user)
        db.commit()

    # NOTE(review): no `exp` claim is added here, so issued tokens presumably
    # never expire, and they are signed with the OAuth client secret — confirm
    # whether a dedicated signing key and expiry are wanted.
    return {"token": jwt.encode(user_info, GOOGLE_CLIENT_SECRET, algorithm="HS256")}


async def decode_token(authorization: str = Header(...)):
    """FastAPI dependency that validates the bearer JWT and returns its claims.

    Args:
        authorization: Value of the ``Authorization`` request header.

    Returns:
        The decoded token payload (a dict).

    Raises:
        HTTPException: 400 when the header is not a ``Bearer`` credential;
            401 when the token is expired or otherwise invalid.
    """
    if not authorization.startswith("Bearer "):
        raise HTTPException(
            status_code=400, detail="Authorization header must start with 'Bearer '"
        )

    token = authorization[len("Bearer "):]  # Extract token part
    try:
        # Decode and verify the JWT token
        return jwt.decode(token, GOOGLE_CLIENT_SECRET, algorithms=["HS256"])
    except ExpiredSignatureError as exc:
        # Must precede JWTError: ExpiredSignatureError is a subclass of it.
        raise HTTPException(status_code=401, detail="Token has expired") from exc
    except JWTError as exc:
        # python-jose signals every other verification failure with JWTError.
        raise HTTPException(status_code=401, detail="Invalid token") from exc


@app.get("/token")
async def get_token(user_data: dict = Depends(decode_token)):
    """Return the claims of the caller's bearer token."""
    return user_data


@app.post("/chats")
async def create_chat(
    title: str,
    user_data: dict = Depends(decode_token),
    db: SessionLocal = Depends(get_db),
):
    """Create a chat owned by the authenticated user.

    Args:
        title: Title for the new chat.
        user_data: Decoded token claims; ``user_data["id"]`` is the owner.
        db: Database session supplied by ``get_db``.

    Returns:
        A dict with the new chat's ``chat_id``, ``title`` and ``timestamp``.
    """
    user_id = user_data["id"]
    chat = Chat(chat_id=str(uuid.uuid4()), user_id=user_id, title=title)
    db.add(chat)
    db.commit()
    return {"chat_id": chat.chat_id, "title": title, "timestamp": chat.timestamp}


@app.get("/chats")
async def get_chats(
    user_data: dict = Depends(decode_token), db: SessionLocal = Depends(get_db)
):
    """List every chat belonging to the authenticated user."""
    user_id = user_data["id"]
    chats = db.query(Chat).filter(Chat.user_id == user_id).all()
    return [
        {"chat_id": chat.chat_id, "title": chat.title, "timestamp": chat.timestamp}
        for chat in chats
    ]
# Gemini client configuration.
# SECURITY: this API key was committed to source control. Provide
# GEMINI_API_KEY via the environment / .env file; the literal remains only as
# a backward-compatible fallback and the key should be rotated and removed.
genai.configure(
    api_key=os.getenv("GEMINI_API_KEY", "AIzaSyDZsN3hnnNQOBLSAznFh7xWbWKNohvqff0")
)
model = genai.GenerativeModel('gemini-1.5-flash')

# In-memory stores: uploaded documents keyed by id, and the message log.
# Both are process-local and are emptied by /clear-all.
documents = {}
chat_history = []


class Document(BaseModel):
    """An uploaded document and the text extracted from it."""

    id: str
    name: str
    content: str
    timestamp: str


class Query(BaseModel):
    """Request body for /analyze: the question and the ids of documents to use."""

    text: str
    selected_docs: List[str]


# NOTE: this intentionally keeps the original name, which shadows the
# `ChatMessage` imported from `db` at the top of the file.
class ChatMessage(BaseModel):
    """One entry of the in-memory chat history."""

    id: str
    type: str  # 'user' or 'assistant'
    content: str
    timestamp: str
    referenced_docs: List[str] = []


class Analysis(BaseModel):
    """Response body of /analyze."""

    insight: str
    pareto_analysis: dict


def _pdf_text(content: bytes) -> str:
    """Join the extracted text of every PDF page with newlines."""
    reader = PyPDF2.PdfReader(io.BytesIO(content))
    # extract_text() may yield None for pages without a text layer.
    return "\n".join((page.extract_text() or "") for page in reader.pages)


def _docx_text(content: bytes) -> str:
    """Join the paragraphs of a Word document with newlines."""
    return "\n".join(
        para.text for para in docx.Document(io.BytesIO(content)).paragraphs
    )


def _xlsx_text(content: bytes) -> str:
    """Render every non-empty cell of every sheet, one line per row."""
    workbook = openpyxl.load_workbook(io.BytesIO(content), read_only=True)
    try:
        return "".join(
            " ".join(str(cell) for cell in row if cell is not None) + "\n"
            for sheet in workbook
            for row in sheet.iter_rows(values_only=True)
        )
    finally:
        # Read-only workbooks hold their source open until closed explicitly.
        workbook.close()


def _csv_text(content: bytes) -> str:
    """Render CSV rows as space-separated lines."""
    # utf-8-sig also accepts plain UTF-8 and drops a leading BOM if present.
    rows = csv.reader(io.StringIO(content.decode('utf-8-sig')))
    return "\n".join(" ".join(row) for row in rows)


def _txt_text(content: bytes) -> str:
    """Decode a plain-text file."""
    return content.decode('utf-8-sig')


def _pptx_text(content: bytes) -> str:
    """Collect the text of every shape that exposes one, one line per shape."""
    presentation = pptx.Presentation(io.BytesIO(content))
    return "".join(
        shape.text + "\n"
        for slide in presentation.slides
        for shape in slide.shapes
        if hasattr(shape, "text")
    )


# Lower-cased file extension -> extractor taking the raw file bytes.
_TEXT_EXTRACTORS = {
    '.pdf': _pdf_text,
    '.docx': _docx_text,
    '.xlsx': _xlsx_text,
    '.csv': _csv_text,
    '.txt': _txt_text,
    '.ppt': _pptx_text,
    '.pptx': _pptx_text,
}


def extract_text_from_file(file: UploadFile):
    """
    Extract text from various file types
    Supports: PDF, DOCX, XLSX, CSV, TXT, PPT/PPTX

    Args:
        file: The uploaded file; its extension selects the extractor.

    Returns:
        The extracted text.

    Raises:
        HTTPException: 400 when the extension is unsupported or the content
            cannot be parsed.
    """
    # filename can be absent on an upload; treat that as "no extension".
    file_extension = os.path.splitext(file.filename or "")[1].lower()
    content = file.file.read()

    try:
        extractor = _TEXT_EXTRACTORS.get(file_extension)
        if extractor is None:
            raise ValueError(f"Unsupported file type: {file_extension}")
        return extractor(content)
    except Exception as e:
        raise HTTPException(
            status_code=400, detail=f"Error processing file: {str(e)}"
        ) from e


@app.post("/upload")
async def upload_document(file: UploadFile = File(...)):
    """Extract the text of an uploaded file and keep it in the document store.

    Returns:
        The stored document as a dict (id, name, content, timestamp).

    Raises:
        HTTPException: 400 when the file cannot be processed; 500 on any
            other failure.
    """
    try:
        text = extract_text_from_file(file)
        doc_id = str(uuid.uuid4())
        document = Document(
            id=doc_id,
            name=file.filename,
            content=text,
            timestamp=datetime.now().isoformat()
        )
        documents[doc_id] = document
        return document.dict()
    except HTTPException:
        # Already a well-formed HTTP error; re-raise it untouched.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Unexpected error: {str(e)}"
        ) from e


@app.get("/documents")
async def get_documents():
    """Return every document currently held in memory."""
    return list(documents.values())


@app.post("/analyze", response_model=Analysis)
async def analyze_text(query: Query):
    """Ask Gemini to analyse the selected documents for the given query.

    The user's query, the parsed JSON analysis and, if the reply contains a
    fenced ``html`` block, that HTML are appended to ``chat_history``.

    Args:
        query: The question and the ids of the documents to use as context.

    Returns:
        The parsed analysis with ``insight`` and ``pareto_analysis`` keys.

    Raises:
        HTTPException: 404 when a selected document id is unknown; 502 when
            the model call fails or its reply does not contain a valid
            analysis.
    """
    unknown_ids = [doc_id for doc_id in query.selected_docs if doc_id not in documents]
    if unknown_ids:
        raise HTTPException(
            status_code=404,
            detail=f"Unknown document id(s): {', '.join(unknown_ids)}",
        )

    # Combine content from selected documents
    combined_context = "\n\n".join([
        f"Document '{documents[doc_id].name}':\n{documents[doc_id].content}"
        for doc_id in query.selected_docs
    ])

    prompt = f"""
    Analyze the following text in the context of this query: {query.text}

    Context from multiple documents:
    {combined_context}

    Provide:
    1. Detailed insights and analysis, comparing information across documents when relevant
    2. Apply the Pareto Principle (80/20 rule) to identify the most important aspects

    Format the response as JSON with 'insight' and 'pareto_analysis' keys.
    Example format:
    {{
        "insight": "Key findings and analysis from the documents...",
        "pareto_analysis": {{
            "vital_few": "The 20% of factors that drive 80% of the impact...",
            "trivial_many": "The remaining 80% of factors that contribute 20% of the impact..."
        }}
    }}
    also give a complete html document with the illustrative analysis like pie charts, bar charts,graphs etc.
    """

    try:
        response = model.generate_content(prompt)
        # Accessing .text can itself fail, e.g. when the reply was blocked.
        response_text = response.text
    except Exception as exc:
        raise HTTPException(
            status_code=502, detail=f"Model request failed: {exc}"
        ) from exc

    # Create chat message
    chat_history.append(ChatMessage(
        id=str(uuid.uuid4()),
        type="user",
        content=query.text,
        timestamp=datetime.now().isoformat(),
        referenced_docs=query.selected_docs
    ))

    analysis = parse_json_from_gemini(response_text)
    try:
        # Validate now so a malformed reply becomes a clear 502 instead of a
        # response-validation failure after bogus history entries are stored.
        Analysis(**analysis)
    except (TypeError, ValueError) as exc:
        raise HTTPException(
            status_code=502,
            detail="Model response did not contain a valid JSON analysis",
        ) from exc

    # Create assistant response
    chat_history.append(ChatMessage(
        id=str(uuid.uuid4()),
        type="assistant",
        content=json.dumps(analysis, indent=4),
        timestamp=datetime.now().isoformat(),
        referenced_docs=query.selected_docs
    ))

    if '```html' in response_text:
        # Keep only the body of the first fenced html block.
        html = response_text.split('```html')[1].split('```')[0].strip()
        chat_history.append(ChatMessage(
            id=str(uuid.uuid4()),
            type="assistant",
            content=html,
            timestamp=datetime.now().isoformat(),
            referenced_docs=query.selected_docs
        ))

    return analysis


@app.get("/chat-history")
async def get_chat_history():
    """Return the in-memory chat history."""
    return chat_history


# NOTE(review): a state-changing operation exposed via GET; kept as-is for
# client compatibility, but POST/DELETE would be the conventional verb.
@app.get("/clear-all")
async def clear_all():
    """Empty both the chat history and the document store."""
    chat_history.clear()
    documents.clear()
    return {"message": "All Data cleared successfully"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)