# web_scrape/app.py
from typing import Annotated, Optional

from fastapi import FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware
from bs4 import BeautifulSoup
import html2text
import httpx
import requests
import re
import json

app = FastAPI()

# Allow any origin so the endpoints can be called from any front end.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/linkedin_post_details")
async def linkedin_post_details(post_id: str, url: Optional[str] = None):
if not url:
url = "https://www.linkedin.com/posts/"+post_id
res = requests.get(url, headers={"user-agent":"Googlebot", "accept-language": "en-US"})
soup = BeautifulSoup(res.content, "html.parser")
script_tags = soup.find_all("script")
for script_tag in script_tags:
try:
script_tag = json.loads(script_tag.string)
if script_tag.get("datePublished"):
desc = script_tag.get("articleBody")
if not desc:
desc = script_tag.get("description")
author = script_tag.get("author")
full_name = author.get("name")
username = author.get("url").rsplit("/", 1)[-1]
user_type = author.get("@type").lower()
date = script_tag.get("datePublished")
except Exception as e:
continue
spans = soup.find_all(
"span", {"data-test-id": "social-actions__reaction-count"}
)
if spans:
reactions = spans[0].text.strip()
else:
reactions = '0'
try:
comments = str(soup.find("a", {"data-test-id": "social-actions__comments"}).get(
"data-num-comments"
))
except:
comments = '0'
return {
"insights": {
"likeCount": None,
# "commentCount": int(comments.replace(",", "")),
"commentCount": comments,
"shareCount": None,
# "reactionCount": int(reactions.replace(",", "")),
"reactionCount":reactions,
"reactions": [],
},
"description": desc,
"username": username,
"name": full_name,
"userType": user_type,
"date": date,
}
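
# The counts above are returned as raw strings (e.g. "1,234"). A helper like
# this sketch could normalize them to ints; it is not wired into the endpoint,
# and the "K"/"M" suffix handling is an assumption about how LinkedIn
# abbreviates large counts.
def _parse_count(raw: str) -> Optional[int]:
    """Best-effort conversion of a scraped count string to an int."""
    raw = raw.strip().replace(",", "")
    try:
        if raw[-1:].upper() == "K":
            return int(float(raw[:-1]) * 1_000)
        if raw[-1:].upper() == "M":
            return int(float(raw[:-1]) * 1_000_000)
        return int(raw)
    except ValueError:
        return None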
# Earlier html2text-based implementation, kept for reference:
# async def linkedin_post_details(post_id: str):
# url = "https://www.linkedin.com/posts/"+post_id
# res = requests.get(url, headers={"user-agent":"Googlebot", "accept-language": "en-US"})
# text_maker = html2text.HTML2Text()
# text_maker.ignore_links = True
# text_maker.ignore_images = True
# text_maker.bypass_tables = False
# docs = text_maker.handle(res.content.decode("utf-8"))
# chunks = docs.split("\n\n#")
# linkedin_content = chunks[1]
# user = linkedin_content.split("\n\n", 5)
# full_name = user[1]
# bio = user[2]
# try:
# date, edited = user[3].split(" ")
# edited = True
# except:
# date = user[3].strip()
# edited = False
# content = "\n\n".join(user[5:])
# insights = chunks[3].split("\n\n")[2]
# likes = insights.split(" ", 1)[0].strip()
# comments = insights.rsplit(" ", 2)[1].strip()
# username = url.rsplit("/",1)[-1].split("_")[0]
# return {
# "userDetails": {"full_name": full_name, "username":username,"bio": bio},
# "content": content,
# "date": date,
# "is_edited": edited,
# "insights": {"likeCount": likes, "commentCount": comments, "shareCount": None, "viewCount":None},
# "username":username
# }
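
# requests.get blocks the event loop inside these async endpoints. httpx is
# already imported, so a non-blocking fetch could look like this sketch; the
# endpoints keep using requests, and this helper is illustrative only.
async def _fetch_html(url: str, user_agent: str = "Googlebot") -> bytes:
    async with httpx.AsyncClient() as client:
        res = await client.get(
            url, headers={"user-agent": user_agent, "accept-language": "en-US"}
        )
        return res.content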
@app.get("/facebook_post_detail")
async def fb_post_detail(username: str, post_id: str, url: Optional[str] = None):
if not url:
url = f"https://www.facebook.com/{username}/posts/{post_id}"
user_agent = "Googlebot"
res = requests.get(
url,
headers={
"user-agent": user_agent,
"accept-language": "en-US"
},
timeout=(10, 27),
)
soup = BeautifulSoup(res.content, "html.parser")
script_tags = soup.find_all("script")
print(len(script_tags))
for script_tag in script_tags:
try:
if "important_reactors" in script_tag.string:
splitter = '"reaction_count":{"count":'
total_react, reaction_split = script_tag.string.split(splitter, 2)[1].split("},", 1)
total_react = total_react.split(',"')[0]
pattern = r"\[.*?\]"
reactions = re.search(pattern, reaction_split)
if reactions:
reactions = json.loads(reactions.group(0))
else:
reactions = []
reactions = [
dict(
name=reaction["node"]["localized_name"].lower(),
count=reaction["reaction_count"],
is_visible=reaction["visible_in_bling_bar"],
)
for reaction in reactions
]
splitter = '"share_count":{"count":'
shares = script_tag.string.split(splitter, 2)[1].split(",")[0]
splitter = '"comments":{"total_count":'
comments = script_tag.string.split(splitter, 2)[1].split("}")[0]
likes = [x.get("count") for x in reactions if x.get("name") == "like"][0]
print(total_react, reactions, shares, comments, likes)
if '"message":{"text":"' in script_tag.string:
desc = script_tag.string.split('"message":{"text":"', 1)[-1].split('"},')[0]
except Exception as e:
print(e)
continue
name = soup.find("meta", {"property": "og:title"}).get("content")
return {
"insights": {
"likeCount": likes,
"commentCount": comments,
"shareCount": shares,
"reactionCount": total_react,
"reactions": reactions,
},
"description": desc,
"username": username,
"name": name,
"date": None,
}
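
# The splitters above target fragments of Facebook's undocumented embedded
# JSON that look roughly like the following; the shape is an observation
# inferred from the parsing code, not a stable API:
#   "reaction_count":{"count":1234,...},[{"node":{"localized_name":"Like"},
#    "reaction_count":1000,"visible_in_bling_bar":true}, ...]
#   "share_count":{"count":56,...}  "comments":{"total_count":78}
#   "message":{"text":"post body ..."},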
@app.get("/google_search")
async def google_search(q: str, delimiter: str = "\n---\n", sites: Annotated[list[str] | None, Query()] = None):
print(sites)
print(type(sites))
url = f"https://www.google.com/search?q={q} "
if sites:
url += " OR ".join(["site:"+site for site in sites])
texts = ""
soup = BeautifulSoup(requests.get(url).content, "html.parser")
for div in soup.find_all("div")[24:]:
if len(div.find_parents("div")) == 8: # Depth 4 means 3 parent divs (0-indexed)
# print(div.get_text().strip())
href = div.find(href=True, recursive=True)
text = div.find(text=True, recursive=False)
if href and text:
print(text)
text = f'[{text}]({href["href"].split("/url?q=")[-1]})'
if text != None and text.strip():
texts += text + delimiter
return {"results":texts}
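
# The query is interpolated into the URL unencoded, so spaces and special
# characters can produce malformed requests. A safer URL builder, sketched
# here but not wired into the endpoint above, percent-encodes the query:
from urllib.parse import quote_plus

def _build_search_url(q: str, sites: Optional[list[str]] = None) -> str:
    query = q
    if sites:
        query += " " + " OR ".join("site:" + site for site in sites)
    return "https://www.google.com/search?q=" + quote_plus(query)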
@app.get("/tiktok_video_details")
async def tiktok_video_details(username: str, video_id:str, url: Optional[str] = None):
if not url:
if username[0] != "@":
username = "@" + username
url = f"https://www.tiktok.com/{username}/video/{video_id}"
# user_agent = "LinkedInBot"
user_agent = "Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)"
res = requests.get(url, headers={"user-agent": user_agent})
    # Earlier meta-tag-based implementation, kept for reference:
    # soup = BeautifulSoup(res.content, "html.parser")
# insights = soup.find("meta", {"property": "og:description"}).get("content")
# likes = insights.split(" ", 1)[0]
# desc = insights.rsplit(" comments. “", 1)[-1][:-1]
# comments = insights.split(", ", 1)[-1].split(" ", 1)[0]
# name = soup.find("meta", {"property": "og:title"}).get("content")[9:]
# return {
# "insights": {"likeCount": likes, "commentCount": comments, "shareCount":None, "viewCount":None},
# "description": desc,
# "username": username,
# "name": name,
# }
    text_maker = html2text.HTML2Text()
    text_maker.ignore_links = True
    text_maker.ignore_images = True
    text_maker.bypass_tables = False
    docs = text_maker.handle(res.content.decode("utf-8"))

    # The sixth "###" section of the rendered text holds the engagement
    # counts as bold numbers in a fixed order: likes, comments, bookmarks,
    # shares. This unpacking raises if TikTok's layout shifts.
    content_detail = docs.split("###")[5]
    likes, comments, bookmarks, shares = re.findall(r"\*\*([\w.]+)\*\*", content_detail)
    # Everything after the "Speed" control is the author block:
    # username, a "name · date" line, then the caption.
    profile = [x.strip() for x in content_detail.split("\n\nSpeed\n\n", 1)[1].split("\n", 6) if x.strip()]
    username = profile[0]
    date = profile[1].rsplit(" · ", 1)[-1]
    desc = profile[-1][2:].replace("**", "")
    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "bookmarkCount": bookmarks,
            "shareCount": shares,
        },
        "username": username,
        "date": date,
        "description": desc,
    }
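
# Local development entry point; assumes uvicorn is installed (port 7860 is
# the Hugging Face Spaces convention).
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)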