web_scrape / app.py
jonathanjordan21's picture
Update app.py
1ecee5c verified
raw
history blame
4.03 kB
import asyncio
import re
from typing import Annotated, Optional
from urllib.parse import parse_qs, urlsplit

import html2text
import httpx
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException, Header, Query
from fastapi.middleware.cors import CORSMiddleware
# Application instance that every route decorator in this module registers on.
app = FastAPI()

# CORS policy: every origin, method and header is accepted, with credentials.
# NOTE(review): FastAPI's CORS docs discourage combining wildcard origins with
# allow_credentials=True -- confirm whether credentialed requests are needed.
_cors_policy = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_policy)
# Upper bound, in seconds, on how long the LinkedIn page fetch may take.
_LINKEDIN_TIMEOUT_SECONDS = 30


def _parse_linkedin_post(markdown: str) -> dict:
    """Pull the post fields out of the html2text rendering of a post page.

    The page is addressed purely by position: sections delimited by
    ``"\\n\\n#"`` and, inside them, paragraphs delimited by blank lines.

    Raises:
        IndexError: if the text does not contain the expected sections.
    """
    chunks = markdown.split("\n\n#")

    # Section 1 holds, by paragraph index: 1 = author name, 2 = bio,
    # 3 = timestamp, 5 = post body. Paragraphs 0 and 4 are not used.
    header = chunks[1].split("\n\n", 5)
    full_name = header[1]
    bio = header[2]

    # The timestamp paragraph is either a single token or "<date> <marker>";
    # a second token is treated as the "edited" marker. Stripping first keeps
    # stray leading/trailing spaces from being mistaken for that marker.
    stamp = header[3].strip()
    stamp_parts = stamp.split(" ")
    if len(stamp_parts) == 2:
        date, edited = stamp_parts[0], True
    else:
        date, edited = stamp, False

    content = "\n\n".join(header[5:])

    # Section 3, paragraph 2 is the counters line: the first token is used as
    # the like count and the token before the last one as the comment count.
    insights = chunks[3].split("\n\n")[2]
    likes = insights.split(" ", 1)[0].strip()
    comments = insights.rsplit(" ", 2)[1].strip()

    return {
        "user": {"name": full_name, "bio": bio},
        "content": content,
        "date": date,
        "is_edited": edited,
        "insights": {"likeCount": likes, "commentCount": comments, "shareCount": None},
    }


@app.get("/linkedin_post_details")
async def linkedin_post_details(post_id: str):
    """Scrape author, text, date and counters from a LinkedIn post page.

    Args:
        post_id: Path segment appended to ``https://www.linkedin.com/posts/``.

    Returns:
        A dict with ``user`` (``name``, ``bio``), ``content``, ``date``,
        ``is_edited`` and ``insights`` (``likeCount``, ``commentCount``,
        ``shareCount`` -- the latter is always ``None``).

    Raises:
        HTTPException: 502 when LinkedIn cannot be reached or the returned
            page does not have the layout the parser expects.
    """
    url = "https://www.linkedin.com/posts/" + post_id
    try:
        # requests is blocking; run it in a worker thread so the event loop
        # keeps serving other requests while we wait on LinkedIn.
        res = await asyncio.to_thread(
            requests.get, url, timeout=_LINKEDIN_TIMEOUT_SECONDS
        )
    except requests.RequestException as err:
        raise HTTPException(
            status_code=502, detail="Could not fetch the LinkedIn post page"
        ) from err

    text_maker = html2text.HTML2Text()
    text_maker.ignore_links = True
    text_maker.ignore_images = True
    text_maker.bypass_tables = False
    # Undecodable bytes are replaced rather than aborting the whole request.
    docs = text_maker.handle(res.content.decode("utf-8", errors="replace"))

    try:
        return _parse_linkedin_post(docs)
    except IndexError as err:
        raise HTTPException(
            status_code=502,
            detail="Unexpected LinkedIn page layout; could not parse post details",
        ) from err
_GOOGLE_SEARCH_URL = "https://www.google.com/search"
# Upper bound, in seconds, on how long the Google fetch may take.
_GOOGLE_TIMEOUT_SECONDS = 30
# Layout heuristics: the first 24 <div> elements are skipped and only <div>s
# with exactly 8 <div> ancestors are inspected.
# NOTE(review): these values look tuned against Google's results markup at
# the time of writing -- confirm they still match.
_GOOGLE_SKIPPED_DIVS = 24
_GOOGLE_RESULT_DIV_DEPTH = 8


def _google_link_target(href: str) -> str:
    """Return the destination of a Google ``/url?q=...`` redirect link.

    Links that are not redirect links are returned unchanged.
    """
    if "/url?q=" not in href:
        return href
    # Parsing the query string drops the tracking parameters that follow the
    # target and undoes its percent-encoding.
    targets = parse_qs(urlsplit(href).query).get("q")
    if targets:
        return targets[0]
    return href.split("/url?q=")[-1]


def _extract_google_results(html: bytes, delimiter: str) -> str:
    """Collect result snippets from a results page, each followed by *delimiter*."""
    soup = BeautifulSoup(html, "html.parser")
    pieces = []
    for div in soup.find_all("div")[_GOOGLE_SKIPPED_DIVS:]:
        if len(div.find_parents("div")) != _GOOGLE_RESULT_DIV_DEPTH:
            continue
        link = div.find(href=True, recursive=True)
        # Only text sitting directly inside this <div> counts, not text of
        # nested elements.
        text = div.find(string=True, recursive=False)
        if text is None:
            continue
        text = str(text)
        if link is not None and text:
            # Render linked text as a Markdown link.
            text = f"[{text}]({_google_link_target(link['href'])})"
        if text.strip():
            pieces.append(text)
    # Every entry, including the last one, is terminated by the delimiter.
    return "".join(piece + delimiter for piece in pieces)


@app.get("/google_search")
async def google_search(
    q: str,
    delimiter: str = "\n---\n",
    sites: Annotated[list[str] | None, Query()] = None,
):
    """Run a Google search and return the result snippets as one string.

    Args:
        q: Search terms.
        delimiter: Text appended after every snippet in the output.
        sites: Optional domains; the search is restricted to them by adding
            ``site:<domain>`` terms joined with ``OR``.

    Returns:
        ``{"results": <snippets joined by the delimiter>}``; linked snippets
        are rendered as Markdown links.

    Raises:
        HTTPException: 502 when Google cannot be reached.
    """
    query = q
    if sites:
        query += " " + " OR ".join(f"site:{site}" for site in sites)

    try:
        # Passing the query through ``params`` URL-encodes it, so characters
        # such as "&", "#" and "+" stay part of the search terms. The blocking
        # call runs in a worker thread to keep the event loop free.
        res = await asyncio.to_thread(
            requests.get,
            _GOOGLE_SEARCH_URL,
            params={"q": query},
            timeout=_GOOGLE_TIMEOUT_SECONDS,
        )
    except requests.RequestException as err:
        raise HTTPException(
            status_code=502, detail="Could not fetch Google search results"
        ) from err

    return {"results": _extract_google_results(res.content, delimiter)}
# User-Agent header sent with the TikTok request (a Googlebot-style string).
_TIKTOK_USER_AGENT = (
    "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; "
    "Googlebot/2.1; +http://www.google.com/bot.html) Chrome/W.X.Y.Z Safari/537.36"
)


def _parse_tiktok_video(markdown: str) -> dict:
    """Pull counters, author, date and description from the html2text output.

    The page is addressed purely by position, so a different layout raises.

    Raises:
        IndexError: if an expected section or line is missing.
        ValueError: if the section does not contain exactly four bold counters.
    """
    # The sixth "###"-delimited section carries the video details.
    content_detail = markdown.split("###")[5]

    # The four bold (**...**) tokens are, in order: likes, comments,
    # bookmarks, shares.
    likes, comments, bookmarks, shares = re.findall(
        r'\*\*([\w.]+)\*\*', content_detail
    )

    # Everything after the "Speed" paragraph is the profile part; keep its
    # non-blank lines, with everything past the sixth newline as one entry.
    profile_text = content_detail.split("\n\nSpeed\n\n", 1)[1]
    profile = [line.strip() for line in profile_text.split("\n", 6) if line.strip()]

    author = profile[0]
    # The date is whatever follows the last " · " separator on the second line.
    date = profile[1].rsplit(" · ", 1)[-1]
    # Drop the bold markers html2text leaves in the description.
    desc = profile[-1].replace("**", "")

    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "bookmarkCount": bookmarks,
            "shareCount": shares,
        },
        "username": author,
        "date": date,
        "description": desc,
    }


@app.get("/tiktok_video_details")
async def read_item(username: str, video_id: str):
    """Scrape counters, author, date and description from a TikTok video page.

    Args:
        username: Account name, without the leading ``@``.
        video_id: Identifier used in the ``/video/<id>`` part of the URL.

    Returns:
        A dict with ``insights`` (``likeCount``, ``commentCount``,
        ``bookmarkCount``, ``shareCount``), ``username``, ``date`` and
        ``description``; ``username`` is the value read from the page.

    Raises:
        HTTPException: 502 when TikTok cannot be reached or the returned page
            does not have the layout the parser expects.
    """
    url = f"https://www.tiktok.com/@{username}/video/{video_id}"
    try:
        # The async client keeps the event loop free while waiting on TikTok.
        async with httpx.AsyncClient() as client:
            res = await client.get(url, headers={"User-Agent": _TIKTOK_USER_AGENT})
    except httpx.HTTPError as err:
        raise HTTPException(
            status_code=502, detail="Could not fetch the TikTok video page"
        ) from err

    text_maker = html2text.HTML2Text()
    text_maker.ignore_links = True
    text_maker.ignore_images = True
    text_maker.bypass_tables = False
    # Undecodable bytes are replaced rather than aborting the whole request.
    docs = text_maker.handle(res.content.decode("utf-8", errors="replace"))

    try:
        return _parse_tiktok_video(docs)
    except (IndexError, ValueError) as err:
        raise HTTPException(
            status_code=502,
            detail="Unexpected TikTok page layout; could not parse video details",
        ) from err