import requests
import streamlit as st
import wikipedia
import pandas as pd
import numpy as np
import unicodedata

from string import punctuation
import json
from datetime import datetime, timedelta
from io import BytesIO
from PIL import Image, UnidentifiedImageError
from SPARQLWrapper import SPARQLWrapper, JSON
from fuzzywuzzy import process
from st_aggrid import GridOptionsBuilder, AgGrid
# search() and get_random_user_agent() are used in the Streamlit section below;
# they come from the `google` package (module name `googlesearch`).
from googlesearch import search, get_random_user_agent
import en_core_web_lg

sparql = SPARQLWrapper('https://dbpedia.org/sparql')
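
# DBpedia lookups later in this file take the shape below (a sketch; the
# resource name Python_(programming_language) is only illustrative):
#
#   SELECT ?name ?comment WHERE {
#     dbr:Python_(programming_language) rdfs:label ?name ;
#                                       rdfs:comment ?comment .
#     FILTER (lang(?name) = 'en')
#   }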


class ExtractArticleEntities:
    """Extract article entities from a document using natural language
    processing (NLP) and fuzzy matching.

    Parameters
    - text: a string holding the text of a news article to be parsed

    Usage:
        Instantiate with the text parameter, i.e. entities = ExtractArticleEntities(text).
        Retrieve Who, What, When, Where entities with entities.www_json.
        Retrieve non-organised entities with entities.json.
    """

    def __init__(self, text):
        self.text = self.preprocessing(text)
        self.json = {}

        self.entity_df = pd.DataFrame(columns=["entity", "description"])

        # The rest of the class relies on spaCy's Doc API (doc.ents, ent.label_,
        # ent.text), so load the spaCy large English model here; a transformers
        # NER pipeline returns plain dicts and would break doc.ents below.
        self.nlp = en_core_web_lg.load()

        self.entity_df = self.get_who_what_where_when()
        self.entity_df = self.fuzzy_disambiguation()
        self.get_related_entity()
        self.get_popularity()

        self.entity_df = self.entity_df.drop_duplicates(subset=["description"])
        self.entity_df = self.entity_df.reset_index(drop=True)

        self.json = self.entity_json()
        self.www_json = self.get_wwww_json()
    def get_popularity(self):
        """Rank matched entities by their Wikipedia page views over the last week."""
        master_df = pd.DataFrame()
        view_list = []
        for entity in self.entity_df['Matched Entity']:
            if entity:
                # Wikipedia article titles use underscores instead of spaces.
                entity_to_look = entity[0].replace(' ', '_')
                headers = {
                    'accept': 'application/json',
                    'User-Agent': 'Foo bar'
                }

                now = datetime.now()
                now_dt = now.strftime('%Y%m%d')
                week_back = now - timedelta(days=7)
                week_back_dt = week_back.strftime('%Y%m%d')
                resp = requests.get(
                    f'https://wikimedia.org/api/rest_v1/metrics/pageviews/per-article/'
                    f'en.wikipedia.org/all-access/all-agents/{entity_to_look}/daily/'
                    f'{week_back_dt}/{now_dt}',
                    headers=headers)
                data = resp.json()

                df = pd.json_normalize(data['items'])
                view_count = sum(df['views'])
            else:
                view_count = 0
            view_list.append(view_count)

        self.entity_df['Views'] = view_list

        for entity in ('PERSON', 'ORG', 'GPE', 'NORP', 'LOC'):
            # .copy() avoids pandas' SettingWithCopyWarning when adding columns.
            grouped_df = self.entity_df[self.entity_df['entity'] == entity].copy()
            grouped_df['Matched count'] = grouped_df['fuzzy_match'].apply(len)
            grouped_df['Wiki count'] = grouped_df['Matched Entity'].apply(len)

            grouped_df = grouped_df.sort_values(by=['Views', 'Matched count', 'Wiki count'],
                                                ascending=False).reset_index(drop=True)
            if not grouped_df.empty:
                master_df = pd.concat([master_df, grouped_df])

        self.sorted_entity_df = master_df
        if 'Views' in self.sorted_entity_df:
            self.sorted_entity_df = self.sorted_entity_df.sort_values(
                by=['Views'], ascending=False).reset_index(drop=True)
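
    # The Wikimedia pageviews REST API used above returns one record per day,
    # e.g. (a sketch with an illustrative article title and date window):
    #
    #   url = ('https://wikimedia.org/api/rest_v1/metrics/pageviews/per-article/'
    #          'en.wikipedia.org/all-access/all-agents/Python_(programming_language)/'
    #          'daily/20230101/20230107')
    #   items = requests.get(url, headers={'User-Agent': 'Foo bar'}).json()['items']
    #   weekly_views = sum(item['views'] for item in items)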
    def get_related_entity(self):
        """Search Wikipedia for each entity and keep the close matches."""
        names = self.entity_df.description
        entities = self.entity_df.entity
        self.related_entity = []
        match_scores = []
        for name, entity in zip(names, entities):
            if entity in ('PERSON', 'ORG', 'GPE', 'NORP', 'LOC'):
                related_names = wikipedia.search(name, 10)
                self.related_entity.append(related_names)
                matches = process.extract(name, related_names)
                match_scores.append([match[0] for match in matches if match[1] >= 90])
            else:
                self.related_entity.append([None])
                match_scores.append([])

        self.entity_df['Wikipedia Entity'] = self.related_entity
        self.entity_df['Matched Entity'] = match_scores
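
    # fuzzywuzzy's process.extract returns (candidate, score) pairs, e.g.
    #   process.extract('Barack Obama', ['Barack Obama', 'Obama family'])
    #   -> [('Barack Obama', 100), ('Obama family', 86)]
    # (scores are illustrative); only candidates scoring >= 90 are kept above.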

    def fuzzy_disambiguation(self):
        self.entity_df['fuzzy_match'] = ''

        person_choices = self.entity_df.loc[self.entity_df['entity'] == 'PERSON']
        org_choices = self.entity_df.loc[self.entity_df['entity'] == 'ORG']
        where_choices = self.entity_df.loc[self.entity_df['entity'] == 'GPE']
        norp_choices = self.entity_df.loc[self.entity_df['entity'] == 'NORP']
        loc_choices = self.entity_df.loc[self.entity_df['entity'] == 'LOC']
        date_choices = self.entity_df.loc[self.entity_df['entity'] == 'DATE']

        def fuzzy_match(row, choices):
            """Disambiguate an entity by looking for at most three matches with a
            score above 80 (exact duplicates excluded) among entities of the same
            type. Returns an empty list if there is no such match."""
            match = process.extract(row["description"], choices["description"], limit=3)
            match = [m[0] for m in match if m[1] > 80 and m[1] != 100]

            if match:
                self.fuzzy_match_dict[row["description"]] = match

            return match

        self.fuzzy_match_dict = {}

        choices_by_entity = {
            'PERSON': person_choices,
            'ORG': org_choices,
            'GPE': where_choices,
            'NORP': norp_choices,
            'LOC': loc_choices,
            'DATE': date_choices,
        }
        for i, row in self.entity_df.iterrows():
            choices = choices_by_entity.get(row['entity'])
            if choices is not None:
                self.entity_df.at[i, 'fuzzy_match'] = fuzzy_match(row, choices)

        return self.entity_df
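
    # After fuzzy_disambiguation() runs, self.fuzzy_match_dict maps each matched
    # description to its near-duplicates within the same entity type, e.g.
    # {'Joe Biden': ['President Biden']} (values here are illustrative only).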

    def preprocessing(self, text):
        """Strip punctuation, repair common mojibake and smart-quote characters,
        and normalize the string with the "NFKD" normalization algorithm."""
        text = text.translate(str.maketrans("", "", punctuation))

        # Collapse runs of whitespace left behind by punctuation stripping.
        pre_text = " ".join(text.split())
        pre_text = pre_text.replace('’', "'")
        pre_text = pre_text.replace('“', '"')
        pre_text = pre_text.replace('â€', '"')
        pre_text = pre_text.replace('‘', "'")
        pre_text = pre_text.replace('…', '...')
        pre_text = pre_text.replace('–', '-')
        pre_text = pre_text.replace("\x9d", '-')

        pre_text = unicodedata.normalize("NFKD", pre_text)
        pre_text = pre_text.translate(str.maketrans("", "", punctuation))

        return pre_text
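
    # unicodedata.normalize('NFKD', ...) decomposes compatibility characters,
    # e.g. the ligature 'ﬁ' becomes the two letters 'fi', and accented letters
    # are split into a base letter plus a combining mark.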

    def get_who_what_where_when(self):
        """Get entity information in a document.

        Returns a DataFrame with the following columns:
        - entity: the spaCy entity label (PERSON, ORG, GPE, LOC, PRODUCT,
          EVENT, LAW, LANGUAGE, NORP, DATE or TIME)
        - description: the text of the entity as it appears in the document

        Usage:
            get_who_what_where_when()
        """
        article_entity_list = []

        doc = self.nlp(self.text)

        desired_entities = ['PERSON', 'ORG', 'GPE', 'LOC', 'PRODUCT', 'EVENT', 'LAW',
                            'LANGUAGE', 'NORP', 'DATE', 'TIME']
        self.label_dict = {}

        for ent in doc.ents:
            self.label_dict[ent] = ent.label_
            if ent.label_ in desired_entities:
                entity_dict = {ent.label_: ent.text}
                article_entity_list.append(entity_dict)

        # Deduplicate on the entity text: later records with the same surface
        # form overwrite earlier ones, leaving one record per surface form.
        deduplicated_entities = {frozenset(item.values()):
                                 item for item in article_entity_list}.values()

        for record in deduplicated_entities:
            record_df = pd.DataFrame(record.items(), columns=["entity", "description"])
            self.entity_df = pd.concat([self.entity_df, record_df], ignore_index=True)

        return self.entity_df

    def entity_json(self):
        """Return a JSON representation of the `entity_df` dataframe. Each
        record has the following fields:
        - entity: the type of the entity in the text
        - description: the name of the entity as described in the input text
        - fuzzy_match: a list of fuzzy matches for the entity, useful for
          disambiguating similar entities
        """
        self.json = json.loads(self.entity_df.to_json(orient='records'))

        return self.json
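
    # entity_json() records look like (values illustrative):
    #   {"entity": "PERSON", "description": "Barack Obama", "fuzzy_match": [],
    #    "Wikipedia Entity": ["Barack Obama", ...],
    #    "Matched Entity": ["Barack Obama"], "Views": 123456}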

    def get_wwww_json(self):
        """Return a JSON representation of the entities grouped into the
        who/what/where/when buckets:
        - who: PERSON and ORG entities
        - where: GPE and LOC entities
        - when: DATE and TIME entities
        - what: PRODUCT, EVENT, LAW, LANGUAGE and NORP entities
        """
        entities = self.entity_json()
        who_dict = {"who": [ent for ent in entities if ent['entity'] in ['ORG', 'PERSON']]}
        where_dict = {"where": [ent for ent in entities if ent['entity'] in ['GPE', 'LOC']]}
        when_dict = {"when": [ent for ent in entities if ent['entity'] in ['DATE', 'TIME']]}
        what_dict = {"what": [ent for ent in entities if ent['entity'] in
                              ['PRODUCT', 'EVENT', 'LAW', 'LANGUAGE', 'NORP']]}
        article_wwww = [who_dict, where_dict, when_dict, what_dict]
        self.wwww_json = json.dumps(article_wwww, indent=2)

        return self.wwww_json
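

# Example usage (a minimal sketch; ARTICLE_TEXT stands in for any article string):
#
#   entities = ExtractArticleEntities(ARTICLE_TEXT)
#   entities.www_json          # who/what/when/where JSON, grouped by question
#   entities.json              # flat entity records
#   entities.sorted_entity_df  # entities ranked by Wikipedia page views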


news_article = st.text_input('Paste an article here to be parsed')
if 'parsed' not in st.session_state:
    st.session_state['parsed'] = None
    st.session_state['article'] = None
if news_article:
    st.write('Your news article is')
    st.write(news_article)

    if st.button('Get details'):
        parsed = ExtractArticleEntities(news_article)
        if parsed:
            st.session_state['article'] = parsed.sorted_entity_df
            st.session_state['parsed'] = True
            st.session_state['json'] = parsed.www_json


def filter_wiki_df(df):
    """Keep the first two columns of a Wikipedia infobox table, drop rows where
    both columns repeat the same value, and rename them Attribute/Value."""
    key_list = df.keys()[:2]

    df = df[key_list].copy()

    # Rows where both columns hold the same value are section headers, not data.
    df['Match Check'] = np.where(df[key_list[0]] != df[key_list[1]], True, False)

    df = df[df['Match Check']]
    df = df[key_list]
    df = df.dropna(how='any').reset_index(drop=True)

    df.rename(columns={key_list[0]: 'Attribute', key_list[1]: 'Value'}, inplace=True)

    return df
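

# pd.read_html(wiki_url)[0] (used in the section below) parses the first HTML
# table on the page into a DataFrame; on Wikipedia articles that is usually the
# infobox, which filter_wiki_df() above reduces to Attribute/Value rows.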


def get_entity_from_selectbox(related_entity):
    entity = st.selectbox('Please select the term:', related_entity, key='foo')
    if entity:
        summary_entity = wikipedia.summary(entity, 3)
        return summary_entity


if st.session_state['parsed']:
    df = st.session_state['article']

    df_to_st = pd.DataFrame()
    df_to_st['Name'] = df['description']
    df_to_st['Is a type of'] = df['entity']
    df_to_st['Related to'] = df['Matched Entity']
    df_to_st['Is a type of'] = df_to_st['Is a type of'].replace(
        {'PERSON': 'Person',
         'ORG': 'Organization',
         'GPE': 'Political Location',
         'NORP': 'Political or Religious Groups',
         'LOC': 'Non Political Location'})

    gb = GridOptionsBuilder.from_dataframe(df_to_st)
    gb.configure_pagination(paginationAutoPageSize=True)
    gb.configure_side_bar()
    gb.configure_selection('multiple', use_checkbox=True,
                           groupSelectsChildren="Group checkbox select children")
    gridOptions = gb.build()

    grid_response = AgGrid(
        df_to_st,
        gridOptions=gridOptions,
        data_return_mode='AS_INPUT',
        update_mode='MODEL_CHANGED',
        fit_columns_on_grid_load=False,
        enable_enterprise_modules=True,
        height=350,
        width='100%',
        reload_data=True
    )
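
    # grid_response behaves like a dict: 'data' holds the (possibly edited)
    # dataframe and 'selected_rows' the checkbox-selected rows as a list of dicts.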

    data = grid_response['data']
    selected = grid_response['selected_rows']
    selected_df = pd.DataFrame(selected)
    if not selected_df.empty:
        selected_entity = selected_df[['Name', 'Is a type of', 'Related to']]
        st.dataframe(selected_entity)

    entities_list = df['description']

    if not selected_df.empty and selected_entity['Name'].any():
        selected_row = df.loc[df['description'] == selected_entity['Name'][0]]
        entity_value = selected_row.values

        # Column order: entity, description, fuzzy_match, Wikipedia Entity,
        # Matched Entity, Views, Matched count, Wiki count.
        label, name, fuzzy, related, related_match, _, _, _ = entity_value[0]
        not_matched = [word for word in related if word not in related_match]
        fuzzy = fuzzy[0] if len(fuzzy) > 0 else ''
        related = related[0] if len(related) > 0 else ''
        not_matched = not_matched[0] if len(not_matched) > 0 else related

        related_entity_list = [name, fuzzy, not_matched]
        related_entity = entity_value[0][1:]

        google_query_term = ' '.join(related_entity_list)

        try:
            urls = [i for i in search(google_query_term, stop=10, pause=2.0, tld='com',
                                      lang='en', tbs='0',
                                      user_agent=get_random_user_agent())]
        except Exception:
            urls = []

        st.session_state['wiki_summary'] = False
        all_related_entity = []
        # Initialize so the fallback below works even when no DBpedia lookup runs.
        summary_entity = None
        for el in related_entity[:-2]:
            if isinstance(el, str):
                all_related_entity.append(el)
            elif isinstance(el, int):
                all_related_entity.append(str(el))
            else:
                all_related_entity.extend(el)

        for entity in all_related_entity:
            if entity:
                # DBpedia resource names use underscores instead of spaces.
                entity = entity.replace(' ', '_')
                query = f'''
                SELECT ?name ?comment ?image
                WHERE {{ dbr:{entity} rdfs:label ?name.
                         dbr:{entity} rdfs:comment ?comment.
                         dbr:{entity} dbo:thumbnail ?image.

                         FILTER (lang(?name) = 'en')
                         FILTER (lang(?comment) = 'en')
                }}'''
                sparql.setQuery(query)
                sparql.setReturnFormat(JSON)
                qres = sparql.query().convert()
                if qres['results']['bindings']:
                    result = qres['results']['bindings'][0]
                    name, comment, image_url = (result['name']['value'],
                                                result['comment']['value'],
                                                result['image']['value'])

                    wiki_url = f'https://en.wikipedia.org/wiki/{entity}'

                    st.write(name)
                    st.write(image_url)

                    response = requests.get(image_url)
                    try:
                        related_image = Image.open(BytesIO(response.content))
                        st.image(related_image)
                    except UnidentifiedImageError:
                        st.write('Not able to get image')

                    summary_entity = comment
                    wiki_knowledge_df = pd.read_html(wiki_url)[0]
                    wiki_knowledge_df = filter_wiki_df(wiki_knowledge_df)

                    st.write('Showing description for entity:', name)
                    st.dataframe(wiki_knowledge_df)

                    break

        if not summary_entity:
            try:
                summary_entity = get_entity_from_selectbox(all_related_entity)
            except wikipedia.exceptions.DisambiguationError:
                st.write('Wikipedia has a disambiguation page for this term')

        if selected_entity['Name'].any():
            st.write(f'Summary for {selected_entity["Name"][0]}')
            st.write(summary_entity)