Spaces:
Sleeping
Sleeping
| from io import BytesIO | |
| from dotenv import load_dotenv | |
| import os | |
| from utils import * | |
| from fastapi import FastAPI, File, HTTPException, Header, UploadFile,status | |
| from tokenManagement import * | |
| from jwtcoding import * | |
| from fastapi.responses import JSONResponse | |
| import docx | |
| import fitz | |
| from scraper import scrapeCourse | |
| import asyncio | |
| from google import genai | |
| from fastapi.security import OAuth2PasswordBearer | |
| from typing import Optional | |
| from pydantic import BaseModel | |
| load_dotenv() | |
| CX = os.getenv("SEARCH_ENGINE_ID") | |
| API_KEY = os.getenv("GOOGLE_API_KEY") | |
| PINECONE_API_KEY=os.getenv("PINECONE_API_KEY") | |
| GEMINI_API_KEY=os.getenv("GEMINI_API_KEY") | |
| MONGO_URI=os.getenv("MONGO_URI") | |
| app = FastAPI() | |
| import re | |
| class UserBody(BaseModel): | |
| firstName: Optional[str] = None | |
| lastName: Optional[str] = None | |
| email:str | |
| password:str | |
| class AiAnalysis(BaseModel): | |
| query:str | |
| class Token(BaseModel): | |
| refreshToken:str | |
| class UserCourse(BaseModel): | |
| employmentStatus:str | |
| interimRole:str | |
| desiredRole:str | |
| motivation:str | |
| learningPreference:str | |
| hoursSpentLearning:str | |
| challenges:str | |
| timeframeToAchieveDreamRole:str | |
| class CourseRecommendation(BaseModel): | |
| courseName: str | |
| completionTime: str | |
| def extract_course_info(text: str) -> CourseRecommendation: | |
| # Example regex patterns – adjust these as needed based on the response format. | |
| course_pattern =r'"coursename":\s*"([^"]+)"' | |
| time_pattern = r"(\d+\s*-\s*\d+\s*months)" | |
| course_match = re.search(course_pattern, text) | |
| time_match = re.search(time_pattern, text) | |
| coursename = course_match.group(1).strip() if course_match else "Unknown" | |
| completiontime = time_match.group(0).strip() if time_match else "Unknown" | |
| return CourseRecommendation(courseName=coursename, completionTime=completiontime) | |
| def get_course(query): | |
| # Example search query | |
| results = google_search(query, API_KEY, CX) | |
| content=[] | |
| if results: | |
| for item in results.get('items', []): | |
| title = item.get('title') | |
| link = item.get('link') | |
| snippet = item.get('snippet') | |
| content_structure={} | |
| content_structure["courseTitle"]=title | |
| content_structure["courseLink"]=link | |
| content_structure["courseSnippet"]= snippet | |
| content_structure["scrapedCourseDetails"]= scrapeCourse(url=link) | |
| content.append(content_structure) | |
| return JSONResponse(content,status_code=200) | |
| def get_course_func(query): | |
| # Example search query | |
| results = google_search(query, API_KEY, CX) | |
| content=[] | |
| if results: | |
| for item in results.get('items', []): | |
| title = item.get('title') | |
| link = item.get('link') | |
| snippet = item.get('snippet') | |
| content_structure={} | |
| content_structure["courseTitle"]=title | |
| content_structure["courseLink"]=link | |
| content_structure["courseSnippet"]= snippet | |
| content_structure["scrapedCourseDetails"]= scrapeCourse(url=link) | |
| content.append(content_structure) | |
| return content | |
| async def upload_file(file: UploadFile = File(...),authorization: str = Header(...)): | |
| # Extract the token from the Authorization header (Bearer token) | |
| token = authorization.split("Bearer ")[-1] | |
| # Here, you would validate the token (e.g., check with a JWT library) | |
| decoded_user_id,decoded_access_token = decode_jwt(token) | |
| is_valid = verify_access_token(db_uri=MONGO_URI, user_id=decoded_user_id, access_token=decoded_access_token) | |
| if is_valid != True: # Example check | |
| raise HTTPException(status_code=401, detail="Invalid token") | |
| else: | |
| content = await file.read() # Read the file content (this will return bytes) | |
| sentences=[] | |
| print(f"File name: {file.filename}") | |
| print(f"File content type: {file.content_type}") | |
| print(f"File size: {file.size} bytes") | |
| if "pdf" == file.filename.split('.')[1]: | |
| pdf_document = fitz.open(stream=BytesIO(content), filetype="pdf") | |
| extracted_text = "" | |
| for page_num in range(pdf_document.page_count): | |
| page = pdf_document.load_page(page_num) | |
| extracted_text += page.get_text() | |
| elif "docx" == file.filename.split('.')[1]: | |
| docx_file = BytesIO(content) | |
| doc = docx.Document(docx_file) | |
| extracted_text = "" | |
| for para in doc.paragraphs: | |
| extracted_text += para.text + "\n" | |
| sentences = split_text_into_chunks(extracted_text,chunk_size=200) | |
| docs = generate_embedding_for_user_resume(data=sentences,user_id=file.filename) | |
| response= insert_embeddings_into_pinecone_database(doc=docs,api_key=PINECONE_API_KEY,name_space=decoded_user_id) | |
| return {" name": file.filename,"response":str(response) } | |
| def ask_ai_about_resume(req:AiAnalysis,authorization: str = Header(...)): | |
| # Retrieve context from your vector database | |
| token = authorization.split("Bearer ")[-1] | |
| # Here, you would validate the token (e.g., check with a JWT library) | |
| decoded_user_id,decoded_access_token = decode_jwt(token) | |
| is_valid = verify_access_token(db_uri=MONGO_URI, user_id=decoded_user_id, access_token=decoded_access_token) | |
| if is_valid != True: # Example check | |
| raise HTTPException(status_code=401, detail="Invalid token") | |
| context = query_vector_database(query=req.Query, api_key=PINECONE_API_KEY, name_space=decoded_user_id) | |
| # Ensure that an event loop is present in this thread. | |
| try: | |
| loop = asyncio.get_event_loop() | |
| except RuntimeError: | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| # Create the Gemini client after the event loop is set up | |
| client = genai.Client(api_key=GEMINI_API_KEY) | |
| response = client.models.generate_content( | |
| model="gemini-2.0-flash", | |
| contents=f""" | |
| Answer this question using the context provided: | |
| question: {req.Query} | |
| context: {context} | |
| """ | |
| ) | |
| return {"Ai_Response":response.text} | |
| def ask_ai_to_recommnd_courses(request:UserCourse,authorization:str=Header(...)): | |
| """ | |
| User Profile Information for Career Development | |
| This section defines the parameters used to gather information from the user to understand their current employment situation, learning preferences, challenges, and goals related to achieving their dream role. | |
| Parameters: | |
| employment_status (str): | |
| A description of the user's current employment situation (e.g., "unemployed", "part-time", "full-time"). | |
| interim_role (str): | |
| Indicates whether the user is willing to prepare for an interim role to gain experience and income while pursuing their dream role (e.g., "yes" or "no"). | |
| desired_role (str): | |
| The role the user ultimately wishes to obtain (e.g., "Full-Stack Developer", "Data Scientist"). | |
| motivation (str): | |
| The user's reasons or motivations for pursuing the desired role. | |
| learning_preference (str): | |
| Describes how the user prefers to learn new skills (e.g., "online courses", "self-study", "bootcamp"). | |
| hours_spent_learning (str or int): | |
| The number of hours per day the user can dedicate to learning. | |
| challenges (str): | |
| Outlines any obstacles or challenges the user faces in reaching their dream role. | |
| timeframe_to_achieve_dream_role (str): | |
| The ideal timeframe the user has in mind for achieving their dream role (e.g., "6-12 months"). | |
| """ | |
| # Extract the token from the Authorization header (Bearer token) | |
| token = authorization.split("Bearer ")[-1] | |
| # Here, you would validate the token (e.g., check with a JWT library) | |
| decoded_user_id,decoded_access_token = decode_jwt(token) | |
| is_valid = verify_access_token(db_uri=MONGO_URI, user_id=decoded_user_id, access_token=decoded_access_token) | |
| if is_valid != True: # Example check | |
| raise HTTPException(status_code=401, detail="Invalid token") | |
| # Ensure that an event loop is present in this thread. | |
| try: | |
| loop = asyncio.get_event_loop() | |
| except RuntimeError: | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| # Create the Gemini client after the event loop is set up | |
| client = genai.Client(api_key=GEMINI_API_KEY) | |
| response = client.models.generate_content( | |
| model="gemini-2.0-flash", | |
| contents=f""" | |
| please respond with a JSON object that contains the following keys as a response: | |
| - "coursename": the name of the recommended course, | |
| - "completiontime": an estimate of how long it would take to complete the course. | |
| Do not include any extra text. | |
| Recommend a course using this information below : | |
| Which of the following best describes you?: {request.employmentStatus} | |
| Would you like to prepare for an interim role to gain experience and income while pursuing your dream job?: {request.InterimRole} | |
| What is your desired role?: {request.desiredRole} | |
| Why do you want to achieve this desired role?: {request.motivation} | |
| How do you prefer to learn new skills?: {request.learningPreference} | |
| How many hours per day can you dedicate to learning?: {request.hoursSpentLearning} | |
| What are the biggest challenges or obstacles you face in reaching your dream role?: {request.challenges} | |
| What is your ideal timeframe for achieving your dream role?: {request.timeframeToAchieveDreamRole} | |
| """ | |
| ) | |
| questions=request.model_dump() | |
| questions['userId']=decoded_user_id | |
| create_questionaire(db_uri=MONGO_URI,db_name="crayonics",collection_name="Questionaire",document=questions) | |
| course_info = extract_course_info(response.text) | |
| courses = get_course_func(query=course_info.courseName) | |
| return {"courseInfo":course_info,"courses":courses} | |
| def login(user:UserBody): | |
| user ={"email":user.email,"password":user.password,"firstName":user.firstName,"lastName":user.lastName} | |
| user_id= login_user(db_uri=MONGO_URI,db_name="crayonics",collection_name="Users",document=user) | |
| if user_id != False: | |
| refreshToken=create_refreshToken(db_uri=MONGO_URI,user_id=user_id) | |
| accessToken = create_accessToken(db_uri=MONGO_URI,user_id=user_id,refresh_token=refreshToken) | |
| result = update_refreshTokenWithPreviouslyUsedAccessToken(db_uri=MONGO_URI,refresh_token=refreshToken,access_token=accessToken) | |
| print(result) | |
| access_token = encode_jwt(user_id=user_id,access_token=accessToken) | |
| return {"userId":user_id,"refreshToken":refreshToken,"accessToken":access_token} | |
| return JSONResponse(status_code=401,content="Invalid login details") | |
| def signUp(user:UserBody): | |
| user ={"email":user.email,"password":user.password} | |
| user_id= create_user(db_uri=MONGO_URI,db_name="crayonics",collection_name="users",document=user) | |
| if user_id != False: | |
| refreshToken=create_refreshToken(db_uri=MONGO_URI,user_id=user_id) | |
| accessToken = create_accessToken(db_uri=MONGO_URI,user_id=user_id,refresh_token=refreshToken) | |
| result = update_refreshTokenWithPreviouslyUsedAccessToken(db_uri=MONGO_URI,refresh_token=refreshToken,access_token=accessToken) | |
| print(result) | |
| access_token = encode_jwt(user_id=user_id,access_token=accessToken) | |
| return {"userId":user_id,"refreshToken":refreshToken,"accessToken":access_token} | |
| return JSONResponse(status_code=401,content="Invalid Sign Up details") | |
| def signUp(refresh:Token,authorization: str = Header(...)): | |
| token = authorization.split("Bearer ")[-1] | |
| decoded_user_id,decoded_access_token = decode_jwt(token) | |
| is_valid = verify_access_token(db_uri=MONGO_URI, user_id=decoded_user_id, access_token=decoded_access_token) | |
| if is_valid != True: # Example check | |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") | |
| result = logout(db_uri=MONGO_URI,refresh_token= refresh.refreshToken) | |
| if result ==True: | |
| return {"content": f"successful"} | |
| else: | |
| return JSONResponse(status_code=status.HTTP_410_GONE,content={"content": f"unsuccessful"}) | |
| def refresh_access_token(refresh_token:Token, authorization: str = Header(...)): | |
| token = authorization.split("Bearer ")[-1] | |
| # Here, you would validate the token (e.g., check with a JWT library) | |
| decoded_user_id,decoded_access_token = decode_jwt(token) | |
| is_valid = verify_access_token(db_uri=MONGO_URI, user_id=decoded_user_id, access_token=decoded_access_token) | |
| print(decoded_user_id,decoded_access_token) | |
| if is_valid != True: # Example check | |
| raise HTTPException(status_code=401, detail="Invalid token") | |
| new_access_token = create_accessToken(db_uri=MONGO_URI,user_id=decoded_user_id,refresh_token=refresh_token.refreshToken) | |
| newly_encoded_access_token = encode_jwt(user_id=decoded_user_id,access_token=new_access_token) | |
| return {"accessToken":newly_encoded_access_token} | |
| def protected_route(authorization: str = Header(...)): | |
| # Extract the token from the Authorization header (Bearer token) | |
| token = authorization.split("Bearer ")[-1] | |
| # Here, you would validate the token (e.g., check with a JWT library) | |
| decoded_user_id,decoded_access_token = decode_jwt(token) | |
| is_valid = verify_access_token(db_uri=MONGO_URI, user_id=decoded_user_id, access_token=decoded_access_token) | |
| if is_valid != True: # Example check | |
| raise HTTPException(status_code=401, detail="Invalid token") | |
| return {"message": "Access granted", "verification": "verified"} | |