import json
import os
import re
from collections import defaultdict
from typing import Any, Dict, List, Union

import joblib
import networkx as nx
import numpy as np
import streamlit as st
import streamlit.components.v1 as components
import torch
from dotenv import load_dotenv
from openai import OpenAI
from pyvis.network import Network
from rdflib import Graph, Literal, Namespace, RDF, RDFS, URIRef, XSD
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoModel, AutoTokenizer

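# GDPR Compliance Checker (Streamlit app).
#
# Pipeline: the user uploads a GDPR JSON file (one object per article) and a
# company policy as plain text. The policy is split into chunks, and each chunk
# is matched against the GDPR articles either by embedding cosine similarity
# (LegalBERT, SentenceTransformer, OpenAI embeddings, knowledge graph) or by
# classifier probability (TF-IDF + Logistic Regression / MultinomialNB).
# Matches above a 0.35 presence threshold are rescaled to a 0-100 per-article
# "compliance" percentage.
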
class GDPRComplianceChecker:
    def __init__(self, model_name="nlpaueb/bert-base-uncased-eurlex"):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device).eval()

    def get_embeddings(self, texts):
        """Embed each text as the [CLS] vector of the model's last hidden layer."""
        embeddings = []
        for text in texts:
            inputs = self.tokenizer(text, return_tensors="pt", truncation=True,
                                    padding=True, max_length=512)
            inputs = {k: v.to(self.device) for k, v in inputs.items()}
            with torch.no_grad():
                output = self.model(**inputs)
            # [CLS] is the first token of the sequence.
            embedding = output.last_hidden_state[:, 0, :].cpu().numpy()
            embeddings.append(embedding[0])
        return np.array(embeddings)

    def chunk_policy_text(self, text, chunk_size=500):
        """Split on blank lines or sentence boundaries, pack pieces into
        roughly chunk_size-character chunks, and drop fragments of 50
        characters or fewer."""
        paragraphs = re.split(r'\n{2,}|\.\s+', text)
        chunks, current = [], ""
        for para in paragraphs:
            if len(current) + len(para) < chunk_size:
                current += " " + para
            else:
                chunks.append(current.strip())
                current = para
        if current:
            chunks.append(current.strip())
        return [chunk for chunk in chunks if len(chunk) > 50]
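    # Expected shape of the uploaded GDPR JSON, inferred from the parsing
    # below (the field contents here are illustrative, not taken from a file):
    # [
    #   {
    #     "article_number": 5,
    #     "article_title": "Principles relating to processing of personal data",
    #     "sections": [{"1": "Personal data shall be processed lawfully ..."}]
    #   },
    #   ...
    # ]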
    def load_gdpr_articles(self, gdpr_json):
        gdpr_map, texts = {}, []
        for article in gdpr_json:
            number, title = article["article_number"], article["article_title"]
            body = " ".join(f"{k} {v}" for sec in article["sections"] for k, v in sec.items())
            full_text = f"Article {number}: {title}. {body}"
            gdpr_map[number] = {"title": title, "text": full_text}
            texts.append(full_text)

        embeddings = self.get_embeddings(texts)
        return gdpr_map, embeddings
    def calculate_compliance_score(self, policy_text, gdpr_map, gdpr_embeddings):
        chunks = self.chunk_policy_text(policy_text)
        if not chunks:
            return {"error": "Policy has no meaningful chunks."}
        chunk_embeddings = self.get_embeddings(chunks)
        sim_matrix = cosine_similarity(gdpr_embeddings, chunk_embeddings)

        article_scores = {}
        presence_threshold = 0.35
        total_score, counted_articles = 0, 0

        for i, (art_num, art_data) in enumerate(gdpr_map.items()):
            max_sim = np.max(sim_matrix[i])
            best_idx = np.argmax(sim_matrix[i])

            # Articles whose best match falls below the threshold are treated
            # as not addressed by the policy and excluded from the average.
            if max_sim < presence_threshold:
                continue

            # Rescale the [threshold, 1] similarity range linearly onto [0, 100].
            score_pct = min(100, max(0, (max_sim - presence_threshold) / (1 - presence_threshold) * 100))
            article_scores[art_num] = {
                "article_title": art_data["title"],
                "compliance_percentage": round(score_pct, 2),
                "similarity_score": round(max_sim, 4),
                "matched_text_snippet": chunks[best_idx][:300] + "..."
            }
            total_score += score_pct
            counted_articles += 1

        overall = round(total_score / counted_articles, 2) if counted_articles else 0
        return {
            "overall_compliance_percentage": overall,
            "relevant_articles_analyzed": counted_articles,
            "total_policy_chunks": len(chunks),
            "article_scores": article_scores
        }

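# Minimal standalone usage sketch (outside Streamlit), assuming a local
# "gdpr.json" in the shape documented above:
#
#     checker = GDPRComplianceChecker()
#     with open("gdpr.json") as f:
#         gdpr_map, gdpr_embeddings = checker.load_gdpr_articles(json.load(f))
#     result = checker.calculate_compliance_score(policy_text, gdpr_map, gdpr_embeddings)
#     print(result["overall_compliance_percentage"])
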
def chunk_policy_text(text, chunk_size=500):
    """Module-level copy of GDPRComplianceChecker.chunk_policy_text, used by
    the branches that do not instantiate the LegalBERT checker."""
    paragraphs = re.split(r'\n{2,}|\.\s+', text)
    chunks, current = [], ""
    for para in paragraphs:
        if len(current) + len(para) < chunk_size:
            current += " " + para
        else:
            chunks.append(current.strip())
            current = para
    if current:
        chunks.append(current.strip())
    return [chunk for chunk in chunks if len(chunk) > 50]

def prepare_article_text(article: Dict[str, Any]) -> str:
    body = " ".join(
        " ".join(sec.values()) if isinstance(sec, dict) else str(sec)
        for sec in article.get("sections", [])
    )
    return f"Art. {article['article_number']} – {article['article_title']} {body}"

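# Example with a hypothetical article (following the JSON shape above):
#   prepare_article_text({"article_number": 5, "article_title": "Principles ...",
#                         "sections": [{"1": "Personal data shall be ..."}]})
#   -> "Art. 5 – Principles ... Personal data shall be ..."
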
def get_embedding(text: Union[str, List[str]]) -> Union[List[float], List[List[float]]]:
    """Embed a string (or a list of strings, in one batched call) with the
    OpenAI embeddings API. Relies on the module-level `client` and
    `EMBED_MODEL` that the "LLM Model" branch below sets up."""
    if isinstance(text, list):
        cleaned_text = [t.replace("\n", " ") for t in text]
    else:
        cleaned_text = text.replace("\n", " ")
    resp = client.embeddings.create(model=EMBED_MODEL, input=cleaned_text)
    if isinstance(cleaned_text, list):
        return [item.embedding for item in resp.data]
    return resp.data[0].embedding

def rdflib_to_networkx(rdflib_graph):
    nx_graph = nx.MultiDiGraph()
    for s, p, o in rdflib_graph:
        nx_graph.add_edge(str(s), str(o), label=str(p))
    return nx_graph


def draw_pyvis_graph(nx_graph):
    net = Network(height="600px", width="100%", directed=True, notebook=False)
    net.from_nx(nx_graph)
    net.repulsion(node_distance=200, central_gravity=0.33, spring_length=100,
                  spring_strength=0.10, damping=0.95)
    return net

st.set_page_config(page_title="GDPR Compliance Checker", layout="wide")
st.title("🛡️ GDPR Compliance Checker")

with st.sidebar:
    st.header("Upload Files")
    gdpr_file = st.file_uploader("GDPR JSON File", type=["json"])
    policy_file = st.file_uploader("Company Policy (.txt)", type=["txt"])

if gdpr_file and policy_file:
    model_choice = st.selectbox(
        "Choose the model to use:",
        ["Logistic Regression", "MultinomialNB", "LegalBERT (Eurlex)",
         "SentenceTransformer", "LLM Model", "Knowledge Graphs"]
    )

    gdpr_data = json.load(gdpr_file)
    article_title_map = {f"Article {a['article_number']}": a['article_title'] for a in gdpr_data}

    policy_text = policy_file.read().decode("utf-8")

    with st.spinner("Analyzing..."):
        if model_choice == "LegalBERT (Eurlex)":
            checker = GDPRComplianceChecker()
            gdpr_map, gdpr_embeddings = checker.load_gdpr_articles(gdpr_data)
            result = checker.calculate_compliance_score(policy_text, gdpr_map, gdpr_embeddings)

        elif model_choice in ["Logistic Regression", "MultinomialNB"]:
            if model_choice == "Logistic Regression":
                model = joblib.load("logistic_regression_model.joblib")
                vectorizer = joblib.load("logistic_regression_vectorizer.joblib")
            else:
                model = joblib.load("multinomialNB_model.joblib")
                vectorizer = joblib.load("multinomialNB_vectorizer.joblib")

            chunks = chunk_policy_text(policy_text)
            chunks = [c.strip() for c in chunks if len(c.strip()) > 40]
            X_tfidf = vectorizer.transform(chunks)
            # The classifiers are assumed to emit labels of the form
            # "Article <n>", matching the keys of article_title_map.
            y_pred = model.predict(X_tfidf)
            y_proba = model.predict_proba(X_tfidf)

            article_scores = defaultdict(lambda: {
                "article_title": "",
                "compliance_percentage": 0.0,
                "similarity_score": 0.0,
                "matched_text_snippet": ""
            })
            total_score = 0
            counted_chunks = 0

            for i, (label, prob_vector) in enumerate(zip(y_pred, y_proba)):
                max_prob = max(prob_vector)
                if max_prob >= 0.35:
                    score_pct = min(100.0, max(0.0, (max_prob - 0.35) / (1 - 0.35) * 100))
                    # Keep only the best-scoring chunk per predicted article.
                    if score_pct > article_scores[label]["compliance_percentage"]:
                        article_scores[label]["compliance_percentage"] = score_pct
                        article_scores[label]["similarity_score"] = round(max_prob, 4)
                        article_scores[label]["matched_text_snippet"] = chunks[i][:300] + "..."
                        article_scores[label]["article_title"] = article_title_map.get(label, label)
                    total_score += score_pct
                    counted_chunks += 1

            overall = round(total_score / counted_chunks, 2) if counted_chunks else 0
            result = {
                "overall_compliance_percentage": overall,
                "relevant_articles_analyzed": len(article_scores),
                "total_policy_chunks": len(chunks),
                "article_scores": dict(article_scores)
            }

        elif model_choice == "SentenceTransformer":
            model = joblib.load("sentence_transformer_model.joblib")
            gdpr_texts = []
            gdpr_map = {}
            for article in gdpr_data:
                number, title = article["article_number"], article["article_title"]
                body = " ".join(f"{k} {v}" for sec in article["sections"] for k, v in sec.items())
                full_text = f"Article {number}: {title}. {body}"
                gdpr_map[number] = {"title": title, "text": full_text}
                gdpr_texts.append(full_text)

            gdpr_embeddings = model.encode(gdpr_texts, convert_to_numpy=True)

            chunks = chunk_policy_text(policy_text)
            chunk_embeddings = model.encode(chunks, convert_to_numpy=True)

            sim_matrix = cosine_similarity(gdpr_embeddings, chunk_embeddings)

            article_scores = {}
            presence_threshold = 0.35
            total_score, counted_articles = 0, 0

            for i, (art_num, art_data) in enumerate(gdpr_map.items()):
                max_sim = np.max(sim_matrix[i])
                best_idx = np.argmax(sim_matrix[i])

                if max_sim < presence_threshold:
                    continue

                score_pct = min(100, max(0, (max_sim - presence_threshold) / (1 - presence_threshold) * 100))
                article_scores[art_num] = {
                    "article_title": art_data["title"],
                    "compliance_percentage": round(score_pct, 2),
                    "similarity_score": round(max_sim, 4),
                    "matched_text_snippet": chunks[best_idx][:300] + "..."
                }
                total_score += score_pct
                counted_articles += 1

            overall = round(total_score / counted_articles, 2) if counted_articles else 0
            result = {
                "overall_compliance_percentage": overall,
                "relevant_articles_analyzed": counted_articles,
                "total_policy_chunks": len(chunks),
                "article_scores": article_scores
            }

        elif model_choice == "LLM Model":
            load_dotenv()
            api_key = os.getenv("OPENAI_API_KEY")
            client = OpenAI(api_key=api_key)
            EMBED_MODEL = "text-embedding-3-small"
            gdpr_embeddings = {}
            gdpr_map = {}
            for art in gdpr_data:
                number, title = art["article_number"], art["article_title"]
                art_text = prepare_article_text(art)
                gdpr_embeddings[number] = {
                    "embedding": get_embedding(art_text),
                    "title": title
                }
                gdpr_map[number] = {"title": title, "text": art_text}
            chunks = chunk_policy_text(policy_text)
            # Embed all chunks in a single batched API call.
            chunk_embeddings = get_embedding(chunks)
            gdpr_embedding_vectors = [v["embedding"] for v in gdpr_embeddings.values()]
            sim_matrix = cosine_similarity(gdpr_embedding_vectors, chunk_embeddings)

            article_scores = {}
            presence_threshold = 0.35
            total_score, counted_articles = 0, 0

            for i, (art_num, art_data) in enumerate(gdpr_map.items()):
                max_sim = np.max(sim_matrix[i])
                best_idx = np.argmax(sim_matrix[i])

                if max_sim < presence_threshold:
                    continue

                score_pct = min(100, max(0, (max_sim - presence_threshold) / (1 - presence_threshold) * 100))
                article_scores[art_num] = {
                    "article_title": art_data["title"],
                    "compliance_percentage": round(score_pct, 2),
                    "similarity_score": round(max_sim, 4),
                    "matched_text_snippet": chunks[best_idx][:300] + "..."
                }
                total_score += score_pct
                counted_articles += 1

            overall = round(total_score / counted_articles, 2) if counted_articles else 0
            result = {
                "overall_compliance_percentage": overall,
                "relevant_articles_analyzed": counted_articles,
                "total_policy_chunks": len(chunks),
                "article_scores": article_scores
            }

        elif model_choice == "Knowledge Graphs":
            EMBED_MODEL = "all-MiniLM-L6-v2"
            model = SentenceTransformer(EMBED_MODEL)
            BASE_URI = "http://example.org/gdpr#"
            gdpr_embeddings = {}
            gdpr_map = {}
            for art in gdpr_data:
                number, title = art["article_number"], art["article_title"]
                art_text = prepare_article_text(art)
                gdpr_embeddings[number] = {
                    "embedding": model.encode(art_text),
                    "title": title,
                    "uri": URIRef(f"{BASE_URI}Article{number}")
                }
                gdpr_map[number] = {"title": title, "text": art_text}
            g = Graph()
            EX = Namespace(BASE_URI)
            g.bind("ex", EX)

            # One RDF node per GDPR article.
            for num, info in gdpr_embeddings.items():
                g.add((info["uri"], RDF.type, EX.Article))
                g.add((info["uri"], RDFS.label, Literal(f"Article {num}: {info['title']}")))

            article_nums = list(gdpr_embeddings.keys())
            article_vectors = np.array([gdpr_embeddings[num]["embedding"] for num in article_nums])

            total_score = 0
            counted_sections = 0
            chunks = chunk_policy_text(policy_text)
            presence_threshold = 0.35

            for idx, text in enumerate(chunks, start=1):
                if not text.strip():
                    continue

                # One RDF node per policy section.
                sec_uri = URIRef(f"{BASE_URI}PolicySection{idx}")
                g.add((sec_uri, RDF.type, EX.PolicySection))
                g.add((sec_uri, RDFS.label, Literal(f"Section {idx}")))

                sec_emb = model.encode(text)

                # Rank every article by similarity to this section.
                sims = []
                for i, art_num in enumerate(article_nums):
                    art_emb = article_vectors[i]
                    sim = cosine_similarity([sec_emb], [art_emb])[0][0]
                    sims.append({
                        "article": art_num,
                        "title": gdpr_embeddings[art_num]["title"],
                        "similarity": round(sim, 4),
                        "uri": gdpr_embeddings[art_num]["uri"],
                        "text": gdpr_map[art_num]["text"]
                    })

                sims.sort(key=lambda x: x["similarity"], reverse=True)
                top_match = sims[0]

                if top_match["similarity"] < presence_threshold:
                    continue

                score_pct = min(100, max(0, (top_match["similarity"] - presence_threshold) / (1 - presence_threshold) * 100))

                # Link the section to its best-matching article.
                g.add((sec_uri, EX.relatesTo, top_match["uri"]))
                g.add((sec_uri, EX.similarityScore, Literal(top_match["similarity"], datatype=XSD.float)))

                total_score += score_pct
                counted_sections += 1

            # Serialize once, after all sections have been processed.
            g.serialize(destination="gdpr_policy_graph.ttl", format="turtle")

            overall = round(total_score / counted_sections, 2) if counted_sections else 0
            result = {
                "overall_compliance_percentage": overall,
                "relevant_sections_analyzed": counted_sections,
                "total_policy_sections": len(chunks),
                "ttl": True
            }

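        # Illustrative fragment of the Turtle output (node numbers and the
        # similarity value are hypothetical):
        #
        #   ex:PolicySection3 a ex:PolicySection ;
        #       rdfs:label "Section 3" ;
        #       ex:relatesTo ex:Article5 ;
        #       ex:similarityScore "0.61"^^xsd:float .
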
        else:
            result = {}

    if result:
        st.subheader(f"✅ Overall Compliance Score: {result['overall_compliance_percentage']}%")
        st.markdown("---")
        st.subheader("📊 Detailed Article Breakdown")
        ttl_file_path = "gdpr_policy_graph.ttl"
        if result.get('article_scores'):
            for art_num, data in sorted(result['article_scores'].items(),
                                        key=lambda x: -x[1]['compliance_percentage']):
                with st.expander(f"Article {art_num} - {data['article_title']} ({data['compliance_percentage']}%)"):
                    st.write(f"**Similarity Score**: {data['similarity_score']}")
                    st.write(f"**Matched Text**:\n\n{data['matched_text_snippet']}")
        elif result.get("ttl") and os.path.exists(ttl_file_path):
            st.markdown("---")
            st.subheader("🧠 Interactive RDF Graph Visualization")

            g = Graph()
            g.parse(ttl_file_path, format="ttl")

            nx_graph = rdflib_to_networkx(g)
            net = draw_pyvis_graph(nx_graph)

            # pyvis renders to a standalone HTML file, which is embedded below.
            net.save_graph("rdf_graph.html")
            with open("rdf_graph.html", "r", encoding="utf-8") as f:
                html_content = f.read()

            components.html(html_content, height=650, scrolling=True)

        else:
            st.info("No article scores or RDF graph to display.")

else:
    st.info("Please upload both a GDPR JSON file and a company policy text file to begin.")
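# To run (the filename is an assumption; adjust to this script's actual name):
#   streamlit run app.py
# Requires: streamlit, transformers, torch, sentence-transformers, scikit-learn,
# joblib, numpy, openai, python-dotenv, rdflib, networkx, pyvis.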