"""Streamlit app: multilingual PDF & DOCX named-entity (PII) finder.

Flow of one script run:
1. Load persistent usage counter / upload history into session state.
2. Let the user upload a PDF or DOCX, extract its text and keep it only in
   encrypted form (Fernet) inside the session state.
3. On "Results", decrypt the text, run the GLiNER model over it, then show
   tables, charts and a downloadable zip; optionally log to Comet ML.
"""

import io
import json  # Added to handle persistent data
import os
import re
import time
import zipfile

import streamlit as st
import pandas as pd
import plotly.express as px
import numpy as np
from cryptography.fernet import Fernet
from gliner import GLiNER
from PyPDF2 import PdfReader
import docx
from comet_ml import Experiment
from streamlit_extras.stylable_container import stylable_container

# Must be the first Streamlit command executed in the script.
st.set_page_config(layout="wide", page_title="Named Entity Recognition App")

# --- Configuration ---
COMET_API_KEY = os.environ.get("COMET_API_KEY")
COMET_WORKSPACE = os.environ.get("COMET_WORKSPACE")
COMET_PROJECT_NAME = os.environ.get("COMET_PROJECT_NAME")
# Comet logging is enabled only when all three settings are present.
comet_initialized = bool(COMET_API_KEY and COMET_WORKSPACE and COMET_PROJECT_NAME)

# --- Persistent Counter and History Configuration ---
COUNTER_FILE = "counter_ner_app.json"
HISTORY_FILE = "file_history_ner_app.json"
max_attempts = 300


# --- Functions to manage persistent data ---
def _read_json_field(path, key, default):
    """Return ``data[key]`` from the JSON object stored at *path*.

    Falls back to *default* when the file is missing or unreadable, does not
    contain valid JSON, is not a JSON object, or lacks *key*.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return default
    if not isinstance(data, dict):
        return default
    return data.get(key, default)


def load_attempts():
    """
    Loads the attempts count from a persistent JSON file.
    Returns 0 if the file doesn't exist or is invalid.
    """
    attempts = _read_json_field(COUNTER_FILE, 'file_upload_attempts', 0)
    # The counter is compared against max_attempts and incremented later, so
    # anything that is not a non-negative integer is treated as invalid.
    if isinstance(attempts, bool) or not isinstance(attempts, int) or attempts < 0:
        return 0
    return attempts


def save_attempts(attempts):
    """
    Saves the current attempts count to the persistent JSON file.
    """
    with open(COUNTER_FILE, "w", encoding="utf-8") as f:
        json.dump({'file_upload_attempts': attempts}, f)


def load_history():
    """
    Loads the file upload history from a persistent JSON file.
    Returns an empty list if the file doesn't exist or is invalid.
    """
    history = _read_json_field(HISTORY_FILE, 'uploaded_files', [])
    if not isinstance(history, list):
        return []
    # Keep only well-formed (dict) entries; later code reads entry['filename'].
    return [entry for entry in history if isinstance(entry, dict)]


def save_history(history):
    """
    Saves the current file upload history to the persistent JSON file.
    """
    with open(HISTORY_FILE, "w", encoding="utf-8") as f:
        json.dump({'uploaded_files': history}, f)


def clear_history_data():
    """Clears the file history from session state and deletes the persistent file."""
    try:
        os.remove(HISTORY_FILE)
    except FileNotFoundError:
        # Nothing persisted yet (or already removed) - nothing to delete.
        pass
    st.session_state['uploaded_files_history'] = []
    st.rerun()


# --- Initialize session state with persistent data ---
if 'file_upload_attempts' not in st.session_state:
    st.session_state['file_upload_attempts'] = load_attempts()
    # Written back immediately so the counter file exists from the first run.
    save_attempts(st.session_state['file_upload_attempts'])

if 'uploaded_files_history' not in st.session_state:
    st.session_state['uploaded_files_history'] = load_history()
    save_history(st.session_state['uploaded_files_history'])

if 'encrypted_extracted_text' not in st.session_state:
    st.session_state['encrypted_extracted_text'] = None

GLINER_LABELS_CATEGORIZED = {
    "Personal Identifiers": [
        "Person",
        "Date of birth",
        "Blood type",
        "Digital signature",
        "Social media handle",
        "Username",
        "Birth certificate number",
    ],
    "Contact Details": [
        "Address",
        "Phone number",
        "Mobile phone number",
        "Landline phone number",
        "Email",
        "Fax number",
        "Postal code",
    ],
    "Financial & Payment": [
        "Credit card number",
        "Credit card expiration date",
        "CVV",
        "CVC",
        "Bank account number",
        "IBAN",
        "Transaction number",
        "Credit card brand",
    ],
    "Government & Official IDs": [
        "Passport number",
        "Social security number",
        "CPF",
        "Driver license number",
        "Tax identification number",
        "Identity card number",
        "National ID number",
        "Identity document number",
        "Visa number",
        "License plate number",
        "CNPJ",
        "Registration number",
        "Student ID number",
        "Passport expiration date",
    ],
    "Medical & Health": [
        "Medication",
        "Medical condition",
        "Health insurance ID number",
        "Health insurance number",
        "National health insurance number",
    ],
    "Travel & Transport": [
        "Flight number",
        "Reservation number",
        "Train ticket number",
        "Vehicle registration number",
    ],
    "General Business & Other": [
        "Organization",
        "Insurance company",
        "IP address",
        "Serial number",
        "Insurance number",
    ]
}

# Flatten the categorized labels into a single list for GLiNER model input
GLINER_LABELS_FLAT = [
    label
    for category_labels in GLINER_LABELS_CATEGORIZED.values()
    for label in category_labels
]

# Create a mapping from each specific label to its category for DataFrame processing
LABEL_TO_CATEGORY_MAP = {
    label: category
    for category, labels in GLINER_LABELS_CATEGORIZED.items()
    for label in labels
}

# Markdown shown in the "See Glossary of tags" expander. Kept at module level
# so the text is not indented (indented markdown would render as a code block).
GLOSSARY_MARKDOWN = """
'**word**': ['entity extracted from your text data']

'**score**': ['accuracy score; how accurately a tag has been assigned to a given entity']

'**entity_group**': ['label (tag) assigned to a given extracted entity']

'**start**': ['index of the start of the corresponding entity']

'**end**': ['index of the end of the corresponding entity']

'**category**': ['the broader category this entity belongs to']
"""


@st.cache_resource
def load_ner_model():
    """
    Loads the pre-trained GLiNER NER model (urchade/gliner_multi_pii-v1) and caches it.
    This model is suitable for a wide range of custom entity types.
    """
    try:
        return GLiNER.from_pretrained("urchade/gliner_multi_pii-v1")
    except Exception as e:
        st.error(f"Failed to load NER model. Please check your internet connection or model availability: {e}")
        st.stop()


@st.cache_resource
def load_encryption_key():
    """
    Loads the Fernet encryption key from environment variables.
    This key is crucial for encrypting/decrypting sensitive data.
    It's cached as a resource to be loaded only once.
    """
    try:
        # Get the key string from environment variables
        key_str = os.environ.get("FERNET_KEY")
        if not key_str:
            raise ValueError("FERNET_KEY environment variable not set. Cannot perform encryption/decryption.")
        # Fernet key must be bytes, so encode the string
        key_bytes = key_str.encode('utf-8')
        return Fernet(key_bytes)
    except ValueError as ve:
        st.error(f"Configuration Error: {ve}. Please ensure the 'FERNET_KEY' environment variable is set securely in your deployment environment (e.g., Hugging Face Spaces secrets, Render environment variables) or in a local .env file for development.")
        st.stop()  # Stop the app if the key is not found, as security is compromised
    except Exception as e:
        st.error(f"An unexpected error occurred while loading encryption key: {e}. Please check your key format and environment settings.")
        st.stop()


# Initialize the Fernet cipher instance globally (cached)
fernet = load_encryption_key()


def encrypt_text(text_content: str) -> bytes:
    """
    Encrypts a string using the loaded Fernet cipher.
    The input string is first encoded to UTF-8 bytes.
    """
    return fernet.encrypt(text_content.encode('utf-8'))


def decrypt_text(encrypted_bytes: bytes) -> str | None:
    """
    Decrypts bytes using the loaded Fernet cipher.
    Returns the decrypted string, or None if decryption fails (e.g., tampering).
    """
    try:
        return fernet.decrypt(encrypted_bytes).decode('utf-8')
    except Exception as e:
        st.error(f"Decryption failed. This might indicate data tampering or an incorrect encryption key. Error: {e}")
        return None


def extract_pdf_text(file_obj) -> str:
    """Return the text of every page of the PDF in *file_obj*, one page per line block.

    Pages for which the reader yields no text (``None`` or ``""``) contribute
    an empty string instead of raising. Pages are separated by a newline so
    the last word of one page does not fuse with the first word of the next.
    """
    reader = PdfReader(file_obj)
    return "\n".join(page.extract_text() or "" for page in reader.pages)


def extract_docx_text(file_obj) -> str:
    """Return the paragraph texts of the DOCX in *file_obj*, joined by newlines."""
    document = docx.Document(file_obj)
    return "\n".join(para.text for para in document.paragraphs)


def count_by(df: pd.DataFrame, column: str) -> pd.DataFrame:
    """Return a two-column frame ``[column, 'count']`` of value frequencies.

    ``rename_axis`` + ``reset_index(name=...)`` names both columns explicitly,
    so the result does not depend on how the installed pandas version names
    the output of ``value_counts``.
    """
    return df[column].value_counts().rename_axis(column).reset_index(name="count")


def build_results_zip(results_df: pd.DataFrame, glossary_df: pd.DataFrame) -> bytes:
    """Return the bytes of a zip archive holding both frames as CSV files."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as archive:
        archive.writestr("Summary of the results.csv", results_df.to_csv(index=False))
        archive.writestr("Glossary of tags.csv", glossary_df.to_csv(index=False))
    return buf.getvalue()


def display_results(df: pd.DataFrame, experiment=None) -> None:
    """Render tables, charts and the download button for the entity frame *df*.

    *df* must contain the columns ``entity_group``, ``word``, ``score`` and
    ``category``. When *experiment* (a Comet ``Experiment``) is given, the
    figures and the zip archive are logged to it as well.
    """
    # --- Display Results ---
    st.subheader("Extracted Entities", divider="rainbow")
    properties = {"border": "2px solid gray", "color": "blue", "font-size": "16px"}
    df_styled = df.style.set_properties(**properties)
    st.dataframe(df_styled, use_container_width=True)

    with st.expander("See Glossary of tags"):
        st.write(GLOSSARY_MARKDOWN)

    st.subheader("Grouped Entities by Category", divider="orange")
    # Create tabs for each category
    category_names = list(GLINER_LABELS_CATEGORIZED.keys())
    for tab, category_name in zip(st.tabs(category_names), category_names):
        with tab:
            # Filter the main DataFrame for the current category
            df_category_filtered = df[df['category'] == category_name]
            if df_category_filtered.empty:
                st.info(f"No entities found for the '{category_name}' category.")
                continue
            # Walk the entity types in their declared order for a stable display
            for entity_type in GLINER_LABELS_CATEGORIZED[category_name]:
                df_entity_type_filtered = df_category_filtered[
                    df_category_filtered['entity_group'] == entity_type
                ]
                if not df_entity_type_filtered.empty:
                    st.markdown(f"***{entity_type}***")
                    st.dataframe(df_entity_type_filtered.drop(columns=['category']), use_container_width=True)
                else:
                    st.info(f"No '{entity_type}' entities found for this category.")
    st.divider()

    # --- Visualizations ---
    st.subheader("Tree map", divider="orange")
    # Treemap path includes the category level; colored by category for
    # better visual distinction
    fig_treemap = px.treemap(
        df,
        path=[px.Constant("all"), 'category', 'entity_group', 'word'],
        values='score',
        color='category',
    )
    fig_treemap.update_layout(margin=dict(t=50, l=25, r=25, b=25))
    st.plotly_chart(fig_treemap)
    if experiment is not None:
        experiment.log_figure(figure=fig_treemap, figure_name="entity_treemap")

    final_df_counts = count_by(df, 'entity_group')

    col1, col2 = st.columns(2)
    with col1:
        st.subheader("Pie Chart (by Entity Type)", divider="orange")
        fig_pie = px.pie(
            final_df_counts,
            values='count',
            names='entity_group',
            hover_data=['count'],
            labels={'count': 'count'},
            title='Percentage of Predicted Labels (Entity Types)',
        )
        fig_pie.update_traces(textposition='inside', textinfo='percent+label')
        st.plotly_chart(fig_pie)
        if experiment is not None:
            experiment.log_figure(figure=fig_pie, figure_name="label_pie_chart")

    with col2:
        st.subheader("Bar Chart (by Entity Type)", divider="orange")
        fig_bar = px.bar(
            final_df_counts,
            x="count",
            y="entity_group",
            color="entity_group",
            text_auto=True,
            title='Occurrences of Predicted Labels (Entity Types)',
            orientation='h',
        )
        fig_bar.update_layout(yaxis={'categoryorder': 'total ascending'})  # Order bars
        st.plotly_chart(fig_bar)
        if experiment is not None:
            experiment.log_figure(figure=fig_bar, figure_name="label_bar_chart")

    # Add a chart for categories
    st.subheader("Entity Counts by Category", divider="orange")
    category_counts = count_by(df, 'category')
    fig_cat_bar = px.bar(
        category_counts,
        x="count",
        y="category",
        color="category",
        text_auto=True,
        title='Occurrences of Entities by Category',
        orientation='h',
    )
    fig_cat_bar.update_layout(yaxis={'categoryorder': 'total ascending'})
    st.plotly_chart(fig_cat_bar)

    # --- Downloadable Content ---
    dfa = pd.DataFrame(
        data={
            'Column Name': ['word', 'entity_group', 'score', 'start', 'end', 'category'],
            'Description': [
                'entity extracted from your text data',
                'label (tag) assigned to a given extracted entity',
                'accuracy score; how accurately a tag has been assigned to a given entity',
                'index of the start of the corresponding entity',
                'index of the end of the corresponding entity',
                'the broader category this entity belongs to',
            ]
        }
    )
    zip_bytes = build_results_zip(df, dfa)

    with stylable_container(
        key="download_button",
        css_styles="""button { background-color: yellow; border: 1px solid black; padding: 5px; color: black; }""",
    ):
        st.download_button(
            label="Download zip file",
            data=zip_bytes,
            file_name="nlpblogs_ner_results.zip",
            mime="application/zip",
        )

    if experiment is not None:
        # Hand Comet a file-like object rather than raw bytes.
        # NOTE(review): assumes log_asset accepts a file-like asset - confirm
        # against the installed comet_ml version.
        experiment.log_asset(io.BytesIO(zip_bytes), file_name="downloadable_results.zip")


# --- UI Elements ---
st.subheader("Multilingual PDF & DOCX Entity Finder", divider="orange")  # Updated title
st.link_button("by nlpblogs", "https://nlpblogs.com", type="tertiary")

expander = st.expander("**Important notes on the Multilingual PDF & DOCX Entity Finder**")
expander.write('''
**Named Entities:** This Multilingual PDF & DOCX Entity Finder predicts a wide range of custom labels, including: "Person", "Organization", "Phone number", "Address", "Passport number", "Email", "Credit card number", "Social security number", "Health insurance ID number", "Date of birth", "Mobile phone number", "Bank account number", "Medication", "CPF", "Driver license number", "Tax identification number", "Medical condition", "Identity card number", "National ID number", "IP address", "IBAN", "Credit card expiration date", "Username", "Health insurance number", "Registration number", "Student ID number", "Insurance number", "Flight number", "Landline phone number", "Blood type", "CVV", "Reservation number", "Digital signature", "Social media handle", "License plate number", "CNPJ", "Postal code", "Serial number", "Vehicle registration number", "Credit card brand", "Fax number", "Visa number", "Insurance company", "Identity document number", "Transaction number", "National health insurance number", "CVC", "Birth certificate number", "Train ticket number", "Passport expiration date"

Results are presented in an easy-to-read table, visualized in an interactive tree map, pie chart, and bar chart, and are available for download along with a Glossary of tags.

**Supported languages:** English, French, German, Spanish, Portuguese, Italian

**How to Use:** Upload your PDF or DOCX file. Then, click the 'Results' button to extract and tag entities in your text data.

**Usage Limits:** You can request results up to 300 requests within a 30-day period.

**Language settings:** Please check and adjust the language settings in your computer, so the French, German, Spanish, Portuguese and Italian characters are handled properly in your downloaded file.

**Customization:** To change the app's background color to white or black, click the three-dot menu on the right-hand side of your app, go to Settings and then Choose app theme, colors and fonts.

**Technical issues:** If your connection times out, please refresh the page or reopen the app's URL.

For any errors or inquiries, please contact us at info@nlpblogs.com
''')

with st.sidebar:
    # --- Added Persistent History Display ---
    st.subheader("Your File Upload History", divider="orange")
    if st.session_state['uploaded_files_history']:
        history_df = pd.DataFrame(st.session_state['uploaded_files_history'])
        st.dataframe(history_df, use_container_width=True, hide_index=True)
        # Add a clear history button
        if st.button("Clear File History", help="This will permanently delete the file history from the application."):
            clear_history_data()
    else:
        st.info("You have not uploaded any files yet.")

    st.subheader("Build your own NER Web App in a minute without writing a single line of code.", divider="orange")
    st.link_button("NER File Builder", "https://nlpblogs.com/shop/named-entity-recognition-ner/ner-file-builder/", type="primary")

# --- File Upload (PDF/DOCX) ---
uploaded_file = st.file_uploader("Upload your file. Accepted file formats include: .pdf, .docx", type=['pdf', 'docx'])

current_run_text = None

if uploaded_file is not None:
    file_extension = uploaded_file.name.split('.')[-1].lower()

    # Streamlit reruns the whole script on every interaction; only record the
    # upload when it differs from the most recent history entry so the same
    # file is not appended again on each rerun.
    upload_history = st.session_state['uploaded_files_history']
    last_entry = upload_history[-1] if upload_history else None
    already_recorded = (
        isinstance(last_entry, dict)
        and last_entry.get('filename') == uploaded_file.name
    )
    if not already_recorded:
        # --- ADDING TO UPLOAD HISTORY ---
        upload_history.append({
            "filename": uploaded_file.name,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        })
        save_history(upload_history)

    if file_extension == 'pdf':
        try:
            current_run_text = extract_pdf_text(uploaded_file)
            st.success("PDF file uploaded successfully. File content encrypted and secured. Due to security protocols, the file content is hidden.")
        except Exception as e:
            st.error(f"An error occurred while reading PDF: {e}")
            current_run_text = None
    elif file_extension == 'docx':
        try:
            current_run_text = extract_docx_text(uploaded_file)
            st.success("DOCX file uploaded successfully. File content encrypted and secured. Due to security protocols, the file content is hidden.")
        except Exception as e:
            st.error(f"An error occurred while reading DOCX: {e}")
            current_run_text = None
    else:
        st.warning("Unsupported file type. Please upload a .pdf or .docx file.")
        current_run_text = None

    if current_run_text and current_run_text.strip():
        # --- ENCRYPT THE EXTRACTED TEXT BEFORE STORING IN SESSION STATE ---
        st.session_state['encrypted_extracted_text'] = encrypt_text(current_run_text)
        st.divider()
    else:
        st.session_state['encrypted_extracted_text'] = None
        st.error("Could not extract meaningful text from the uploaded file.")
else:
    # No file is selected: drop ciphertext left over from a previously
    # uploaded (and since removed) file so it can no longer be analyzed.
    st.session_state['encrypted_extracted_text'] = None

# --- Results Button and Processing Logic ---
if st.button("Results"):
    start_time_overall = time.time()  # Start time for overall processing

    if not comet_initialized:
        st.warning("Comet ML not initialized. Check environment variables if you wish to log data.")

    if st.session_state['file_upload_attempts'] >= max_attempts:
        st.error(f"You have requested results {max_attempts} times. You have reached your daily request limit.")
        st.stop()

    # --- DECRYPT THE TEXT BEFORE PASSING TO NER MODEL ---
    text_for_ner = None
    if st.session_state['encrypted_extracted_text'] is not None:
        text_for_ner = decrypt_text(st.session_state['encrypted_extracted_text'])

    if text_for_ner is None or not text_for_ner.strip():
        st.warning("No extractable text content available for analysis. Please upload a valid PDF or DOCX file.")
        st.stop()

    # Increment and save the attempts counter
    st.session_state['file_upload_attempts'] += 1
    save_attempts(st.session_state['file_upload_attempts'])

    with st.spinner("Analyzing text...", show_time=True):
        model = load_ner_model()
        # Measure NER model processing time
        start_time_ner = time.time()
        # Use GLiNER's predict_entities method with the defined flat list of labels
        text_entities = model.predict_entities(text_for_ner, GLINER_LABELS_FLAT)
        ner_processing_time = time.time() - start_time_ner

    # An empty prediction list would build a column-less DataFrame; report it
    # as "no entities" rather than as a malformed model output.
    if not text_entities:
        st.warning("No entities were extracted from the uploaded text.")
        st.stop()

    df = pd.DataFrame(text_entities)

    if 'label' not in df.columns:
        st.error("Unexpected GLiNER output structure. Please check the model's output format.")
        st.stop()

    # Rename 'label' to 'entity_group' and 'text' to 'word' for consistency
    df = df.rename(columns={'label': 'entity_group', 'text': 'word'})

    # Replace empty strings with 'Unknown' and drop rows with NaN after cleaning
    df = df.replace('', 'Unknown').dropna()
    if df.empty:
        st.warning("No entities were extracted from the uploaded text.")
        st.stop()

    # --- Add 'category' column to the DataFrame based on the grouped labels ---
    # Labels missing from the map (shouldn't happen if maps are complete)
    # fall back to 'Uncategorized'.
    df['category'] = df['entity_group'].map(LABEL_TO_CATEGORY_MAP).fillna('Uncategorized')

    experiment = None
    if comet_initialized:
        experiment = Experiment(
            api_key=COMET_API_KEY,
            workspace=COMET_WORKSPACE,
            project_name=COMET_PROJECT_NAME,
        )

    try:
        if experiment is not None:
            experiment.log_parameter("input_text_length", len(text_for_ner))
            # The ".csv" suffix tells Comet which tabular format to write.
            # NOTE(review): assumes log_table requires a recognized extension
            # for DataFrame input - confirm against the comet_ml docs.
            experiment.log_table("predicted_entities.csv", df)
            experiment.log_metric("ner_processing_time_seconds", ner_processing_time)

        display_results(df, experiment)
        st.divider()
    finally:
        # Always close the Comet experiment, even if rendering failed midway.
        if experiment is not None:
            experiment.end()

    end_time_overall = time.time()  # End time for overall processing
    elapsed_time_overall = end_time_overall - start_time_overall
    st.info(f"Results processed in **{elapsed_time_overall:.2f} seconds**.")
    st.write(f"Number of times you requested results: **{st.session_state['file_upload_attempts']}/{max_attempts}**")