Spaces:
Sleeping
Sleeping
import json
import logging
import re
from typing import Annotated, Optional

import html2text
import httpx
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, Header, Query
from fastapi.middleware.cors import CORSMiddleware
# FastAPI application instance for this module.
app = FastAPI()

# Permissive CORS policy: every origin, HTTP method and request header is
# accepted, and credentialed requests are allowed.
# NOTE(review): FastAPI's CORS documentation advises against combining
# wildcard origins/methods/headers with allow_credentials=True — confirm
# whether credentials are needed and, if so, list the origins explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
def _linkedin_count(raw):
    """Convert a scraped LinkedIn counter such as ``"1,234"`` to ``int``.

    A missing counter (``None`` or blank text) is reported as ``0``. Any other
    text that is not a plain integer still raises ``ValueError``.
    """
    cleaned = "" if raw is None else str(raw).replace(",", "").strip()
    if not cleaned:
        return 0
    return int(cleaned)


async def linkedin_post_details(post_id: str):
    """Scrape a LinkedIn post page and return its author, text and counters.

    Args:
        post_id: Path segment appended to ``https://www.linkedin.com/posts/``.

    Returns:
        A dict with an ``insights`` sub-dict (``commentCount`` and
        ``reactionCount`` as ints, ``0`` when the page shows none;
        ``likeCount`` and ``shareCount`` always ``None``; ``reactions`` always
        an empty list) plus ``description``, ``username``, ``name``,
        ``userType`` and ``date``. Those five are ``None`` when no script tag
        on the page carries the expected metadata.

    Raises:
        requests.RequestException: If the page cannot be fetched in time.
        ValueError: If a counter is present but is not a plain integer.
    """
    url = "https://www.linkedin.com/posts/" + post_id
    # NOTE(review): requests is a blocking client; while this call runs the
    # coroutine does not yield. Consider moving the fetch off the event loop.
    res = requests.get(
        url,
        headers={"user-agent": "Googlebot", "accept-language": "en-US"},
        timeout=(10, 27),  # (connect, read) seconds, same as fb_post_detail
    )
    soup = BeautifulSoup(res.content, "html.parser")

    # Defaults so the response is well-formed even if no metadata is found.
    desc = username = full_name = user_type = date = None
    for script_tag in soup.find_all("script"):
        try:
            data = json.loads(script_tag.string)
            # Only the JSON object that carries a publication date describes
            # the post (presumably the page's structured-data block).
            if data.get("datePublished"):
                desc = data.get("articleBody") or data.get("description")
                author = data.get("author")
                full_name = author.get("name")
                username = author.get("url").rsplit("/", 1)[-1]
                user_type = author.get("@type").lower()
                date = data.get("datePublished")
        except (AttributeError, TypeError, ValueError):
            # Script without inline text, non-JSON content, or JSON lacking
            # the expected fields: try the next tag.
            continue

    reaction_spans = soup.find_all(
        "span", {"data-test-id": "social-actions__reaction-count"}
    )
    reactions = reaction_spans[0].text if reaction_spans else None

    comments_link = soup.find("a", {"data-test-id": "social-actions__comments"})
    comments = (
        comments_link.get("data-num-comments")
        if comments_link is not None
        else None
    )

    return {
        "insights": {
            "likeCount": None,
            "commentCount": _linkedin_count(comments),
            "shareCount": None,
            "reactionCount": _linkedin_count(reactions),
            "reactions": [],
        },
        "description": desc,
        "username": username,
        "name": full_name,
        "userType": user_type,
        "date": date,
    }
| # async def linkedin_post_details(post_id: str): | |
| # url = "https://www.linkedin.com/posts/"+post_id | |
| # res = requests.get(url, headers={"user-agent":"Googlebot", "accept-language": "en-US"}) | |
| # text_maker = html2text.HTML2Text() | |
| # text_maker.ignore_links = True | |
| # text_maker.ignore_images = True | |
| # text_maker.bypass_tables = False | |
| # docs = text_maker.handle(res.content.decode("utf-8")) | |
| # chunks = docs.split("\n\n#") | |
| # linkedin_content = chunks[1] | |
| # user = linkedin_content.split("\n\n", 5) | |
| # full_name = user[1] | |
| # bio = user[2] | |
| # try: | |
| # date, edited = user[3].split(" ") | |
| # edited = True | |
| # except: | |
| # date = user[3].strip() | |
| # edited = False | |
| # content = "\n\n".join(user[5:]) | |
| # insights = chunks[3].split("\n\n")[2] | |
| # likes = insights.split(" ", 1)[0].strip() | |
| # comments = insights.rsplit(" ", 2)[1].strip() | |
| # username = url.rsplit("/",1)[-1].split("_")[0] | |
| # return { | |
| # "userDetails": {"full_name": full_name, "username":username,"bio": bio}, | |
| # "content": content, | |
| # "date": date, | |
| # "is_edited": edited, | |
| # "insights": {"likeCount": likes, "commentCount": comments, "shareCount": None, "viewCount":None}, | |
| # "username":username | |
| # } | |
def _parse_fb_insights(payload):
    """Extract engagement counters from one inline Facebook script payload.

    The payload is sliced around fixed JSON key markers rather than parsed as
    a whole. Returns a dict with ``likeCount``, ``commentCount``,
    ``shareCount``, ``reactionCount`` and ``reactions``.

    Raises:
        AttributeError, IndexError, KeyError, TypeError, ValueError: If the
            payload does not contain the expected markers or structure.
    """
    total_react, reaction_split = payload.split(
        '"reaction_count":{"count":', 2
    )[1].split("},", 1)
    total_react = total_react.split(',"')[0]

    # First bracketed array after the total; presumably the per-reaction
    # breakdown.
    match = re.search(r"\[.*?\]", reaction_split)
    raw_reactions = json.loads(match.group(0)) if match else []
    reactions = [
        {
            "name": item["node"]["localized_name"].lower(),
            "count": item["reaction_count"],
            "is_visible": item["visible_in_bling_bar"],
        }
        for item in raw_reactions
    ]

    shares = payload.split('"share_count":{"count":', 2)[1].split(",")[0]
    comments = payload.split('"comments":{"total_count":', 2)[1].split("}")[0]
    # None (not an IndexError) when the breakdown has no "like" entry.
    likes = next((r["count"] for r in reactions if r["name"] == "like"), None)

    return {
        "likeCount": likes,
        "commentCount": comments,
        "shareCount": shares,
        "reactionCount": total_react,
        "reactions": reactions,
    }


async def fb_post_detail(username: str, post_id: str):
    """Scrape a Facebook post page and return its text and counters.

    Args:
        username: Page/profile segment of the post URL.
        post_id: Post identifier segment of the post URL.

    Returns:
        A dict with ``insights`` (``likeCount``, ``commentCount``,
        ``shareCount``, ``reactionCount``, ``reactions``), ``description``,
        ``username`` (echoed from the argument), ``name`` (the ``og:title``
        meta content) and ``date`` (always ``None``). The total, share and
        comment counts are the raw text sliced from the page (strings).
        Values that cannot be found are ``None`` and ``reactions`` is ``[]``.

    Raises:
        requests.RequestException: If the page cannot be fetched in time.
    """
    logger = logging.getLogger(__name__)
    url = f"https://www.facebook.com/{username}/posts/{post_id}"
    res = requests.get(
        url,
        headers={"user-agent": "Googlebot", "accept-language": "en-US"},
        timeout=(10, 27),
    )
    soup = BeautifulSoup(res.content, "html.parser")
    script_tags = soup.find_all("script")
    logger.debug("Facebook page contains %d script tags", len(script_tags))

    # Defaults so the response is well-formed even if nothing can be parsed.
    insights = {
        "likeCount": None,
        "commentCount": None,
        "shareCount": None,
        "reactionCount": None,
        "reactions": [],
    }
    desc = None
    for script_tag in script_tags:
        payload = script_tag.string
        if not payload:
            # External or empty script: nothing to inspect.
            continue
        if "important_reactors" in payload:
            try:
                # Parsed as a unit so a failure never leaves partial values.
                insights = _parse_fb_insights(payload)
            except (
                AttributeError,
                IndexError,
                KeyError,
                TypeError,
                ValueError,
            ) as err:
                logger.warning("Could not parse Facebook insights: %s", err)
        if '"message":{"text":"' in payload:
            # Raw slice of the JSON string; escape sequences are not decoded.
            desc = payload.split('"message":{"text":"', 1)[-1].split('"},')[0]

    title_tag = soup.find("meta", {"property": "og:title"})
    name = title_tag.get("content") if title_tag is not None else None

    return {
        "insights": insights,
        "description": desc,
        "username": username,
        "name": name,
        "date": None,
    }
async def google_search(q: str, delimiter: str = "\n---\n", sites: Annotated[list[str] | None, Query()] = None):
    """Run a Google web search and return the result snippets as one string.

    Args:
        q: Search terms.
        delimiter: Text appended after every extracted snippet.
        sites: Optional domains; when given, the query is restricted with
            ``site:<domain>`` clauses joined by ``OR``.

    Returns:
        ``{"results": text}`` where ``text`` is the concatenation of every
        extracted snippet, each followed by ``delimiter``. A snippet whose
        ``<div>`` contains a link is rendered as ``[text](target)``.

    Raises:
        requests.RequestException: If the page cannot be fetched in time.
    """
    query = q
    if sites:
        query += " " + " OR ".join(f"site:{site}" for site in sites)

    # Passing the query via ``params`` lets requests URL-encode it, so
    # characters such as "&" or "#" in ``q`` no longer corrupt the URL.
    response = requests.get(
        "https://www.google.com/search",
        params={"q": query},
        timeout=(10, 27),
    )
    soup = BeautifulSoup(response.content, "html.parser")

    pieces = []
    # The first 24 <div>s are skipped and only <div>s with exactly eight
    # ancestor <div>s are read; both numbers are layout heuristics
    # (presumably page chrome vs. result text — verify against live markup).
    for div in soup.find_all("div")[24:]:
        if len(div.find_parents("div")) != 8:
            continue
        href = div.find(href=True, recursive=True)
        # Only text that is a direct child of this <div>.
        text = div.find(string=True, recursive=False)
        if href and text:
            text = f'[{text}]({href["href"].split("/url?q=")[-1]})'
        if text is not None and text.strip():
            pieces.append(text + delimiter)
    return {"results": "".join(pieces)}
async def tiktok_video_details(username: str, video_id: str):
    """Scrape a TikTok video page and return its counters and caption.

    The page HTML is converted to text with html2text and then read
    positionally, so the parsing depends on the exact page layout.

    Args:
        username: Account handle, with or without the leading ``@``.
        video_id: Video identifier segment of the URL.

    Returns:
        A dict with ``insights`` (``likeCount``, ``commentCount``,
        ``bookmarkCount``, ``shareCount`` as the text shown on the page),
        ``username`` (as scraped from the page, not the argument), ``date``
        and ``description``.

    Raises:
        requests.RequestException: If the page cannot be fetched in time.
        UnicodeDecodeError: If the response body is not valid UTF-8.
        IndexError, ValueError: If the converted page does not have the
            expected layout (section count, four bold counters, "Speed"
            marker).
    """
    # startswith() also copes with an empty string, unlike username[0].
    if not username.startswith("@"):
        username = "@" + username
    url = f"https://www.tiktok.com/{username}/video/{video_id}"
    user_agent = "Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)"
    res = requests.get(url, headers={"user-agent": user_agent}, timeout=(10, 27))

    text_maker = html2text.HTML2Text()
    text_maker.ignore_links = True
    text_maker.ignore_images = True
    text_maker.bypass_tables = False
    docs = text_maker.handle(res.content.decode("utf-8"))

    # Sixth "###"-separated section of the converted text holds the video
    # details (layout-dependent).
    content_detail = docs.split("###")[5]
    # The four bold tokens are read, in order, as the counters below.
    likes, comments, bookmarks, shares = re.findall(
        r'\*\*([\w.]+)\*\*', content_detail
    )
    # Non-blank lines following the "Speed" marker.
    profile = [
        line.strip()
        for line in content_detail.split("\n\nSpeed\n\n", 1)[1].split("\n", 6)
        if line.strip()
    ]
    username = profile[0]
    date = profile[1].rsplit(" · ", 1)[-1]
    # Drop the first two characters and any bold markers from the caption.
    desc = profile[-1][2:].replace("**", "")

    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "bookmarkCount": bookmarks,
            "shareCount": shares,
        },
        "username": username,
        "date": date,
        "description": desc,
    }