BinaryONe
Callback Update
cbcec65
raw
history blame
19 kB
import re
import json
import requests
from .parser import ImdbParser
from requests_html import HTMLSession
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
class IMDB:
"""
A class to represent IMDB API.
--------------
Main Methods of the IMDB API
--------------
#1. search(name, year=None, tv=False, person=False)
-- to search a query on IMDB
#2. get_by_name(name, year=None, tv=False)
-- to get a Movie/TV-Series info by it's name (pass year also to increase accuracy)
#3. get_by_id(file_id)
-- to get a Movie/TV-Series info by it's IMDB-ID (pass year also to increase accuracy)
#4. person_by_name(name)
-- to get a person's info by his/her name
#5. person_by_id( p_id)
-- to get a person's info by his/her IMDB-ID
#6. upcoming(region=None)
-- to get upcoming movies/TV-Series
#7. popular_movies(genre=None, start_id=1, sort_by=None)
-- to get IMDB popular movies
#8. popular_tv(genre=None, start_id=1, sort_by=None)
-- to get IMDB popular Tv-Series
"""
def __init__(self):
self.session = HTMLSession()
self.headers = {
"Accept": "application/json, text/plain, */*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.105 Safari/537.36",
"Referer": "https://www.imdb.com/"
}
self.baseURL = "https://www.imdb.com"
self.search_results = {'result_count': 0, 'results': []}
self.NA = json.dumps({"status": 404, "message": "No Result Found!", 'result_count': 0, 'results': []})
# ..................................method to search on IMDB...........................................
def search(self, name, year=None, tv=False, person=False):
"""
@description:- Helps to search a query on IMDB.
@parameter-1:- <str:name>, query value to search.
@parameter-2:- <int:year> OPTIONAL, release year of query/movie/tv/file to search.
@parameter-3:- <bool:tv> OPTIONAL, to filter/limit/bound search results only for 'TV Series'.
@parameter-4:- <bool:person> OPTIONAL, to filter search results only for person.
@returns:- A JSON string:
- {'result_count': <int:total_search_results>, 'results': <list:list_of_files/movie_info_dict>}
"""
assert isinstance(name, str)
self.search_results = {'result_count': 0, 'results': []}
name = name.replace(" ", "+")
if year is None:
url = f"https://www.imdb.com/find?q={name}"
else:
assert isinstance(year, int)
url = f"https://www.imdb.com/find?q={name}+{year}"
# print(url)
try:
response = self.session.get(url)
except requests.exceptions.ConnectionError as e:
response = self.session.get(url, verify=False)
# results = response.html.xpath("//table[@class='findList']/tr")
results = response.html.xpath("//section[@data-testid='find-results-section-title']/div/ul/li")
# print(len(results))
if tv is True:
results = [result for result in results if "TV" in result.text]
if person is True:
results = response.html.xpath("//section[@data-testid='find-results-section-name']/div/ul/li")
results = [result for result in results if 'name' in result.find('a')[0].attrs['href']]
# print(results)
output = []
for result in results:
name = result.text.replace('\n', ' ')
url = result.find('a')[0].attrs['href']
if ('Podcast' not in name) and ('Music Video' not in name):
try:
image = result.xpath("//img")[0].attrs['src']
file_id = url.split('/')[2]
output.append({
'id': file_id,
"name": name,
"url": f"https://www.imdb.com{url}",
"poster": image
})
except IndexError:
pass
self.search_results = {'result_count': len(output), 'results': output}
return json.dumps(self.search_results, indent=2)
# ..............................methods to get a movie/web-series/tv info..............................
def get(self, url):
"""
@description:- helps to get a file's complete info (used by get_by_name() & get_by_id() )
@parameter:- <str:url>, url of the file/movie/tv-series.
@returns:- File/movie/TV info as JSON string.
"""
try:
response = self.session.get(url)
result = response.html.xpath("//script[@type='application/ld+json']")[0].text
result = ''.join(result.splitlines()) # removing newlines
result = f"""{result}"""
# print(result)
except IndexError:
return self.NA
try:
# converting json string into dict
result = json.loads(result)
except json.decoder.JSONDecodeError as e:
# sometimes json is invalid as 'description' contains inverted commas or other html escape chars
try:
to_parse = ImdbParser(result)
# removing trailer & description schema from json string
parsed = to_parse.remove_trailer
parsed = to_parse.remove_description
# print(parsed)
result = json.loads(parsed)
except json.decoder.JSONDecodeError as e:
try:
# removing reviewBody from json string
parsed = to_parse.remove_review_body
result = json.loads(parsed)
except json.decoder.JSONDecodeError as e:
# invalid char(s) is/are not in description/trailer/reviewBody schema
return self.NA
output = {
"type": result.get('@type'),
"name": result.get('name'),
"url": self.baseURL + result.get('url').split("/title")[-1],
"poster": result.get('image'),
"description": result.get('description'),
"review": {
"author": result.get("review", {'author': {'name': None}}).get('author').get('name'),
"dateCreated": result.get("review", {"dateCreated": None}).get("dateCreated"),
"inLanguage": result.get("review", {"inLanguage": None}).get("inLanguage"),
"heading": result.get("review", {"name": None}).get("name"),
"reviewBody": result.get("review", {"reviewBody": None}).get("reviewBody"),
"reviewRating": {
"worstRating": result.get("review", {"reviewRating": {"worstRating": None}})
.get("reviewRating",{"worstRating": None}).get("worstRating"),
"bestRating": result.get("review", {"reviewRating": {"bestRating": None}})
.get("reviewRating",{"bestRating": None}).get("bestRating"),
"ratingValue": result.get("review", {"reviewRating": {"ratingValue": None}})
.get("reviewRating",{"ratingValue": None}).get("ratingValue"),
},
},
"rating": {
"ratingCount": result.get("aggregateRating", {"ratingCount": None}).get("ratingCount"),
"bestRating": result.get("aggregateRating", {"bestRating": None}).get("bestRating"),
"worstRating": result.get("aggregateRating", {"worstRating": None}).get("worstRating"),
"ratingValue": result.get("aggregateRating", {"ratingValue": None}).get("ratingValue"),
},
"contentRating": result.get("contentRating"),
"genre": result.get("genre"),
"datePublished": result.get("datePublished"),
"keywords": result.get("keywords"),
"duration": result.get("duration"),
"actor": [
{"name": actor.get("name"), "url": actor.get("url")} for actor in result.get("actor", [])
],
"director": [
{"name": director.get("name"), "url": director.get("url")} for director in result.get("director", [])
],
"creator": [
{"name": creator.get("name"), "url": creator.get("url")} for creator in result.get("creator", [])
if creator.get('@type') == 'Person'
]
}
return json.dumps(output, indent=2)
def get_by_name(self, name, year=None, tv=False):
"""
@description:- Helps to search a file/movie/tv by name.
@parameter-1:- <str:name>, query/name to search.
@parameter-2:- <int:year> OPTIONAL, release year of query/movie/tv/file to search.
@parameter-3:- <bool:tv> OPTIONAL, to filter/limit/bound search result only for 'TV Series'.
@returns:- File/movie/TV info as JSON string.
"""
results = json.loads(self.search(name, year=year))
all_results = [i for i in self.search_results['results'] if 'title' in i['url']]
# print(all_results)
# filtering TV and movies
if tv is True: # for tv/Web-Series only
tv_only = [result for result in all_results if "TV" in result['name']]
if year is not None:
tv_only = [result for result in tv_only if str(year) in result['name']]
# double checking by file name
if bool(tv_only):
tv_only_checked = [result for result in tv_only if result['name'].lower().startswith(name.split(" ")[0].lower())]
tv_only = tv_only_checked if bool(tv_only_checked) else tv_only
results['results'] = tv_only if bool(tv_only) else all_results
else: # for movies only
movie_only = [result for result in all_results if "TV" not in result['name']]
if year is not None:
movie_only = [result for result in movie_only if str(year) in result['name']]
# double checking by file name
if bool(movie_only):
movie_only_checked = [result for result in movie_only if result['name'].lower().startswith(name.split(" ")[0].lower())]
movie_only = movie_only_checked if bool(movie_only_checked) else movie_only
results['results'] = movie_only if bool(movie_only) else all_results
# print(results['results'])
if len(results['results']) > 0:
return self.get(results['results'][0].get('url'))
else:
return self.NA
def get_by_id(self, file_id):
"""
@description:- Helps to search a file/movie/tv by its imdb ID.
@parameter-1:- <str:file_id>, imdb ID of the file/movie/tv.
@returns:- File/movie/TV info as JSON string.
"""
assert isinstance(file_id, str)
url = f"{self.baseURL}/title/{file_id}"
return self.get(url)
# ........................................Methods for person profile...................................
def get_person(self, url):
"""
@description:- Helps to search a person info by its url, (used by person_by_name() & person_by_id() ).
@parameter-1:- <str:url>, url of the person's profile page.
@returns:- Person's info as JSON string.
"""
try:
response = self.session.get(url)
result = response.html.xpath("//script[@type='application/ld+json']")[0].text
result = f"""{result}"""
result = json.loads(result)
except json.decoder.JSONDecodeError as e:
return self.NA
del result["@context"]
result['type'] = result.get('@type')
del result["@type"]
return json.dumps(result, indent=2)
def person_by_name(self, name):
"""
@description:- Helps to search a person info by its name.
@parameter-1:- <str:name>, name of the person.
@returns:- Person's info as JSON string.
"""
results = json.loads(self.search(name, person=True))
# print(results)
url = results['results'][0].get('url')
return self.get_person(url)
def person_by_id(self, p_id):
"""
@description:- Helps to search a person info by its imdb ID.
@parameter-1:- <str:p_id>, imdb ID of the person's profile.
@returns:- Person's info as JSON string.
"""
assert isinstance(p_id, str)
url = f"{self.baseURL}/name/{p_id}"
return self.get_person(url)
# .........................................For Upcoming Movies.........................................
def upcoming(self, region=None):
"""
@description:- Helps to get upcoming movies/tv-series.
@parameter-1:- <str:region> OPTIONAL, country code (like US, IN etc.) to filter results by region/country.
@returns:- upcoming movies/TV-Series info as JSON string.
"""
if region is not None:
assert isinstance(region, str)
url = f"https://www.imdb.com/calendar?region={region}"
else:
url = "https://www.imdb.com/calendar"
try:
response = self.session.get(url)
except requests.exceptions.ConnectionError as e:
response = self.session.get(url, verify=False)
output = []
div = response.html.xpath("//main")[0]
# movies are divided/enlisted within article tag
articles = div.find('article')
for article in articles:
h3 = article.find('h3')[0]
ul = article.xpath('//ul')[0].xpath('//li')
for li in ul:
try:
movie = li.find('a')[0]
poster = ul[0].find('img')[0].attrs.get('src')
output.append({
'id': movie.attrs['href'].split('/')[2],
'name': movie.text,
'url': self.baseURL + movie.attrs['href'],
'release_data': h3.text,
'poster': poster.split(',')[0]
})
except IndexError:
pass
results = {'result_count': len(output), 'results': output}
if results['result_count'] > 0:
return json.dumps(results, indent=2)
else:
return self.NA
# ............................................For Popular Movies.......................................
def get_popular(self, url):
"""
@description:- Helps to search popular movies/TV-Series by url, (used by popular_movies() & popular_tv() ).
@parameter-1:- <str:url>, url to search.
@returns:- Files/Movies/TV-Series info as JSON string.
"""
assert isinstance(url, str)
try:
response = self.session.get(url)
except requests.exceptions.ConnectionError as e:
response = self.session.get(url, verify=False)
all_li = response.html.xpath('//ul[@role="presentation"]/li')
output = []
# for link, year in zip(links, years):
for li in all_li:
for obj in li.find('a'):
if ("title" in obj.attrs.get('href')) and (". " in obj.text):
href = obj.attrs.get('href')
name = obj.text.split(". ")[-1]
break
# getting year
for span in li.find('span'):
if len(span.text.strip()) == 4:
try:
year = int(span.text.strip())
break
except:
year = "N/A"
# getting poster
try:
file_id = href.split('/')[2]
poster = li.xpath("//img[@loading='lazy']")
poster = poster[0].attrs.get('src')
poster = poster if bool(poster) else 'image_not_found'
except:
poster = 'image_not_found'
# creating file object
output.append({
'id': file_id,
'name': name,
'year': year,
'url': self.baseURL + href,
'poster': poster
})
self.search_results = {'result_count': len(output), 'results': output}
return json.dumps(self.search_results, indent=2)
def popular_movies(self, genre=None, start_id=1, sort_by=None):
"""
@description:- Helps to get 50 popular movies starting from <start_id>.
@parameter-1:- <str:genre> OPTIONAL, to filter results by genre.
@parameter-2:- <int:start_id> DEFAULT=1, start id to show results (shows results from start_id to start_id+50).
@parameter-3:- <bool:sort_by> OPTIONAL, to sort results (eg. sort=user_rating,desc OR sort=user_rating,asc).
- (visit 'https://www.imdb.com/search/title/?title_type=movie' for more info)
@returns:- Popular Movies (by genre) info as JSON string.
"""
assert isinstance(start_id, int)
if genre is not None:
assert isinstance(genre, str)
url = f"https://www.imdb.com/search/title/?title_type=movie&genres={genre}&start={start_id}&sort={sort_by}"
else:
url = f"https://www.imdb.com/search/title/?title_type=movie&start={start_id}&sort={sort_by}"
return self.get_popular(url)
def popular_tv(self, genre=None, start_id=1, sort_by=None):
"""
@description:- Helps to get 50 popular TV-Series starting from <start_id>.
@parameter-1:- <str:genre> OPTIONAL, to filter results by genre.
@parameter-2:- <int:start_id> DEFAULT=1, start id to show results (shows results from start_id to start_id+50).
@parameter-3:- <bool:sort_by> OPTIONAL, to sort results (eg. sort=user_rating,desc OR sort=user_rating,asc).
- (visit 'https://www.imdb.com/search/title/?title_type=movie' for more info)
@returns:- Popular TV-Series info as JSON string.
"""
assert isinstance(start_id, int)
if genre is not None:
assert isinstance(genre, str)
url = f"https://www.imdb.com/search/title/?title_type=tv_series,tv_miniseries&genres={genre}&start={start_id}&sort={sort_by}"
else:
url = f"https://www.imdb.com/search/title/?title_type=tv_series,tv_miniseries&start={start_id}&sort={sort_by}"
return self.get_popular(url)