"""English Keyphrase TXT & URL Entity Finder.

A Streamlit app that extracts keyphrases from English text supplied via a URL,
a pasted snippet, or an uploaded .txt file, using a Hugging Face
token-classification pipeline. Input text is held in session state only in
Fernet-encrypted form, usage counts and input history persist in a local JSON
file, and runs can optionally be logged to Comet ML.

Run locally with `streamlit run <this_file>.py` (assuming the environment
variables described below are set).
"""

import io
import json
import os
import re
import time
import zipfile

import pandas as pd
import plotly.express as px
import requests
import streamlit as st
from bs4 import BeautifulSoup
from comet_ml import Experiment
from cryptography.fernet import Fernet
from streamlit_extras.stylable_container import stylable_container
from transformers import pipeline

st.set_page_config(
    layout="wide",
    page_title="English Keyphrase TXT & URL Entity Finder"
)

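# Comet ML experiment tracking is optional: it is enabled only when all three
# of the environment variables below are set.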
COMET_API_KEY = os.environ.get("COMET_API_KEY")
COMET_WORKSPACE = os.environ.get("COMET_WORKSPACE")
COMET_PROJECT_NAME = os.environ.get("COMET_PROJECT_NAME")

comet_initialized = False
if COMET_API_KEY and COMET_WORKSPACE and COMET_PROJECT_NAME:
    comet_initialized = True

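# --- Persistence helpers -------------------------------------------------------
# The attempt count and input history are stored in a local JSON file so they
# survive page reloads; on hosts with an ephemeral filesystem the file may be
# reset between deployments.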
PERSISTENCE_FILE = "app_data.json"
max_attempts = 10


def load_persistent_data():
    """Load the attempt count and input history from the persistent JSON file.

    Returns default values if the file doesn't exist or is unreadable.
    """
    if os.path.exists(PERSISTENCE_FILE):
        try:
            with open(PERSISTENCE_FILE, "r") as f:
                data = json.load(f)
            return data.get('source_type_attempts', 0), data.get('file_upload_history', [])
        except (json.JSONDecodeError, OSError):
            st.warning("Warning: Could not read persistent data file. Starting with a fresh state.")
            return 0, []
    return 0, []


def save_persistent_data(attempts, history):
    """Save the current attempt count and input history to the persistent JSON file."""
    with open(PERSISTENCE_FILE, "w") as f:
        json.dump({'source_type_attempts': attempts, 'file_upload_history': history}, f, indent=4)


def clear_input_history_and_rerun():
    """Callback for the "Clear Input History" button.

    Streamlit reruns the script automatically after an on_click callback,
    so no explicit rerun call is needed here (st.experimental_rerun has been
    removed from recent Streamlit releases, and rerunning from inside a
    callback is not supported).
    """
    st.session_state['file_upload_history'] = []
    save_persistent_data(st.session_state['source_type_attempts'], [])

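# --- Session state -------------------------------------------------------------
# Initialise all session keys up front so later code can read them without
# existence checks.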
if 'source_type_attempts' not in st.session_state:
    attempts, history = load_persistent_data()
    st.session_state['source_type_attempts'] = attempts
    st.session_state['file_upload_history'] = history

if 'encrypted_text_to_process' not in st.session_state:
    st.session_state['encrypted_text_to_process'] = None
if 'uploaded_file_content' not in st.session_state:
    st.session_state['uploaded_file_content'] = None
if 'file_uploader_key' not in st.session_state:
    st.session_state['file_uploader_key'] = 0

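# --- Encryption ----------------------------------------------------------------
# Input text is kept in session state only as Fernet ciphertext and is
# decrypted just before analysis.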
@st.cache_resource
def load_encryption_key():
    """Build the Fernet cipher from the FERNET_KEY environment variable (cached per process)."""
    try:
        key_str = os.environ.get("FERNET_KEY")
        if not key_str:
            raise ValueError("FERNET_KEY environment variable not set. Cannot perform encryption/decryption.")
        return Fernet(key_str.encode('utf-8'))
    except ValueError as ve:
        st.error(f"Configuration Error: {ve}. Please ensure the 'FERNET_KEY' environment variable is set securely.")
        st.stop()
    except Exception as e:
        st.error(f"An unexpected error occurred while loading the encryption key: {e}. Please check your key format and environment settings.")
        st.stop()

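# A valid Fernet key can be generated once and stored in the environment, e.g.:
#   python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"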
fernet = load_encryption_key()


def encrypt_text(text_content: str) -> bytes:
    """Encrypt a string using the loaded Fernet cipher."""
    return fernet.encrypt(text_content.encode('utf-8'))


def decrypt_text(encrypted_bytes: bytes) -> str | None:
    """Decrypt bytes using the loaded Fernet cipher.

    Returns the decrypted string, or None if decryption fails.
    """
    try:
        return fernet.decrypt(encrypted_bytes).decode('utf-8')
    except Exception as e:
        st.error(f"Decryption failed. This might indicate data tampering or an incorrect encryption key. Error: {e}")
        return None

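# --- Page layout: header, usage notes, sidebar, and input widgets ---------------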
st.subheader("English Keyphrase TXT & URL Entity Finder", divider="rainbow") |
|
st.link_button("by nlpblogs", "https://nlpblogs.com", type="tertiary") |
|
|
|
expander = st.expander("**Important notes on the English Keyphrase TXT & URL Entity Finder**") |
|
expander.write(''' |
|
**Named Entities:** This English Keyphrase TXT & URL Entity Finder extracts keyphrases from English academic and scientific papers. |
|
|
|
Results are presented in an easy-to-read table, visualized in an interactive bar chart and tree map, and are available for download along with a Glossary of tags. |
|
|
|
**How to Use:** |
|
1. Paste a URL and press Enter. |
|
2. Alternatively, type or paste text directly into the text area and press Ctrl + Enter. |
|
3. Or, upload your TXT file. |
|
|
|
**Usage Limits:** You can request results up to 10 times. |
|
|
|
**Customization:** To change the app's background color to white or black, click the three-dot menu on the right-hand side of your app, go to Settings and then Choose app theme, colors and fonts. |
|
|
|
**Technical issues:** If your connection times out, please refresh the page or reopen the app's URL. |
|
|
|
For any errors or inquiries, please contact us at [email protected] |
|
''') |
|
|
|
|
|
with st.sidebar:
    container = st.container(border=True)
    container.write("**Named Entity Recognition (NER)** is the task of extracting and tagging entities in text data. Entities can be persons, organizations, locations, countries, products, events, etc.")

    st.subheader("Persistent Data", divider="rainbow")
    st.info(f"Requests remaining today: **{max_attempts - st.session_state['source_type_attempts']}**")

    if st.session_state['file_upload_history']:
        st.subheader("File & URL History", divider="rainbow")
        history_df = pd.DataFrame(st.session_state['file_upload_history'])
        st.dataframe(history_df, use_container_width=True, hide_index=True)
        st.button("Clear Input History", on_click=clear_input_history_and_rerun, type="secondary")

    st.subheader("Related NER Web Apps", divider="rainbow")
    st.link_button("Scandinavian JSON Entity Finder", "https://nlpblogs.com/shop/named-entity-recognition-ner/scandinavian-json-entity-finder/", type="primary")


def clear_inputs():
    """Callback for the "Clear All Inputs" button: reset widget state and cached text.

    Bumping the uploader key remounts the file uploader empty on the next run;
    Streamlit reruns automatically after the callback, so no explicit rerun
    call is needed.
    """
    st.session_state.url = ""
    st.session_state.my_text_area = ""
    st.session_state['uploaded_file_content'] = None
    st.session_state['encrypted_text_to_process'] = None
    st.session_state['file_uploader_key'] += 1

url = st.text_input("Enter a URL from the internet, and then press Enter:", key="url")
text = st.text_area("Type or paste your text below, and then press Ctrl + Enter", key='my_text_area')
uploaded_file = st.file_uploader("Or upload a .txt file", type=["txt"], key=f"file_uploader_{st.session_state['file_uploader_key']}")
st.button("Clear All Inputs", on_click=clear_inputs)

source_type = None
current_run_text = None

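# Determine the active input source. A fresh file upload takes precedence,
# then a URL, then pasted text.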
if uploaded_file is not None and st.session_state.get('uploaded_file_content') is None:
    source_type = 'file'
    try:
        string_data = uploaded_file.getvalue().decode("utf-8")
        current_run_text = string_data
        st.session_state['uploaded_file_content'] = current_run_text
        st.session_state['file_upload_history'].append({
            'source_type': 'file',
            'filename': uploaded_file.name,
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
        })
        save_persistent_data(st.session_state['source_type_attempts'], st.session_state['file_upload_history'])
        st.success("TXT file uploaded successfully. The file content is encrypted before processing; a short preview is shown below.")
        st.divider()
        st.write("**Input text content (from uploaded file)**")
        st.write(current_run_text[:500] + "..." if len(current_run_text) > 500 else current_run_text)
    except Exception as e:
        st.error(f"Error processing uploaded file: {e}")
        current_run_text = None

elif url:
    source_type = 'url'
    if not url.startswith(("http://", "https://")):
        st.error("Please enter a valid URL starting with 'http://' or 'https://'.")
        current_run_text = None
    else:
        try:
            with st.spinner(f"Fetching and parsing content from **{url}**...", show_time=True):
                response = requests.get(url, timeout=10)
                response.raise_for_status()
                soup = BeautifulSoup(response.text, 'html.parser')
                current_run_text = soup.get_text(separator=' ', strip=True)
                st.session_state['file_upload_history'].append({
                    'source_type': 'url',
                    'filename': url,
                    'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
                })
                save_persistent_data(st.session_state['source_type_attempts'], st.session_state['file_upload_history'])
                st.divider()
                st.write("**Input text content (from URL)**")
                st.write(current_run_text[:500] + "..." if len(current_run_text) > 500 else current_run_text)
        except Exception as e:
            st.error(f"Error fetching or parsing URL: {e}")
            current_run_text = None

elif text:
    source_type = 'text'
    current_run_text = text
    st.divider()
    st.write("**Input text content (from text area)**")
    st.write(current_run_text[:500] + "..." if len(current_run_text) > 500 else current_run_text)

if current_run_text and current_run_text.strip():
    # Encrypt the new input once; an existing ciphertext is kept until the
    # inputs are cleared.
    if st.session_state.get('encrypted_text_to_process') is None:
        st.session_state['encrypted_text_to_process'] = encrypt_text(current_run_text)
else:
    # No active input: drop any cached ciphertext and remount the uploader.
    st.session_state['encrypted_text_to_process'] = None
    if uploaded_file is None:
        st.session_state['uploaded_file_content'] = None
        st.session_state['file_uploader_key'] += 1

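# --- Analysis --------------------------------------------------------------------
# The model run happens only on an explicit button press, so ordinary widget
# interactions stay cheap.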
if st.button("Analyze Text", type="primary"): |
|
if st.session_state['encrypted_text_to_process']: |
|
try: |
|
start_time_overall = time.time() |
|
|
|
if st.session_state['source_type_attempts'] >= max_attempts: |
|
st.error(f"You have requested results {max_attempts} times. You have reached your request limit.") |
|
st.stop() |
|
|
|
st.session_state['source_type_attempts'] += 1 |
|
save_persistent_data(st.session_state['source_type_attempts'], st.session_state['file_upload_history']) |
|
|
|
@st.cache_resource |
|
def load_ner_model(): |
|
return pipeline("token-classification", |
|
model="ml6team/keyphrase-extraction-kbir-inspec", |
|
aggregation_strategy="max", |
|
stride=128, |
|
ignore_labels=["O"]) |
|
|
|
model = load_ner_model() |
|
text_for_ner = decrypt_text(st.session_state['encrypted_text_to_process']) |
|
|
|
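            # With aggregation_strategy="max", the pipeline returns one dict per
            # aggregated keyphrase with 'word', 'entity_group', 'score', 'start'
            # and 'end' keys; the check below guards against entries missing
            # any of them.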
            if text_for_ner and len(text_for_ner.strip()) > 0:
                with st.spinner("Analyzing text...", show_time=True):
                    entities = model(text_for_ner)
                    data = []
                    if entities:
                        for entity in entities:
                            if all(k in entity for k in ['word', 'entity_group', 'score', 'start', 'end']):
                                data.append({
                                    'word': entity['word'],
                                    'entity_group': entity['entity_group'],
                                    'score': entity['score'],
                                    'start': entity['start'],
                                    'end': entity['end']
                                })
                            else:
                                st.warning(f"Skipping malformed entity encountered: {entity}. Missing expected keys.")
                        df = pd.DataFrame(data)
                    else:
                        df = pd.DataFrame(columns=['word', 'entity_group', 'score', 'start', 'end'])

                if not df.empty:
                    # Strip punctuation from extracted phrases; anything reduced
                    # to an empty string becomes 'Unknown'.
                    pattern = r'[^\w\s]'
                    df['word'] = df['word'].replace(pattern, '', regex=True)
                    df = df.replace('', 'Unknown')

                    st.subheader("All Extracted Keyphrases", divider="rainbow")
                    st.dataframe(df, use_container_width=True)

                    with st.expander("See Glossary of tags"):
                        st.write('''
                        **word**: entity extracted from your text data

                        **score**: model confidence score; how confident the model is in the tag assigned to a given entity

                        **entity_group**: label (tag) assigned to a given extracted entity

                        **start**: index of the start of the corresponding entity

                        **end**: index of the end of the corresponding entity
                        ''')
                    st.divider()

st.subheader("Most Frequent Keyphrases", divider="rainbow") |
|
word_counts = df['word'].value_counts().reset_index() |
|
word_counts.columns = ['word', 'count'] |
|
df_frequent = word_counts.sort_values(by='count', ascending=False).head(15) |
|
|
|
                    if not df_frequent.empty:
                        tab1, tab2 = st.tabs(["Table", "Chart"])

                        with tab1:
                            st.dataframe(df_frequent, use_container_width=True)

                        with tab2:
                            fig_frequent_bar = px.bar(
                                df_frequent,
                                x='count',
                                y='word',
                                orientation='h',
                                title='Top Frequent Keyphrases by Count',
                                color='count',
                                color_continuous_scale=px.colors.sequential.Viridis
                            )
                            fig_frequent_bar.update_layout(yaxis={'categoryorder': 'total ascending'})
                            st.plotly_chart(fig_frequent_bar, use_container_width=True)

                            if experiment is not None:
                                experiment.log_figure(figure=fig_frequent_bar, figure_name="frequent_keyphrases_bar_chart")
                    else:
                        st.info("No keyphrases found to display.")

                    st.divider()

                    # The Comet experiment (if any) was created before analysis;
                    # log the full predictions table to it here.
                    if experiment is not None:
                        experiment.log_table("predicted_entities", df)

st.subheader("Treemap of All Keyphrases", divider="rainbow") |
|
fig_treemap = px.treemap( |
|
df, |
|
path=[px.Constant("all"), 'entity_group', 'word'], |
|
values='score', |
|
color='word', |
|
color_continuous_scale=px.colors.sequential.Plasma |
|
) |
|
fig_treemap.update_layout(margin=dict(t=50, l=25, r=25, b=25)) |
|
st.plotly_chart(fig_treemap, use_container_width=True) |
|
|
|
if comet_initialized and experiment: |
|
experiment.log_figure(figure=fig_treemap, figure_name="entity_treemap") |
|
|
|
|
|
                    dfa = pd.DataFrame(
                        data={
                            'Column Name': ['word', 'entity_group', 'score', 'start', 'end'],
                            'Description': [
                                'entity extracted from your text data',
                                'label (tag) assigned to a given extracted entity',
                                'model confidence score; how confident the model is in the tag assigned to a given entity',
                                'index of the start of the corresponding entity',
                                'index of the end of the corresponding entity'
                            ]
                        }
                    )
                    buf = io.BytesIO()
                    # df is guaranteed non-empty inside this branch, so all
                    # three CSVs can be written unconditionally.
                    with zipfile.ZipFile(buf, "w") as myzip:
                        myzip.writestr("Summary_of_results.csv", df.to_csv(index=False))
                        myzip.writestr("Most_frequent_keyphrases.csv", df_frequent.to_csv(index=False))
                        myzip.writestr("Glossary_of_tags.csv", dfa.to_csv(index=False))

                    with stylable_container(
                        key="download_button",
                        css_styles="""button { background-color: yellow; border: 1px solid black; padding: 5px; color: black; }""",
                    ):
                        st.download_button(
                            label="Download zip file",
                            data=buf.getvalue(),
                            file_name="nlpblogs_ner_results.zip",
                            mime="application/zip",
                        )
                    st.divider()

                else:
                    st.warning("No entities found to generate visualizations.")
            else:
                st.warning("No meaningful text found to process. Please enter a URL, upload a text file, or type/paste text.")
        except Exception as e:
            st.error(f"An unexpected error occurred during processing: {e}")
        finally:
            if experiment is not None:
                try:
                    experiment.end()
                except Exception as comet_e:
                    st.warning(f"Comet ML experiment.end() failed: {comet_e}")
            if start_time_overall is not None:
                end_time_overall = time.time()
                elapsed_time_overall = end_time_overall - start_time_overall
                st.info(f"Results processed in **{elapsed_time_overall:.2f} seconds**.")
            st.write(f"Number of times you requested results: **{st.session_state['source_type_attempts']}/{max_attempts}**")
    else:
        st.warning("Please enter some text, a URL, or upload a file to analyze.")