# --- Web file-viewer residue captured with the paste (not part of the program) ---
# adityaiiitr's picture
# Update main.py
# b95250c verified
# raw
# history blame
# 13.4 kB
import os
import google.generativeai as genai
from playwright.async_api import async_playwright
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Header
from pydantic import BaseModel
from typing import Optional
import uvicorn
import asyncio
import json
import requests
from bs4 import BeautifulSoup
import logging
# Load environment variables (python-dotenv) before anything reads os.environ
load_dotenv()

# Register the Gemini API key; a missing API_KEY variable raises KeyError here
genai.configure(api_key=os.environ["API_KEY"])

# Logging: INFO level and above, emitted through a single StreamHandler
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()],
    level=logging.INFO,
)
logger = logging.getLogger("ScrapeStructureApp")

# FastAPI application object that the route decorators in this file attach to
app = FastAPI()
# Function to scrape webpage and extract visible text
async def scrape_visible_text(url):
    """Render ``url`` in headless Chromium and return the page's visible text.

    The page is opened in a Playwright browser context that presents
    desktop-Chrome request headers, and the text is read from
    ``document.body.innerText`` once navigation reaches ``domcontentloaded``.

    A ``cookie`` header is sent only when the ``SCRAPER_COOKIE`` environment
    variable is set. Session cookies must not be hard-coded in this function:
    everything in ``extra_http_headers`` is sent to every URL that is scraped,
    so an embedded credential would be disclosed to arbitrary sites.

    Args:
        url: Address of the page to load.

    Returns:
        The value of ``document.body.innerText`` for the loaded page.

    Raises:
        Exception: Any error raised while launching the browser, navigating
            or evaluating the page is logged and re-raised unchanged.
    """
    extra_headers = {
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "accept-encoding": "gzip, deflate, br, zstd",
        "accept-language": "en-US,en;q=0.9",
        "cache-control": "no-cache",
        "pragma": "no-cache",
        "sec-ch-ua": '"Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Windows"',
        "sec-fetch-dest": "document",
        "sec-fetch-mode": "navigate",
        "sec-fetch-site": "none",
        "sec-fetch-user": "?1",
        "upgrade-insecure-requests": "1",
    }
    # Optional, operator-supplied cookie; never commit a real session token.
    cookie = os.environ.get("SCRAPER_COOKIE")
    if cookie:
        extra_headers["cookie"] = cookie

    try:
        logger.info(f"Starting to scrape visible text from URL: {url}")
        async with async_playwright() as p:
            # Launch the browser in headless mode (can change to headless=False if you want to see it)
            browser = await p.chromium.launch(headless=True)
            try:
                context = await browser.new_context(
                    user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
                    viewport={"width": 1280, "height": 800},
                    extra_http_headers=extra_headers,
                )
                page = await context.new_page()
                # Return as soon as the DOMContentLoaded event has fired
                await page.goto(url, wait_until="domcontentloaded")
                # Extract visible text from the body of the page
                visible_text = await page.evaluate("document.body.innerText")
            finally:
                # Close the browser even when navigation or evaluation fails
                await browser.close()
        logger.info(f"Successfully scraped visible text from URL: {url}")
        return visible_text
    except Exception as e:
        logger.error(f"Error while scraping visible text from URL {url}: {e}")
        raise
# Function to structure data using Google's Gemini model
def structure_data(text, college_name):
    """Ask the ``gemini-1.5-pro`` model to rewrite ``text`` as a college profile.

    Args:
        text: Unstructured source text; it is embedded in the prompt between
            ``---`` markers.
        college_name: College name interpolated into the prompt and log lines.

    Returns:
        The model's response text with surrounding whitespace stripped.

    Raises:
        Exception: Any failure is logged and then re-raised unchanged.
    """
    try:
        logger.info(f"Starting to structure data for college: {college_name}")
        instruction = f"Convert the following unstructured text into a well-written and comprehensive structured form with titles and content containing all relevant data. The response should be a detailed paragraph mentioning everything about the college named '{college_name}', ensuring no important information is missed. Include details such as connectivity, placement, nearby colleges, infrastructure, courses, branches, students, festivals, clubs, reviews, Q&A, and any other college-related parameters available in the text. Provide the response text with no formatting! --- \n{text} ---. Use only the text between the '---' markers as input source text. If information is not available about any specific thing dont mention it."
        reply = genai.GenerativeModel("gemini-1.5-pro").generate_content(instruction)
        logger.info(f"Successfully structured data for college: {college_name}")
        structured = reply.text
        return structured.strip()
    except Exception as exc:
        logger.error(f"Error while structuring data for college {college_name}: {exc}")
        raise
# Request body accepted by the /scrape endpoint
class URLRequest(BaseModel):
    # Address of the page whose visible text is scraped
    url: str
    # College name that is interpolated into the Gemini prompt
    college_name: str
# Request body accepted by the /crawl endpoint
class CrawlerRequest(BaseModel):
    # Topic used as the search query and as the relevance criterion
    topic_title: str
# Function to perform Google search and return top N links
def google_search(query, num_results=5, timeout=10):
    """Fetch a Google results page and return up to ``num_results`` links.

    The query is sent through ``params`` so that it is URL-encoded. When it
    is interpolated straight into the URL, characters such as ``&``, ``#``
    and ``+`` change the meaning of the request instead of being searched.

    Args:
        query: Search terms, sent to Google as the ``q`` parameter.
        num_results: Sent as the ``num`` parameter and used as the maximum
            number of links returned.
        timeout: Seconds to wait for the HTTP request, so that a stalled
            connection cannot block the caller indefinitely.

    Returns:
        A list of ``href`` values, in page order, taken from anchors that
        carry a ``jsname`` attribute, start with ``https://`` and do not
        contain ``google.com``.

    Raises:
        Exception: Any request or parsing failure is logged and re-raised.
    """
    try:
        logger.info(f"Performing Google search for query: {query}")
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36"
        }
        response = requests.get(
            "https://www.google.com/search",
            params={"q": query, "num": num_results},
            headers=headers,
            timeout=timeout,
        )
        if response.status_code != 200:
            # Still parse the body (as before), but make the failure visible.
            logger.warning(
                f"Google search for query {query} returned HTTP {response.status_code}"
            )
        soup = BeautifulSoup(response.text, "html.parser")
        links = [
            a["href"]
            for a in soup.find_all("a", href=True, attrs={"jsname": True})
            if a["href"].startswith("https://") and "google.com" not in a["href"]
        ]
        logger.info(f"Successfully retrieved {len(links)} links for query: {query}")
        return links[:num_results]
    except Exception as e:
        logger.error(f"Error while performing Google search for query {query}: {e}")
        raise
# Function to perform advanced search on specific sites
def advanced_search_on_site(site, topic, num_results=10):
    """Search Google for ``topic`` restricted to ``site`` with a ``site:`` query.

    Args:
        site: Domain placed after the ``site:`` prefix.
        topic: Search terms appended after the site restriction.
        num_results: Forwarded to ``google_search`` as its result limit.

    Returns:
        Whatever ``google_search`` returns for the combined query.
    """
    site_restricted_query = f"site:{site} {topic}"
    found_links = google_search(site_restricted_query, num_results)
    return found_links
# FastAPI endpoint to scrape and structure data
@app.post("/scrape")
async def scrape_and_structure_data(request: URLRequest):
    """Scrape ``request.url`` and return a Gemini-structured description.

    Args:
        request: Body carrying the page ``url`` and the ``college_name``.

    Returns:
        ``{"structured_data": <text returned by structure_data>}``.

    Raises:
        HTTPException: Status 500 with the error message as ``detail`` when
            scraping or structuring fails.
    """
    try:
        logger.info(f"Received scrape request for URL: {request.url}, College Name: {request.college_name}")
        # Scrape visible text from the webpage
        visible_text = await scrape_visible_text(request.url)
        # structure_data is a synchronous call to the Gemini API; running it
        # in a worker thread keeps the event loop free for other requests.
        structured_data = await asyncio.to_thread(
            structure_data, visible_text, request.college_name
        )
        logger.info(f"Successfully processed scrape request for URL: {request.url}")
        # Return the structured data
        return {"structured_data": structured_data}
    except Exception as e:
        logger.error(f"Error occurred while processing scrape request for URL {request.url}: {e}")
        # Chain the cause so the original traceback is preserved
        raise HTTPException(status_code=500, detail=str(e)) from e
# FastAPI endpoint to perform web crawling
@app.post("/crawl")
async def crawl_web(request: CrawlerRequest):
    """Collect links for a topic and ask Gemini which ones are most relevant.

    Runs a general Google search plus ``site:``-restricted searches on
    quora.com and reddit.com (up to 10 links each), then sends the combined
    list to ``gemini-1.5-pro`` for filtering.

    Args:
        request: Body carrying ``topic_title``.

    Returns:
        ``{"links": [...], "filtered_links": [...]}`` where ``links`` is the
        combined search output and ``filtered_links`` holds the non-blank
        lines of the model's reply. Both lists are empty when the searches
        find nothing (the model is not called in that case).

    Raises:
        HTTPException: Status 500 with the error message as ``detail`` when
            a search or the model call fails.
    """
    # Bound before the try block so the except handler can always log it
    topic_title = request.topic_title
    try:
        logger.info(f"Received crawl request for topic: {topic_title}")
        # The search helpers use blocking HTTP calls; run them in worker
        # threads so the event loop is not stalled while they wait.
        # Get up to 10 links from a general Google search
        google_links = await asyncio.to_thread(google_search, topic_title, 10)
        # Get links from Quora
        quora_links = await asyncio.to_thread(
            advanced_search_on_site, "quora.com", topic_title, 10
        )
        # Additional sites can be added similarly
        other_links = await asyncio.to_thread(
            advanced_search_on_site, "reddit.com", topic_title, 10
        )
        # Combine all links
        all_links = google_links + quora_links + other_links
        if not all_links:
            # Nothing to filter: skip the model call rather than prompting
            # it with an empty list.
            logger.info(f"No links found for topic: {topic_title}")
            return {"links": [], "filtered_links": []}
        # Use Gemini to filter and list relevant URLs
        prompt = f"Filter the following URLs and list only those that are most relevant to the topic '{topic_title}':\n{all_links}. Response should only contain the array of links with no formatting."
        model = genai.GenerativeModel("gemini-1.5-pro")
        response = await asyncio.to_thread(model.generate_content, prompt)
        # One entry per non-blank line of the reply
        filtered_links = [
            line.strip() for line in response.text.splitlines() if line.strip()
        ]
        logger.info(f"Successfully processed crawl request for topic: {topic_title}")
        # Return the filtered links
        return {"links": all_links, "filtered_links": filtered_links}
    except Exception as e:
        logger.error(f"Error occurred while processing crawl request for topic {topic_title}: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
# One site to search, together with how many results to request for it
class SiteSearch(BaseModel):
    # Website to perform advanced search on
    site_url: str
    # Optional number of results to fetch, default is 5
    num_results: Optional[int] = 5
# Request body accepted by the /scrape-and-crawl endpoint
class ScrapeAndCrawlRequest(BaseModel):
    # The topic (and college name) for crawling and structuring
    topic_title: str
    # Default to 'gemini-1.5-pro'
    model_name: str = "gemini-1.5-pro"
    # List of websites and the number of results for each site
    sites: list[SiteSearch]
    # Number of links to group together for each GenAI call
    group_size: Optional[int] = 3
@app.post("/scrape-and-crawl")
async def scrape_and_crawl(
    request: ScrapeAndCrawlRequest,
    x_api_key: Optional[str] = Header(None),  # API key to be passed in the request header
):
    """Search the requested sites, scrape each hit and structure the text.

    For every entry in ``request.sites`` a ``site:``-restricted search is run
    for ``request.topic_title``. Each resulting link is scraped; the scraped
    texts are joined in groups of ``group_size`` and each group is sent to
    the model named by ``request.model_name``. Links that fail to scrape and
    groups that fail to generate are logged and skipped.

    Args:
        request: Topic, model name, sites to search and group size.
        x_api_key: Gemini API key taken from the request header.

    Returns:
        ``{"structured_data": [...]}`` with one stripped model reply per
        successfully processed group.

    Raises:
        HTTPException: Status 400 when the API key header is missing;
            status 500 with the error message as ``detail`` for any other
            failure outside the per-link and per-group handlers.
    """
    # Validate before entering the try block: HTTPException derives from
    # Exception, so raising it inside would be caught below and turned into
    # a 500 instead of the intended 400.
    if not x_api_key:
        raise HTTPException(status_code=400, detail="API key is missing from the header")
    try:
        logger.info(f"Received combined scrape and crawl request for Topic: {request.topic_title}")
        # Configure Google Generative AI API key from header
        # NOTE(review): genai.configure appears to set process-wide state, so
        # this key would also be used by later /scrape and /crawl calls and
        # concurrent requests could overwrite each other's key - confirm.
        genai.configure(api_key=x_api_key)
        # Initialize lists to hold all crawled links and structured data
        all_links = []
        all_scraped_texts = []
        structured_data_list = []
        # Perform advanced search on the provided sites with custom result counts
        for site in request.sites:
            # An explicit null falls back to the field's default of 5
            num_results = site.num_results if site.num_results is not None else 5
            logger.info(f"Performing advanced search on {site.site_url} for {num_results} results")
            # The search helper blocks on HTTP; keep it off the event loop
            site_links = await asyncio.to_thread(
                advanced_search_on_site, site.site_url, request.topic_title, num_results
            )
            all_links.extend(site_links)
        # Scrape visible text from each fetched link and gather all the texts
        for link in all_links:
            logger.info(f"Scraping visible text from link: {link}")
            try:
                visible_text = await scrape_visible_text(link)  # Scrape the text
                all_scraped_texts.append(visible_text)
            except Exception as scrape_error:
                logger.error(f"Error scraping link {link}: {scrape_error}")
                continue  # If scraping fails, continue with the next link
        # Process the scraped text in groups to minimize GenAI API calls
        group_size = request.group_size or 3  # Use default group size if not provided
        group_starts = range(0, len(all_scraped_texts), group_size)
        for group_number, start in enumerate(group_starts, start=1):
            text_group = all_scraped_texts[start:start + group_size]  # Get the text for the current group
            combined_text = "\n".join(text_group)  # Combine all the texts in this group
            logger.info(f"Structuring data for group {group_number} with {len(text_group)} links.")
            prompt = f"Convert the following unstructured text into a well-written and comprehensive structured form with titles and content. --- {combined_text} ---"
            # Generate structured content using Google Generative AI
            try:
                model = genai.GenerativeModel(request.model_name)
                # generate_content is synchronous; run it in a worker thread
                response = await asyncio.to_thread(model.generate_content, prompt)
                structured_data_list.append(response.text.strip())
            except Exception as e:
                logger.error(f"Error generating structured data for group {group_number}: {e}")
                continue
        # Return the structured data for all successfully processed groups
        logger.info(f"Successfully processed combined request for Topic: {request.topic_title}")
        return {
            "structured_data": structured_data_list
        }
    except Exception as e:
        logger.error(f"Error occurred while processing combined request: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
# Script entry point: start the server only when this file is run directly
if __name__ == "__main__":
    logger.info("Starting PreCollege Data Scraper Server...")
    # Bind to all interfaces on port 7860
    uvicorn.run(app, port=7860, host="0.0.0.0")