Spaces:
Sleeping
Sleeping
File size: 10,467 Bytes
60b9061 9c2f54a 60b9061 9c2f54a 60b9061 a88b526 cf196e2 60b9061 cf196e2 60b9061 cf196e2 60b9061 cf196e2 60b9061 cf196e2 a88b526 cf196e2 a88b526 60b9061 cf196e2 60b9061 cf196e2 60b9061 cf196e2 60b9061 cf196e2 a88b526 cf196e2 a88b526 e827602 a88b526 9ad60e7 a88b526 cf196e2 a88b526 cf196e2 a88b526 cf196e2 a88b526 9c2f54a cf196e2 60b9061 cf196e2 60b9061 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
import os
import google.generativeai as genai
from playwright.async_api import async_playwright
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Header
from pydantic import BaseModel
from typing import Optional
import uvicorn
import asyncio
import json
import requests
from bs4 import BeautifulSoup
import logging
# Load environment variables from a local .env file (if present) into os.environ.
load_dotenv()
# Configure Google Generative AI API key
# NOTE(review): raises KeyError at import time if API_KEY is unset — confirm intended.
genai.configure(api_key=os.environ["API_KEY"])
# Set up logging: INFO level, timestamped format, emitted to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
# Module-wide logger used by every function below.
logger = logging.getLogger("ScrapeStructureApp")
# FastAPI app initialization
app = FastAPI()
# Function to scrape webpage and extract visible text
async def scrape_visible_text(url):
    """Load *url* in headless Chromium and return the page's visible text.

    Uses a realistic desktop-Chrome user agent, viewport and request headers
    to reduce the chance of bot blocking.

    Args:
        url: The page URL to navigate to.

    Returns:
        The value of ``document.body.innerText`` once DOMContentLoaded fires.

    Raises:
        Exception: Any Playwright/navigation error is logged and re-raised.
    """
    try:
        logger.info(f"Starting to scrape visible text from URL: {url}")
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)  # Launch browser in headless mode
            try:
                context = await browser.new_context(
                    user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
                    viewport={"width": 1280, "height": 800},
                    extra_http_headers={
                        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
                        "accept-encoding": "gzip, deflate, br, zstd",
                        "accept-language": "en-US,en;q=0.9,hi;q=0.8",
                        "cache-control": "max-age=0",
                        "sec-ch-ua": '"Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129"',
                        "sec-ch-ua-mobile": "?0",
                        "sec-ch-ua-platform": '"Windows"',
                        "sec-fetch-dest": "document",
                        "sec-fetch-mode": "navigate",
                        "sec-fetch-site": "none",
                        "sec-fetch-user": "?1",
                        "upgrade-insecure-requests": "1"
                    }
                )
                page = await context.new_page()
                # Wait only for DOMContentLoaded; waiting for full "load" can hang
                # on asset-heavy pages.
                await page.goto(url, wait_until="domcontentloaded")
                visible_text = await page.evaluate("document.body.innerText")
            finally:
                # Close the browser even when navigation/evaluation raises;
                # the original leaked a Chromium process on error.
                await browser.close()
        logger.info(f"Successfully scraped visible text from URL: {url}")
        return visible_text
    except Exception as e:
        logger.error(f"Error while scraping visible text from URL {url}: {e}")
        raise
# Function to structure data using Google's Gemini model
def structure_data(text, college_name, model_name="gemini-1.5-pro"):
    """Turn unstructured scraped text into a structured college description.

    Args:
        text: Raw visible text scraped from a college webpage.
        college_name: College name embedded in the prompt.
        model_name: Gemini model to use. Added as a backward-compatible
            parameter (default is the previously hard-coded "gemini-1.5-pro")
            so callers such as /scrape-and-crawl can honour a requested model.

    Returns:
        The model's response text with surrounding whitespace stripped.

    Raises:
        Exception: Any API/model error is logged and re-raised.
    """
    try:
        logger.info(f"Starting to structure data for college: {college_name}")
        prompt = f"Convert the following unstructured text into a well-written and comprehensive structured form with titles and content containing all relevant data. The response should be a detailed paragraph mentioning everything about the college named '{college_name}', ensuring no important information is missed. Include details such as connectivity, placement, nearby colleges, infrastructure, courses, branches, students, festivals, clubs, reviews, Q&A, and any other college-related parameters available in the text. Provide the response text with no formatting! --- \n{text} ---. Use only the text between the '---' markers as input source text. If information is not available about any specific thing dont mention it."
        model = genai.GenerativeModel(model_name)
        response = model.generate_content(prompt)
        logger.info(f"Successfully structured data for college: {college_name}")
        return response.text.strip()
    except Exception as e:
        logger.error(f"Error while structuring data for college {college_name}: {e}")
        raise
# Pydantic model for request body
class URLRequest(BaseModel):
    # Request payload for the /scrape endpoint.
    url: str  # Page to scrape
    college_name: str  # College the page describes; interpolated into the LLM prompt
# Pydantic model for Crawler request
class CrawlerRequest(BaseModel):
    # Request payload for the /crawl endpoint.
    topic_title: str  # Topic searched on Google/Quora/Reddit
# Function to perform Google search and return top N links
def google_search(query, num_results=5):
    """Scrape a Google results page and return up to *num_results* external links.

    Args:
        query: Search query string.
        num_results: Maximum number of links requested/returned (default 5).

    Returns:
        A list of absolute ``https://`` result URLs, excluding google.com links.

    Raises:
        requests.HTTPError: If Google answers with a non-2xx status (e.g. 429).
        Exception: Any other error is logged and re-raised.
    """
    try:
        logger.info(f"Performing Google search for query: {query}")
        # NOTE(review): query is interpolated unescaped; queries containing '&'
        # or '#' may be mangled — consider urllib.parse.quote_plus. Left as-is
        # to preserve the exact URL shape.
        search_url = f"https://www.google.com/search?q={query}&num={num_results}"
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36"
        }
        # Bound the request so the endpoint can't hang forever, and surface
        # HTTP errors instead of silently parsing an error page into [].
        response = requests.get(search_url, headers=headers, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        # Anchors carrying a 'jsname' attribute correspond to organic results
        # in Google's markup; keep external https links only.
        links = [
            a['href']
            for a in soup.find_all('a', href=True, attrs={'jsname': True})
            if a['href'].startswith("https://") and "google.com" not in a['href']
        ]
        logger.info(f"Successfully retrieved {len(links)} links for query: {query}")
        return links[:num_results]
    except Exception as e:
        logger.error(f"Error while performing Google search for query {query}: {e}")
        raise
# Function to perform advanced search on specific sites
def advanced_search_on_site(site, topic, num_results=10):
    """Run a Google search for *topic* restricted to *site* via the ``site:`` operator."""
    return google_search(f"site:{site} {topic}", num_results)
# FastAPI endpoint to scrape and structure data
@app.post("/scrape")
async def scrape_and_structure_data(request: URLRequest):
    """Scrape the requested URL and return its content structured by Gemini.

    Any failure is converted into an HTTP 500 with the error message as detail.
    """
    try:
        logger.info(f"Received scrape request for URL: {request.url}, College Name: {request.college_name}")
        # Pull the page's visible text, then hand it to the LLM for structuring.
        page_text = await scrape_visible_text(request.url)
        structured = structure_data(page_text, request.college_name)
        logger.info(f"Successfully processed scrape request for URL: {request.url}")
        return {"structured_data": structured}
    except Exception as e:
        logger.error(f"Error occurred while processing scrape request for URL {request.url}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# FastAPI endpoint to perform web crawling
@app.post("/crawl")
async def crawl_web(request: CrawlerRequest):
    """Collect links about a topic from Google, Quora and Reddit, then have
    Gemini filter them down to the most relevant ones.

    Returns:
        Dict with ``links`` (all collected URLs) and ``filtered_links``
        (the model-selected subset). Failures become an HTTP 500.
    """
    # Bind before the try so the except-branch log line can never NameError.
    topic_title = request.topic_title
    try:
        logger.info(f"Received crawl request for topic: {topic_title}")
        # Get up to 10 links from a plain Google search
        google_links = google_search(topic_title, num_results=10)
        # Get links from Quora
        quora_links = advanced_search_on_site("quora.com", topic_title, num_results=10)
        # Additional sites can be added similarly
        other_links = advanced_search_on_site("reddit.com", topic_title, num_results=10)
        # Combine all links
        all_links = google_links + quora_links + other_links
        # Use Gemini to filter and list relevant URLs
        prompt = f"Filter the following URLs and list only those that are most relevant to the topic '{topic_title}':\n{all_links}. Response should only contain the array of links with no formatting."
        model = genai.GenerativeModel("gemini-1.5-pro")
        response = model.generate_content(prompt)
        # Split one URL per line, dropping blank lines the model may emit.
        filtered_links = [link for link in response.text.strip().split('\n') if link.strip()]
        logger.info(f"Successfully processed crawl request for topic: {topic_title}")
        # Return the filtered links
        return {"links": all_links, "filtered_links": filtered_links}
    except Exception as e:
        logger.error(f"Error occurred while processing crawl request for topic {topic_title}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# Updated Pydantic models
class ScrapeAndCrawlRequest(BaseModel):
    # Request payload for the combined /scrape-and-crawl endpoint.
    url: str  # Page to scrape
    college_name: str  # College the page describes; used in the structuring prompt
    topic_title: str  # Topic searched on Google/Quora/Reddit
    # NOTE(review): model_name is only applied to the link-filtering step;
    # structure_data hard-codes "gemini-1.5-pro" — confirm this is intended.
    model_name: str = "gemini-1.5-pro" # Default to 'gemini-1.5-pro'
    num_results: int = 5 # Default number of results to fetch from Google, Quora, Reddit
# Combined API endpoint
@app.post("/scrape-and-crawl")
async def scrape_and_crawl(
    request: ScrapeAndCrawlRequest,
    x_api_key: Optional[str] = Header(None) # API key to be passed in the request header
):
    """Scrape + structure a college page and crawl related links in one call.

    The Gemini API key is taken from the ``x-api-key`` request header.
    Returns structured data plus all collected and model-filtered links.
    """
    # Validate BEFORE the broad handler: the original raised this 400 inside
    # the try, where `except Exception` caught it and re-raised it as a 500.
    if not x_api_key:
        raise HTTPException(status_code=400, detail="API key is missing from the header")
    try:
        logger.info(f"Received combined scrape and crawl request for URL: {request.url}, College Name: {request.college_name}, Topic: {request.topic_title}")
        # Configure Google Generative AI API key from header
        genai.configure(api_key=x_api_key)
        # Scrape visible text from the provided URL asynchronously
        visible_text = await scrape_visible_text(request.url)
        # NOTE(review): structure_data hard-codes "gemini-1.5-pro";
        # request.model_name only affects the link filtering below — confirm.
        structured_data = structure_data(visible_text, request.college_name)
        # Perform web crawling to get related links with customizable result count
        google_links = google_search(request.topic_title, num_results=request.num_results)
        quora_links = advanced_search_on_site("quora.com", request.topic_title, num_results=request.num_results)
        reddit_links = advanced_search_on_site("reddit.com", request.topic_title, num_results=request.num_results)
        # Combine all links into one list
        all_links = google_links + quora_links + reddit_links
        # Use the specified model to filter and get the most relevant URLs
        prompt = f"Filter the following URLs and list only those that are most relevant to the topic '{request.topic_title}':\n{all_links}. Response should only contain the array of links with no formatting."
        model = genai.GenerativeModel(request.model_name)
        response = model.generate_content(prompt)
        # One URL per line; drop blank lines the model may emit.
        filtered_links = [link for link in response.text.strip().split('\n') if link.strip()]
        # Return the combined structured data and filtered links
        logger.info(f"Successfully processed combined request for URL: {request.url} and Topic: {request.topic_title}")
        return {
            "structured_data": structured_data,
            "all_links": all_links,
            "filtered_links": filtered_links
        }
    except Exception as e:
        logger.error(f"Error occurred while processing combined request: {e}")
        raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
    # Serve the app directly with uvicorn on all interfaces.
    # NOTE(review): port 7860 is the conventional Hugging Face Spaces port — confirm.
    logger.info("Starting PreCollege Data Scraper Server...")
    uvicorn.run(app, host="0.0.0.0", port=7860)