import json
import re
from urllib.parse import quote_plus, urlencode

import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from requests_html import HTMLSession

from .parser import ImdbParser

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)


class IMDB:
    """
    A class to represent IMDB API.

    --------------
    Main Methods of the IMDB API
    --------------
        #1. search(name, year=None, tv=False, person=False)
                -- to search a query on IMDB

        #2. get_by_name(name, year=None, tv=False)
                -- to get a Movie/TV-Series info by its name (pass year also to increase accuracy)

        #3. get_by_id(file_id)
                -- to get a Movie/TV-Series info by its IMDB-ID

        #4. person_by_name(name)
                -- to get a person's info by his/her name

        #5. person_by_id(p_id)
                -- to get a person's info by his/her IMDB-ID

        #6. upcoming(region=None)
                -- to get upcoming movies/TV-Series

        #7. popular_movies(genre=None, start_id=1, sort_by=None)
                -- to get IMDB popular movies

        #8. popular_tv(genre=None, start_id=1, sort_by=None)
                -- to get IMDB popular Tv-Series

    Every public method returns a JSON *string*. When nothing can be found the
    shared ``self.NA`` payload (status 404, empty result list) is returned.
    """

    # Placeholder stored under 'poster' when an entry has no usable image.
    _NO_POSTER = 'image_not_found'

    def __init__(self):
        self.session = HTMLSession()
        # NOTE(review): these headers are defined but no request in this class
        # passes them to the session -- confirm whether they should be sent.
        self.headers = {
            "Accept": "application/json, text/plain, */*",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.105 Safari/537.36",
            "Referer": "https://www.imdb.com/"
        }
        self.baseURL = "https://www.imdb.com"
        self.search_results = {'result_count': 0, 'results': []}
        self.NA = json.dumps({"status": 404, "message": "No Result Found!", 'result_count': 0, 'results': []})

    # ..........................................private helpers............................................
    def _fetch(self, url):
        """
         @description:- Performs a GET request through the shared session.
         @parameter-1:- url (str), address to request.
         @returns:- The response object returned by the session.
        """
        try:
            return self.session.get(url)
        except requests.exceptions.ConnectionError:
            # NOTE(review): the retry disables TLS certificate verification.
            # This mirrors the fallback the public methods already used, but it
            # is a security trade-off that should be confirmed as intentional.
            return self.session.get(url, verify=False)

    @staticmethod
    def _first_href(element):
        """Returns the 'href' of the first anchor inside element, or None when there is none."""
        anchors = element.find('a')
        if not anchors:
            return None
        return anchors[0].attrs.get('href')

    @staticmethod
    def _as_dict(value):
        """Returns value when it is a dict, otherwise an empty dict (safe for chained .get())."""
        return value if isinstance(value, dict) else {}

    @classmethod
    def _people(cls, value, only_persons=False):
        """
         @description:- Normalises an actor/director/creator entry into a list of {'name', 'url'} dicts.
         @parameter-1:- value, a list of dicts, a single dict, or None.
         @parameter-2:- only_persons (bool) OPTIONAL, keep only entries whose '@type' is 'Person'.
         @returns:- A list of {'name': ..., 'url': ...} dicts (possibly empty).
        """
        if isinstance(value, dict):
            # a single object instead of an array
            value = [value]
        elif not isinstance(value, list):
            value = []
        people = []
        for entry in value:
            if not isinstance(entry, dict):
                continue
            if only_persons and entry.get('@type') != 'Person':
                continue
            people.append({"name": entry.get("name"), "url": entry.get("url")})
        return people

    @staticmethod
    def _load_json_ld(raw):
        """
         @description:- Decodes a JSON-LD string, retrying with ImdbParser clean-ups when it is invalid.
         @parameter-1:- raw (str), JSON text taken from the page.
         @returns:- The decoded object, or None when every attempt fails.
        """
        try:
            return json.loads(raw)
        except json.decoder.JSONDecodeError:
            # sometimes json is invalid as 'description' contains inverted commas or other html escape chars
            pass

        cleaner = ImdbParser(raw)
        try:
            # Both attributes are accessed, in this order, exactly as before.
            # NOTE(review): presumably each access strips one schema from the
            # parser's internal string and returns it -- verify in parser.py.
            cleaned = cleaner.remove_trailer
            cleaned = cleaner.remove_description
            return json.loads(cleaned)
        except json.decoder.JSONDecodeError:
            pass

        try:
            # last resort: also drop the reviewBody schema
            return json.loads(cleaner.remove_review_body)
        except json.decoder.JSONDecodeError:
            # invalid char(s) is/are not in description/trailer/reviewBody schema
            return None

    def _popular_url(self, title_type, genre, start_id, sort_by):
        """
         @description:- Builds the IMDB title-search url used by popular_movies() & popular_tv().
         @parameter-1:- title_type (str), value of the 'title_type' query parameter.
         @parameter-2:- genre (str or None), adds a 'genres' parameter when given.
         @parameter-3:- start_id (int), value of the 'start' parameter.
         @parameter-4:- sort_by (str or None), adds a 'sort' parameter when given.
         @returns:- The complete url as a string.
        """
        assert isinstance(start_id, int)
        params = [('title_type', title_type)]
        if genre is not None:
            assert isinstance(genre, str)
            params.append(('genres', genre))
        params.append(('start', start_id))
        if sort_by is not None:
            # previously a missing sort ended up in the url as the text "sort=None"
            params.append(('sort', sort_by))
        # commas stay literal so values like "user_rating,desc" are sent unchanged
        return f"{self.baseURL}/search/title/?{urlencode(params, safe=',')}"

    # ..................................method to search on IMDB...........................................
    def search(self, name, year=None, tv=False, person=False):
        """
         @description:- Helps to search a query on IMDB.
         @parameter-1:- name (str), query value to search.
         @parameter-2:- year (int) OPTIONAL, release year of query/movie/tv/file to search.
         @parameter-3:- tv (bool) OPTIONAL, to filter/limit/bound search results only for 'TV Series'.
         @parameter-4:- person (bool) OPTIONAL, to filter search results only for person
                        (takes precedence over tv).
         @returns:- A JSON string:
                    - {'result_count': count, 'results': list of {'id', 'name', 'url', 'poster'}}
                    Results without an image or without an ID in their link are left out.
        """
        assert isinstance(name, str)
        self.search_results = {'result_count': 0, 'results': []}

        query = name
        if year is not None:
            assert isinstance(year, int)
            query = f"{name} {year}"
        # quote_plus turns spaces into '+' and escapes characters such as '&' or '#'
        response = self._fetch(f"{self.baseURL}/find?q={quote_plus(query)}")

        if person:
            rows = response.html.xpath("//section[@data-testid='find-results-section-name']/div/ul/li")
            rows = [row for row in rows if 'name' in (self._first_href(row) or '')]
        else:
            rows = response.html.xpath("//section[@data-testid='find-results-section-title']/div/ul/li")
            if tv:
                rows = [row for row in rows if "TV" in row.text]

        output = []
        for row in rows:
            href = self._first_href(row)
            if not href:
                continue
            label = row.text.replace('\n', ' ')
            if 'Podcast' in label or 'Music Video' in label:
                continue
            images = row.xpath("//img")
            parts = href.split('/')
            if not images or len(parts) < 3:
                continue
            poster = images[0].attrs.get('src')
            if poster is None:
                continue
            output.append({
                'id': parts[2],
                "name": label,
                "url": f"https://www.imdb.com{href}",
                "poster": poster
            })

        self.search_results = {'result_count': len(output), 'results': output}
        return json.dumps(self.search_results, indent=2)

    # ..............................methods to get a movie/web-series/tv info..............................
    def get(self, url):
        """
         @description:- helps to get a file's complete info (used by get_by_name() & get_by_id() )
         @parameter:- url (str), url of the file/movie/tv-series.
         @returns:- File/movie/TV info as JSON string, or self.NA when the page carries
                    no usable 'application/ld+json' data.
        """
        response = self._fetch(url)
        scripts = response.html.xpath("//script[@type='application/ld+json']")
        if not scripts:
            return self.NA

        raw = ''.join(scripts[0].text.splitlines())  # removing newlines
        result = self._load_json_ld(raw)
        if not isinstance(result, dict):
            return self.NA

        review = self._as_dict(result.get("review"))
        review_rating = self._as_dict(review.get("reviewRating"))
        rating = self._as_dict(result.get("aggregateRating"))

        # NOTE(review): everything up to and including "/title" is dropped, so the
        # value is baseURL + "/<id>/" and lacks the "/title" segment that
        # get_by_id() builds. Kept as-is for compatibility -- confirm it is intended.
        page_url = result.get('url')
        if isinstance(page_url, str):
            page_url = self.baseURL + page_url.split("/title")[-1]
        else:
            page_url = None

        output = {
            "type": result.get('@type'),
            "name": result.get('name'),
            "url": page_url,
            "poster": result.get('image'),
            "description": result.get('description'),
            "review": {
                "author": self._as_dict(review.get("author")).get("name"),
                "dateCreated": review.get("dateCreated"),
                "inLanguage": review.get("inLanguage"),
                "heading": review.get("name"),
                "reviewBody": review.get("reviewBody"),
                "reviewRating": {
                    "worstRating": review_rating.get("worstRating"),
                    "bestRating": review_rating.get("bestRating"),
                    "ratingValue": review_rating.get("ratingValue"),
                },
            },
            "rating": {
                "ratingCount": rating.get("ratingCount"),
                "bestRating": rating.get("bestRating"),
                "worstRating": rating.get("worstRating"),
                "ratingValue": rating.get("ratingValue"),
            },
            "contentRating": result.get("contentRating"),
            "genre": result.get("genre"),
            "datePublished": result.get("datePublished"),
            "keywords": result.get("keywords"),
            "duration": result.get("duration"),
            "actor": self._people(result.get("actor")),
            "director": self._people(result.get("director")),
            "creator": self._people(result.get("creator"), only_persons=True),
        }
        return json.dumps(output, indent=2)

    def get_by_name(self, name, year=None, tv=False):
        """
         @description:- Helps to search a file/movie/tv by name.
         @parameter-1:- name (str), query/name to search.
         @parameter-2:- year (int) OPTIONAL, release year of query/movie/tv/file to search.
         @parameter-3:- tv (bool) OPTIONAL, to filter/limit/bound search result only for 'TV Series'.
         @returns:- File/movie/TV info as JSON string (info of the best matching search result),
                    or self.NA when the search finds no title.
        """
        found = json.loads(self.search(name, year=year))
        titles = [item for item in found['results'] if 'title' in item['url']]

        # filtering TV and movies by the "TV" marker in the result text
        if tv:
            candidates = [item for item in titles if "TV" in item['name']]
        else:
            candidates = [item for item in titles if "TV" not in item['name']]
        if year is not None:
            candidates = [item for item in candidates if str(year) in item['name']]

        # double checking by file name: prefer results starting with the query's first word
        first_word = name.split(" ")[0].lower()
        preferred = [item for item in candidates if item['name'].lower().startswith(first_word)]

        # fall back step by step: preferred -> filtered -> every title result
        candidates = preferred or candidates or titles
        if not candidates:
            return self.NA
        return self.get(candidates[0].get('url'))

    def get_by_id(self, file_id):
        """
         @description:- Helps to search a file/movie/tv by its imdb ID.
         @parameter-1:- file_id (str), imdb ID of the file/movie/tv.
         @returns:- File/movie/TV info as JSON string.
        """
        assert isinstance(file_id, str)
        return self.get(f"{self.baseURL}/title/{file_id}")

    # ........................................Methods for person profile...................................
    def get_person(self, url):
        """
         @description:- Helps to search a person info by its url, (used by person_by_name() & person_by_id() ).
         @parameter-1:- url (str), url of the person's profile page.
         @returns:- Person's info as JSON string: the page's ld+json object without '@context'
                    and with '@type' renamed to 'type'. self.NA when that data is missing or invalid.
        """
        response = self._fetch(url)
        scripts = response.html.xpath("//script[@type='application/ld+json']")
        if not scripts:
            return self.NA
        try:
            result = json.loads(scripts[0].text)
        except json.decoder.JSONDecodeError:
            return self.NA
        if not isinstance(result, dict):
            return self.NA

        # pop() tolerates pages where either key is absent
        result.pop("@context", None)
        result['type'] = result.pop('@type', None)
        return json.dumps(result, indent=2)

    def person_by_name(self, name):
        """
         @description:- Helps to search a person info by its name.
         @parameter-1:- name (str), name of the person.
         @returns:- Person's info as JSON string (first search hit), or self.NA when nobody is found.
        """
        results = json.loads(self.search(name, person=True))
        if not results['results']:
            return self.NA
        return self.get_person(results['results'][0].get('url'))

    def person_by_id(self, p_id):
        """
         @description:- Helps to search a person info by its imdb ID.
         @parameter-1:- p_id (str), imdb ID of the person's profile.
         @returns:- Person's info as JSON string.
        """
        assert isinstance(p_id, str)
        return self.get_person(f"{self.baseURL}/name/{p_id}")

    # .........................................For Upcoming Movies.........................................
    def upcoming(self, region=None):
        """
         @description:- Helps to get upcoming movies/tv-series.
         @parameter-1:- region (str) OPTIONAL, country code (like US, IN etc.) to filter results by region/country.
         @returns:- upcoming movies/TV-Series info as JSON string:
                    - {'result_count': count, 'results': list of {'id', 'name', 'url', 'release_data', 'poster'}}
                    or self.NA when nothing is listed.
        """
        if region is not None:
            assert isinstance(region, str)
            url = f"{self.baseURL}/calendar?{urlencode({'region': region})}"
        else:
            url = f"{self.baseURL}/calendar"
        response = self._fetch(url)

        mains = response.html.xpath("//main")
        if not mains:
            return self.NA

        output = []
        # movies are divided/enlisted within article tag (one article per heading)
        for article in mains[0].find('article'):
            headings = article.find('h3')
            lists = article.xpath('//ul')
            if not headings or not lists:
                continue
            release = headings[0].text
            for li in lists[0].xpath('//li'):
                href = self._first_href(li)
                if not href:
                    continue
                parts = href.split('/')
                if len(parts) < 3:
                    continue
                # read the image of THIS item; it used to be taken from the
                # first item of the list for every movie
                images = li.find('img')
                poster = images[0].attrs.get('src') if images else None
                output.append({
                    'id': parts[2],
                    'name': li.find('a')[0].text,
                    'url': self.baseURL + href,
                    # NOTE(review): key looks like a typo of 'release_date';
                    # kept because callers may already read it.
                    'release_data': release,
                    'poster': poster.split(',')[0] if poster else self._NO_POSTER
                })

        if not output:
            return self.NA
        return json.dumps({'result_count': len(output), 'results': output}, indent=2)

    # ............................................For Popular Movies.......................................
    def get_popular(self, url):
        """
         @description:- Helps to search popular movies/TV-Series by url, (used by popular_movies() & popular_tv() ).
         @parameter-1:- url (str), url to search.
         @returns:- Files/Movies/TV-Series info as JSON string:
                    - {'result_count': count, 'results': list of {'id', 'name', 'year', 'url', 'poster'}}
                    'year' is an int, or "N/A" when no 4-digit value is found.
        """
        assert isinstance(url, str)
        response = self._fetch(url)

        output = []
        for li in response.html.xpath('//ul[@role="presentation"]/li'):
            # the title link is the first anchor pointing at a title whose text is "<rank>. <name>"
            href = name = None
            for anchor in li.find('a'):
                link = anchor.attrs.get('href') or ''
                if "title" in link and ". " in anchor.text:
                    href = link
                    name = anchor.text.split(". ")[-1]
                    break
            if href is None:
                # not a title entry; values from the previous item must not leak into this one
                continue
            parts = href.split('/')
            if len(parts) < 3:
                continue

            # getting year: first 4-character span that parses as an integer
            year = "N/A"
            for span in li.find('span'):
                text = span.text.strip()
                if len(text) != 4:
                    continue
                try:
                    year = int(text)
                    break
                except ValueError:
                    year = "N/A"

            # getting poster
            images = li.xpath("//img[@loading='lazy']")
            poster = images[0].attrs.get('src') if images else None

            # creating file object
            output.append({
                'id': parts[2],
                'name': name,
                'year': year,
                'url': self.baseURL + href,
                'poster': poster if poster else self._NO_POSTER
            })

        self.search_results = {'result_count': len(output), 'results': output}
        return json.dumps(self.search_results, indent=2)

    def popular_movies(self, genre=None, start_id=1, sort_by=None):
        """
         @description:- Helps to get popular movies starting from start_id.
         @parameter-1:- genre (str) OPTIONAL, to filter results by genre.
         @parameter-2:- start_id (int) DEFAULT=1, start id to show results.
         @parameter-3:- sort_by (str) OPTIONAL, to sort results (eg. 'user_rating,desc' OR 'user_rating,asc').
                        - (visit 'https://www.imdb.com/search/title/?title_type=movie' for more info)
         @returns:- Popular Movies (by genre) info as JSON string.
        """
        return self.get_popular(self._popular_url("movie", genre, start_id, sort_by))

    def popular_tv(self, genre=None, start_id=1, sort_by=None):
        """
         @description:- Helps to get popular TV-Series starting from start_id.
         @parameter-1:- genre (str) OPTIONAL, to filter results by genre.
         @parameter-2:- start_id (int) DEFAULT=1, start id to show results.
         @parameter-3:- sort_by (str) OPTIONAL, to sort results (eg. 'user_rating,desc' OR 'user_rating,asc').
                        - (visit 'https://www.imdb.com/search/title/?title_type=movie' for more info)
         @returns:- Popular TV-Series info as JSON string.
        """
        return self.get_popular(self._popular_url("tv_series,tv_miniseries", genre, start_id, sort_by))