import streamlit as st
import streamlit.components.v1 as components
from predict import run_prediction
from io import StringIO
import PyPDF4
import docx2txt
import pdfplumber
import difflib
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
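# NOTE: this app expects a local `predict.py` exposing `run_prediction()` (a CUAD-style
# question-answering helper) plus the question files `data/questions.txt` and
# `data/questions_short.txt` to live alongside it.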
# ========== CONFIG ==========
st.set_page_config(layout="wide", page_title="Contract Analysis Suite", page_icon="π")

# ========== SESSION STATE ==========
if 'comparison_results' not in st.session_state:
    st.session_state.comparison_results = None
if 'analysis_results' not in st.session_state:
    st.session_state.analysis_results = None

# ========== CACHED HELPERS ==========
@st.cache_data  # cache the question list so it is only read from disk once (Streamlit >= 1.18)
def load_questions():
    try:
        with open('data/questions.txt') as f:
            return [q.strip() for q in f.readlines() if q.strip()]
    except Exception as e:
        st.error(f"Error loading questions: {str(e)}")
        return []

@st.cache_data
def load_questions_short():
    try:
        with open('data/questions_short.txt') as f:
            return [q.strip() for q in f.readlines() if q.strip()]
    except Exception as e:
        st.error(f"Error loading short questions: {str(e)}")
        return []
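# The two question files are expected to be index-aligned: `questions_short.txt` holds the
# short labels shown in the selectbox, while `questions.txt` holds the corresponding full
# CUAD-style questions passed to the model (see the length check in main()).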

# ========== FILE PARSING ==========
def extract_text_from_pdf(uploaded_file):
    try:
        with pdfplumber.open(uploaded_file) as pdf:
            full_text = ""
            for page in pdf.pages:
                try:
                    # `extract_text_formatted` is not a standard pdfplumber method;
                    # fall back to the regular `extract_text()` when it is unavailable.
                    text = page.extract_text_formatted()
                except AttributeError:
                    text = page.extract_text()
                full_text += (text or "") + "\n\n"
            return full_text.strip()
    except Exception as e:
        st.error(f"PDF extraction error: {str(e)}")
        return ""

def load_contract(file):
    """Return the plain text of an uploaded .txt, .pdf, or .docx file ('' on failure)."""
    if not file:
        return ""
    try:
        ext = file.name.split('.')[-1].lower()
        if ext == 'txt':
            return StringIO(file.getvalue().decode("utf-8")).read().strip()
        elif ext == 'pdf':
            content = extract_text_from_pdf(file)
            if not content:
                # Fall back to PyPDF4 if pdfplumber returned nothing; rewind the stream
                # first, since pdfplumber has already consumed it.
                file.seek(0)
                pdfReader = PyPDF4.PdfFileReader(file)
                return "\n\n".join([p.extractText() for p in pdfReader.pages])
            return content
        elif ext == 'docx':
            return docx2txt.process(file).strip()
        else:
            st.warning("Unsupported file type")
            return ""
    except Exception as e:
        st.error(f"Error loading file: {str(e)}")
        return ""

# ========== TEXT UTILS ==========
def highlight_differences_words(text1, text2):
    """Return HTML for both texts with word-level differences colour-highlighted."""
    differ = difflib.Differ()
    diff = list(differ.compare(text1.split(), text2.split()))
    h1, h2 = "", ""
    for i, word in enumerate(diff):
        if word.startswith("- "):
            w = word[2:]
            h1 += f'<span style="background-color:#ffcccc;">{w}</span> '
            # A removal immediately followed by an addition is treated as a changed word:
            # mark the replacement in yellow on the right and blank it so it is not reprocessed.
            if i + 1 < len(diff) and diff[i + 1].startswith("+ "):
                h2 += f'<span style="background-color:#ffffcc;">{diff[i + 1][2:]}</span> '
                diff[i + 1] = ' '
            else:
                h2 += " "
        elif word.startswith("+ "):
            w = word[2:]
            h2 += f'<span style="background-color:#ccffcc;">{w}</span> '
            if i - 1 >= 0 and diff[i - 1].startswith("- "):
                h1 += f'<span style="background-color:#ffffcc;">{diff[i - 1][2:]}</span> '
                diff[i - 1] = ' '
            else:
                h1 += " "
        elif word.startswith("  "):
            w = word[2:] + " "
            h1 += w
            h2 += w
    return h1.strip(), h2.strip()
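# Colour key used above: red = word only in document 1, green = word only in document 2,
# yellow = a word that changed between the two documents (shown in both panes).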

def calculate_similarity(text1, text2):
    """Return a 0-100 similarity score between the two texts (TF-IDF cosine similarity)."""
    if not text1.strip() or not text2.strip():
        return 0.0
    try:
        vectorizer = TfidfVectorizer(token_pattern=r'(?u)\b\w+\b')
        tfidf = vectorizer.fit_transform([text1, text2])
        sim = cosine_similarity(tfidf[0:1], tfidf[1:2])
        return sim[0][0] * 100
    except Exception:
        # Fall back to a character-level ratio if vectorisation fails (e.g. no valid tokens).
        return difflib.SequenceMatcher(None, text1, text2).ratio() * 100
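# Illustrative behaviour: two identical documents score 100, unrelated documents score near 0;
# the 70% threshold in main() gates the visual diff display on this scale.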

# ========== MAIN APP ==========
def main():
    st.title("π Contract Analysis Suite")
    st.markdown("Compare documents and analyze legal clauses using AI-powered tools.")

    questions = load_questions()
    questions_short = load_questions_short()
    if not questions or not questions_short or len(questions) != len(questions_short):
        st.error("Questions failed to load properly.")
        return

    st.header("1. Upload Documents")
    col1, col2 = st.columns(2)
    with col1:
        file1 = st.file_uploader("Upload First Document", type=["txt", "pdf", "docx"], key="file1")
        text1 = load_contract(file1) if file1 else ""
        display1 = st.empty()
    with col2:
        file2 = st.file_uploader("Upload Second Document", type=["txt", "pdf", "docx"], key="file2")
        text2 = load_contract(file2) if file2 else ""
        display2 = st.empty()
    if file1:
        display1.text_area("Document 1 Content", value=text1, height=400, key="area1")
    if file2:
        display2.text_area("Document 2 Content", value=text2, height=400, key="area2")
    if not (file1 and file2):
        st.warning("Please upload both documents.")
        return
st.header("2. Document Comparison") | |
with st.expander("Show Document Differences", expanded=True): | |
if st.button("Compare Documents"): | |
with st.spinner("Analyzing..."): | |
sim = calculate_similarity(text1, text2) | |
diff1, diff2 = highlight_differences_words(text1, text2) | |
st.session_state.comparison_results = { | |
'similarity': sim, | |
'diff1': diff1, | |
'diff2': diff2, | |
} | |
if st.session_state.comparison_results: | |
sim = st.session_state.comparison_results['similarity'] | |
st.metric("Document Similarity Score", f"{sim:.2f}%") | |
if sim >= 70: | |
st.markdown("### Visual Difference Highlighting") | |
sync_scroll_script = """ | |
<script> | |
const left = document.getElementById("left"); | |
const right = document.getElementById("right"); | |
left.onscroll = function() { | |
right.scrollTop = left.scrollTop; | |
}; | |
right.onscroll = function() { | |
left.scrollTop = right.scrollTop; | |
}; | |
</script> | |
""" | |
html = f""" | |
<div style="display: flex; gap: 20px;"> | |
<div id="left" style="width: 100%; height: 500px; overflow-y: auto; padding: 10px; font-family: monospace; border: 1px solid #ccc;"> | |
{st.session_state.comparison_results['diff1']} | |
</div> | |
<div id="right" style="width: 100%; height: 500px; overflow-y: auto; padding: 10px; font-family: monospace; border: 1px solid #ccc;"> | |
{st.session_state.comparison_results['diff2']} | |
</div> | |
</div> | |
{sync_scroll_script} | |
""" | |
st.markdown(html, unsafe_allow_html=True) | |
else: | |
st.warning("Similarity below 70%. Skipping visual diff display.") | |

    # ========== CLAUSE ANALYSIS ==========
    st.header("3. Clause Analysis")
    try:
        question_short = st.selectbox("Select a legal question to analyze:", questions_short)
        idx = questions_short.index(question_short)
        question = questions[idx]
    except Exception:
        st.error("Error selecting question")
        return
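    # run_prediction() (from the local predict.py) appears to return a dict keyed by the
    # question index as a string, hence the .get('0', ...) lookups below; this is an
    # assumption about that helper's interface, not something enforced here.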
    if st.button("Analyze Both Documents"):
        if not (text1.strip() and text2.strip()):
            st.error("Ensure both documents have content.")
            return
        # Make sure the results dict exists before either column tries to write into it.
        st.session_state.analysis_results = st.session_state.analysis_results or {}
        col1, col2 = st.columns(2)
        with col1:
            st.subheader("First Document Analysis")
            with st.spinner("Processing..."):
                try:
                    ans1 = run_prediction([question], text1, 'marshmellow77/roberta-base-cuad', n_best_size=5).get('0', 'No answer')
                    st.session_state.analysis_results['doc1'] = ans1
                except Exception as e:
                    st.session_state.analysis_results['doc1'] = f"Failed: {e}"
        with col2:
            st.subheader("Second Document Analysis")
            with st.spinner("Processing..."):
                try:
                    ans2 = run_prediction([question], text2, 'marshmellow77/roberta-base-cuad', n_best_size=5).get('0', 'No answer')
                    st.session_state.analysis_results['doc2'] = ans2
                except Exception as e:
                    st.session_state.analysis_results['doc2'] = f"Failed: {e}"

    if st.session_state.analysis_results:
        col1, col2 = st.columns(2)
        with col1:
            st.subheader("First Document Result")
            st.success(st.session_state.analysis_results.get('doc1', 'No analysis yet'))
        with col2:
            st.subheader("Second Document Result")
            st.success(st.session_state.analysis_results.get('doc2', 'No analysis yet'))

if __name__ == "__main__":
    main()
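# To try the app locally (assuming Streamlit and the other imports are installed):
#   streamlit run <path-to-this-file>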