Spaces:
Paused
Paused
import os
import re
import time
from typing import List, Optional, Union

import requests
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.expected_conditions import \
    presence_of_element_located
from selenium.webdriver.support.wait import WebDriverWait

from Powers import *
class SCRAP_DATA:
    """Download scraped media URLs into the local ``./scrapped/`` directory.

    ``urls`` is either a single URL string or a list of URL strings. Falsy
    entries (``None``, ``""``) are skipped. Each successful download is
    written to a timestamp-named file and its path is returned to the caller.
    """

    # Seconds ``requests`` waits for the server to connect / send data.
    # Without a timeout a stalled server would block the caller forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, urls: Union[List[str], str]) -> None:
        self.urls = urls
        self.path = "./scrapped/"
        # exist_ok removes the race between "does it exist?" and "create it".
        os.makedirs(self.path, exist_ok=True)

    def _url_list(self) -> list:
        """Return ``self.urls`` normalised to a list (empty when falsy)."""
        if not self.urls:
            return []
        if isinstance(self.urls, str):
            return [self.urls]
        return list(self.urls)

    def _download(self, url: str, prefix: str, extension: str) -> Optional[str]:
        """Fetch *url* and save the body as ``<prefix>_<timestamp>.<extension>``.

        Returns the path of the written file, or ``None`` when the request or
        the write fails (the error is logged and not re-raised, so a single
        bad URL does not abort a batch).
        """
        name = f"{self.path}{prefix}_{time.time()}.{extension}"
        try:
            # The context manager releases the connection even if writing fails.
            with requests.get(url, timeout=self.REQUEST_TIMEOUT) as response:
                # Do not save an HTTP error page under a media file name.
                response.raise_for_status()
                with open(name, "wb") as f:
                    f.write(response.content)
        except (requests.RequestException, OSError) as e:
            LOGGER.error(e)
            return None
        return name

    def _download_all(self, prefix: str, extension: str) -> list:
        """Download every non-empty URL and return the paths that succeeded."""
        saved = []
        for url in self._url_list():
            if not url:
                continue
            name = self._download(url, prefix, extension)
            if name is not None:
                saved.append(name)
        return saved

    def get_images(self) -> list:
        """Download all URLs as ``img_<timestamp>.jpg`` and return the file paths."""
        return self._download_all("img", "jpg")

    def get_videos(self) -> list:
        """Download all URLs as ``vid_<timestamp>.mp4`` and return the file paths."""
        return self._download_all("vid", "mp4")
class DRIVER:
    """Build and tear down a headless Selenium Chrome driver."""

    def __init__(self) -> None:
        # CHROME_BIN / CHROME_DRIVER are not defined in this file; presumably
        # they come from the ``Powers`` star import -- verify there.
        self.BIN = CHROME_BIN
        self.CHROME_DRIVER = CHROME_DRIVER

    def initialize_driver(self):
        """Create a headless Chrome driver.

        Returns:
            A ``(driver, error)`` tuple. On success ``error`` is ``None``; on
            failure ``driver`` is ``None`` and ``error`` is the message that
            was also written to the log.
        """
        if not self.BIN:
            message = "ChromeBinaryErr: No binary path found! Install Chromium or Google Chrome."
            LOGGER.error(message)
            return None, message

        try:
            options = Options()
            options.binary_location = self.BIN
            for flag in (
                "--disable-dev-shm-usage",
                "--ignore-certificate-errors",
                "--disable-gpu",
                "--headless=new",
                "--test-type",
                "--no-sandbox",
            ):
                options.add_argument(flag)
            service = Service(self.CHROME_DRIVER)
            # Keyword arguments: the positional order of webdriver.Chrome's
            # parameters differs between Selenium 4 releases.
            driver = webdriver.Chrome(options=options, service=service)
        except Exception as e:
            # Boundary catch: any start-up failure is reported to the caller
            # through the error slot of the tuple instead of being raised.
            message = f"ChromeDriverErr: {e}"
            LOGGER.error(message)
            return None, message
        return driver, None

    def driver_close(self, driver: "webdriver.Chrome") -> None:
        """Close the current window and always end the browser session.

        ``quit()`` runs even when ``close()`` raises (for example because the
        window is already gone), so the browser process is not leaked. An
        exception raised by ``close()`` still propagates afterwards.
        """
        try:
            driver.close()
        finally:
            driver.quit()
class INSTAGRAM(DRIVER):
    """Collect media ``src`` URLs from an Instagram reel or post page."""

    # Compiled once instead of on every call.
    _SITE_RE = re.compile(r"^https?://(?:www\.)?instagram\.com/")
    _POST_RE = re.compile(r"^https?://(?:www\.)?instagram\.com/p/")

    def __init__(self, url: str) -> None:
        self.url = url
        # Selectors / class names used to locate media on the page.
        self.article = "article._aa6a"
        self.ul_class = "_acay"
        self.image_class = "x5yr21d"
        self.video_class = "x1lliihq"
        self.next_button = "button._afxw"
        # Accumulates results; get_all() extends these lists and returns the dict.
        self.return_dict = {"image": [], "video": []}
        super().__init__()

    def is_correct_link(self):
        """Return True when ``self.url`` starts with an instagram.com address."""
        return bool(self._SITE_RE.match(self.url))

    def get_all(self):
        """Scrape every image/video URL reachable from ``self.url``.

        Returns:
            * the error message string when the driver cannot be started;
            * ``self.return_dict`` (``{"image": [...], "video": [...]}``) for
              a reel or ``/p/`` post URL;
            * ``{}`` for any other URL.

        The driver is closed on every path, including when scraping raises.
        """
        driver, error = self.initialize_driver()
        if not driver:
            return error

        try:
            if "reel" in self.url:
                driver.get(self.url)
                images, videos = [], [self._scrape_reel(driver)]
            elif self._POST_RE.match(self.url):
                driver.get(self.url)
                images, videos = self._scrape_post(driver)
            else:
                return {}
        finally:
            # Runs for the early return and for exceptions as well, so the
            # browser process is never left behind.
            self.driver_close(driver)

        self.return_dict["image"].extend(images)
        self.return_dict["video"].extend(videos)
        return self.return_dict

    def _scrape_reel(self, driver):
        """Return the ``src`` of the first <video> element on a reel page."""
        wait = WebDriverWait(driver, 30)
        element = wait.until(presence_of_element_located((By.TAG_NAME, "video")))
        # Returned as-is (may be None) so the reel result keeps one entry.
        return element.get_attribute("src")

    @staticmethod
    def _sources(container, class_name):
        """Return the ``src`` attribute of each element with *class_name*."""
        return [
            item.get_attribute("src")
            for item in container.find_elements(By.CLASS_NAME, class_name)
        ]

    def _scrape_post(self, driver):
        """Return ``(image_urls, video_urls)`` for a single or multi-media post."""
        wait = WebDriverWait(driver, 30)
        image_links = []
        video_links = []
        try:
            carousel = wait.until(
                presence_of_element_located((By.CLASS_NAME, self.ul_class)))
            while True:
                image_links.extend(self._sources(carousel, self.image_class))
                video_links.extend(self._sources(carousel, self.video_class))
                try:
                    driver.find_element(By.CSS_SELECTOR, self.next_button).click()
                except WebDriverException:
                    # Next button missing or not clickable: no more media left.
                    break
        except WebDriverException:
            # Carousel container not usable: fall back to a single-media post.
            article = wait.until(
                presence_of_element_located((By.CSS_SELECTOR, self.article)))
            try:
                image = article.find_element(By.TAG_NAME, "img")
                image_links.append(image.get_attribute("src"))
            except WebDriverException:
                video = article.find_element(By.TAG_NAME, "video")
                video_links.append(video.get_attribute("src"))

        # De-duplicate while keeping first-seen order, and drop missing srcs.
        video_links = [link for link in dict.fromkeys(video_links) if link]
        known_videos = set(video_links)
        # A URL collected under both classes is kept as a video only. Filtering
        # (rather than list.remove) cannot fail when a video URL is absent
        # from the image list.
        image_links = [
            link for link in dict.fromkeys(image_links)
            if link and link not in known_videos
        ]
        return image_links, video_links