"""CSV/multi-document RAG app with an investigation-report companion (Gradio UI).

Dependencies (per the imports below): gradio, pandas, langchain,
langchain-community, chromadb, sentence-transformers, transformers,
python-docx, PyPDF2, openpyxl, python-pptx, and fpdf for PDF export.
"""

import logging
import os
import re
import tempfile
import uuid
from typing import Any, List, Optional

import docx
import gradio as gr
import openpyxl  # noqa: F401 -- engine used by pandas for .xlsx files
import pandas as pd
import pptx
import PyPDF2
from langchain.chains import RetrievalQA
from langchain.llms.base import LLM
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from transformers import pipeline

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Directory for the persistent Chroma index (created on first upload).
CHROMA_DB_DIR = "./chroma_db"

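# ---------------------------------------------------------------------------
# Local LLM wrapper. Subclassing LangChain's base `LLM` lets the same object
# be handed to RetrievalQA and also invoked directly with a plain string.
# ---------------------------------------------------------------------------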
class HFZeroGPULLM(LLM):
    """HuggingFace text-generation pipeline exposed through the LangChain LLM
    interface, so RetrievalQA can consume it and invoke(prompt) returns a str."""

    model_id: str = "mistralai/Mistral-7B-Instruct-v0.1"
    generator: Optional[Any] = None

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        try:
            # device=-1 pins the pipeline to CPU.
            self.generator = pipeline("text-generation", model=self.model_id, device=-1)
            logger.info("Loaded HuggingFace text-generation pipeline on CPU.")
        except Exception as e:
            logger.error(f"Failed to load HuggingFace pipeline: {e}")
            self.generator = None

    @property
    def _llm_type(self) -> str:
        return "hf_local_pipeline"

    def _call(self, prompt: str, stop: Optional[List[str]] = None, **kwargs) -> str:
        if not self.generator:
            raise RuntimeError("HFZeroGPULLM not initialized properly.")
        result = self.generator(prompt, max_new_tokens=512, do_sample=True)[0]
        return result.get("generated_text", result.get("text", ""))

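# ---------------------------------------------------------------------------
# RAG core: persistent Chroma vector store plus a RetrievalQA chain over it.
# ---------------------------------------------------------------------------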
class CSVRAGSystem:
    """Owns the vector store, the QA chain, and the document-ingestion path."""

    def __init__(self):
        self.vectorstore = None
        self.qa_chain = None
        self.uploaded_files = []
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )

        try:
            self.llm = HFZeroGPULLM()
            logger.info("HuggingFace LLM initialized successfully.")
        except Exception as e:
            logger.error(f"Failed to initialize HuggingFace LLM: {e}")
            self.llm = None

        # Reattach to any index persisted by a previous run.
        self.load_vectorstore()

    def load_vectorstore(self):
        """Re-open the persistent ChromaDB index if one exists on disk."""
        try:
            if os.path.exists(CHROMA_DB_DIR) and os.listdir(CHROMA_DB_DIR):
                embeddings = HuggingFaceEmbeddings(
                    model_name="sentence-transformers/all-MiniLM-L6-v2",
                    model_kwargs={'device': 'cpu'}
                )
                self.vectorstore = Chroma(
                    embedding_function=embeddings,
                    persist_directory=CHROMA_DB_DIR
                )
                if self.llm:
                    self.qa_chain = RetrievalQA.from_chain_type(
                        llm=self.llm,
                        chain_type="stuff",
                        retriever=self.vectorstore.as_retriever(search_kwargs={"k": 3}),
                        return_source_documents=True
                    )
                logger.info("Loaded persistent ChromaDB vectorstore.")
            else:
                logger.info("No existing ChromaDB found. Will create on first upload.")
        except Exception as e:
            logger.error(f"Error loading persistent ChromaDB: {e}")

    def csv_to_documents(self, csv_files: List[str]) -> List[Document]:
        """Convert CSV files to LangChain documents."""
        documents = []

        for file_path in csv_files:
            try:
                df = pd.read_csv(file_path)

                csv_text = f"File: {os.path.basename(file_path)}\n\n"
                csv_text += f"Columns: {', '.join(df.columns.tolist())}\n\n"
                csv_text += f"Number of rows: {len(df)}\n\n"
                csv_text += "Sample data:\n"
                csv_text += df.head(10).to_string(index=False)

                metadata = {
                    "source": os.path.basename(file_path),
                    "file_path": file_path,
                    "columns_str": ", ".join(df.columns.tolist()),
                    "rows": len(df)
                }

                doc = Document(
                    page_content=csv_text,
                    metadata=metadata
                )
                documents.append(doc)

                logger.info(f"Processed CSV file: {file_path}")

            except Exception as e:
                logger.error(f"Error processing CSV file {file_path}: {e}")
                continue

        return documents

    def create_vectorstore(self, documents: List[Document]):
        """Create or update the persistent vector store from documents."""
        try:
            texts = self.text_splitter.split_documents(documents)
            logger.info(f"Split documents into {len(texts)} chunks")

            if self.vectorstore is not None:
                # The existing store already carries its embedding function.
                self.vectorstore.add_documents(texts)
                self.vectorstore.persist()
                logger.info("Added new documents to existing ChromaDB and persisted.")
            else:
                embeddings = HuggingFaceEmbeddings(
                    model_name="sentence-transformers/all-MiniLM-L6-v2",
                    model_kwargs={'device': 'cpu'}
                )
                self.vectorstore = Chroma.from_documents(
                    documents=texts,
                    embedding=embeddings,
                    persist_directory=CHROMA_DB_DIR
                )
                logger.info("Created new persistent ChromaDB vectorstore.")

            if self.llm:
                self.qa_chain = RetrievalQA.from_chain_type(
                    llm=self.llm,
                    chain_type="stuff",
                    retriever=self.vectorstore.as_retriever(search_kwargs={"k": 3}),
                    return_source_documents=True
                )
            return True
        except Exception as e:
            logger.error(f"Error creating/updating vector store: {e}")
            return False

    def query(self, question: str) -> str:
        """Query the RAG system."""
        if not self.qa_chain:
            return "Error: RAG system not initialized. Please upload data files first."

        try:
            result = self.qa_chain({"query": question})
            answer = result["result"]
            sources = result.get("source_documents", [])

            response = f"Answer: {answer}\n\n"
            if sources:
                response += "Sources:\n"
                for i, source in enumerate(sources, 1):
                    response += f"{i}. {source.metadata.get('source', 'Unknown')}\n"

            return response

        except Exception as e:
            logger.error(f"Error querying RAG system: {e}")
            return f"Error processing query: {str(e)}"


# Module-level singleton shared by all Gradio callbacks below.
rag_system = CSVRAGSystem()

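# ---------------------------------------------------------------------------
# File extraction helpers: turn uploads of various formats into plain text.
# ---------------------------------------------------------------------------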
def extract_text_from_file(file_path):
    """Extract plain text (plus light metadata) from a supported file type."""
    ext = os.path.splitext(file_path)[1].lower()
    try:
        if ext == ".csv":
            df = pd.read_csv(file_path)
            text = df.to_string(index=False)
            # Chroma metadata values must be scalar, so store columns as one string.
            meta = {"columns": ", ".join(map(str, df.columns)), "rows": len(df)}
            return text, meta
        elif ext in [".xls", ".xlsx"]:
            df = pd.read_excel(file_path)
            text = df.to_string(index=False)
            meta = {"columns": ", ".join(map(str, df.columns)), "rows": len(df)}
            return text, meta
        elif ext == ".docx":
            doc = docx.Document(file_path)
            text = "\n".join(p.text for p in doc.paragraphs if p.text.strip())
            return text, {}
        elif ext == ".pdf":
            with open(file_path, "rb") as f:
                reader = PyPDF2.PdfReader(f)
                text = "\n".join(page.extract_text() or "" for page in reader.pages)
            return text, {}
        elif ext == ".pptx":
            prs = pptx.Presentation(file_path)
            slides = []
            for i, slide in enumerate(prs.slides):
                slide_text = [shape.text for shape in slide.shapes if hasattr(shape, "text")]
                slides.append(f"Slide {i + 1}:\n" + "\n".join(slide_text))
            text = "\n\n".join(slides)
            return text, {"slides": len(prs.slides)}
        elif ext == ".txt":
            with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
                text = f.read()
            return text, {}
        elif ext in [".doc", ".ppt"]:
            return None, {"warning": "Legacy DOC/PPT not supported. Please use DOCX/PPTX."}
        else:
            return None, {"warning": f"Unsupported file type: {ext}"}
    except Exception as e:
        logger.error(f"Error extracting text from {file_path}: {e}")
        return None, {"error": str(e)}

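# Generic multi-format loader. It supersedes the CSV-only
# CSVRAGSystem.csv_to_documents method and is patched onto the shared
# instance just below its definition.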
def files_to_documents(files: list) -> list:
    """Convert uploaded files (various formats) to LangChain documents."""
    documents = []
    for file_path in files:
        text, meta = extract_text_from_file(file_path)
        if text:
            metadata = {
                "source": os.path.basename(file_path),
                "file_path": file_path,
            }
            metadata.update(meta)
            doc = Document(
                page_content=text,
                metadata=metadata
            )
            documents.append(doc)
        elif meta.get("warning"):
            logger.warning(meta["warning"])
        elif meta.get("error"):
            logger.error(meta["error"])
    return documents


# Override the CSV-only method on the shared instance with the generic loader.
rag_system.csv_to_documents = files_to_documents

def process_csv_files(files):
    """Process uploaded data files and index them in the vector store."""
    if not files:
        return "Please upload files first.", []

    try:
        documents = rag_system.csv_to_documents(files)

        if not documents:
            return "No valid files found.", []

        success = rag_system.create_vectorstore(documents)
        rag_system.load_vectorstore()

        if rag_system.vectorstore is not None:
            try:
                chroma_contents = rag_system.vectorstore.get(ids=None, include=["metadatas"])
                logger.info(f"ChromaDB now contains {len(chroma_contents.get('metadatas', []))} documents. Example: {chroma_contents.get('metadatas', [])[:1]}")
            except Exception as e:
                logger.error(f"Error logging ChromaDB contents: {e}")

        if success:
            file_info = []
            for doc in documents:
                # CSVs store "columns_str"; the generic loader uses "columns".
                columns_str = doc.metadata.get("columns_str") or doc.metadata.get("columns", "")
                columns = [col.strip() for col in columns_str.split(",")] if columns_str else []

                file_info.append({
                    "filename": doc.metadata.get("source", "Unknown"),
                    "columns": columns,
                    "rows": doc.metadata.get("rows", 0)
                })

            return f"Successfully processed {len(documents)} files. You can now ask questions!", file_info
        else:
            return "Error creating vector store. Please check the logs.", []

    except Exception as e:
        logger.error(f"Error processing files: {e}")
        return f"Error processing files: {str(e)}", []

def ask_question(question):
    """Ask a question to the RAG system."""
    if not question.strip():
        return "Please enter a question."

    return rag_system.query(question)

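# ---------------------------------------------------------------------------
# Investigation-report helpers: LLM checklists, similar-case retrieval,
# recommendations, and report drafting.
# ---------------------------------------------------------------------------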
def analyze_case_details(case_summary, arguments, evidence, csv_files):
    """Use the LLM to analyze case details and generate a checklist of prompts
    for missing info, unsubstantiated claims, etc."""
    if not rag_system.llm:
        return ["LLM not available. Please check your setup."]

    prompt = f"""
You are an expert investigation report assistant. Given the following case summary, arguments, and evidence, generate a checklist of questions or prompts to help the investigation officer identify missing or unclear information, unsubstantiated claims, or areas needing more evidence. Be as detailed and exhaustive as possible. List each prompt as a separate bullet point. Do not omit any relevant aspect.

Case Summary:
{case_summary}

Arguments/Claims:
{arguments}

Evidence:
{evidence}
"""
    try:
        response = rag_system.llm.invoke(prompt)
        # One checklist item per non-empty line, bullet markers stripped.
        prompts = [line.lstrip('-•* ').strip() for line in response.split('\n') if line.strip()]
        return prompts
    except Exception as e:
        logger.error(f"LLM error in analyze_case_details: {e}")
        return ["Error generating prompts with LLM."]

def find_similar_cases(case_summary, arguments, evidence, vectorstore):
    """Query ChromaDB for similar cases based on the current case details."""
    if not vectorstore:
        return []
    query_text = f"Case: {case_summary}\nArguments: {arguments}\nEvidence: {evidence}"
    try:
        results = vectorstore.similarity_search(query_text, k=3)
        similar = [
            {
                "summary": doc.page_content[:200],
                "source": doc.metadata.get("source", "Unknown")
            } for doc in results
        ]
        return similar
    except Exception as e:
        logger.error(f"Error finding similar cases: {e}")
        return []

def generate_recommendations(similar_cases, prompts, io_responses, case_summary=None, arguments=None, evidence=None):
    """Use the LLM to synthesize recommendations/conclusions based on similar cases and IO responses, with detailed context."""
    if not rag_system.llm:
        return ["LLM not available. Please check your setup."]

    prompt = f"""
You are an expert investigation report assistant. Given the following case details and context from similar past cases, generate a detailed, actionable, and exhaustive list of recommendations or conclusions for the investigation report. Each recommendation should be specific, reference relevant context, and help the officer improve the report. Do not omit any relevant recommendation.

Current Case:
Case Summary: {case_summary or ''}
Arguments/Claims: {arguments or ''}
Evidence: {evidence or ''}

Similar Past Cases (with context):
"""
    for i, case in enumerate(similar_cases, 1):
        content = case.get("summary", "")
        findings = ""
        # Prefer the findings/conclusion section of a past case, if present.
        for section in ["Findings:", "Conclusion:", "Recommendations:"]:
            if section in content:
                findings = content.split(section, 1)[-1].split("\n", 1)[0][:300]
                break
        if not findings:
            findings = content[:500]
        prompt += f"\nCase {i} (Source: {case.get('source', 'Unknown')}):\n{findings}\n"
    prompt += "\nGenerate a list of recommendations or conclusions for the investigation report. Each should be a separate bullet point, detailed, exhaustive, and reference the context above where relevant."
    try:
        response = rag_system.llm.invoke(prompt)
        recs = [line.lstrip('-•* ').strip() for line in response.split('\n') if line.strip()]
        return recs
    except Exception as e:
        logger.error(f"LLM error in generate_recommendations: {e}")
        return ["Error generating recommendations with LLM."]

def get_sample_report_from_vectorstore():
    """Return a snippet of a previously indexed report to use as a style guide."""
    if rag_system.vectorstore is not None:
        try:
            docs = rag_system.vectorstore.get(ids=None, include=["metadatas", "documents"])
            for meta, content in zip(docs.get('metadatas', []), docs.get('documents', [])):
                filename = meta.get('source', '').lower() if isinstance(meta, dict) else ''
                if 'report' in filename or (content and 'investigation report' in content.lower()):
                    return content[:1000]
            # No obvious report found; fall back to the first indexed document.
            if docs.get('documents'):
                return docs['documents'][0][:1000]
        except Exception as e:
            logger.error(f"Error extracting sample report from vectorstore: {e}")
    return None

def generate_report(case_summary, arguments, evidence, io_responses, recommendations, similar_cases=None):
    """Generate a draft investigation report using the LLM, referencing past precedents and matching previous report style."""
    if not rag_system.llm:
        return "LLM not available. Please check your setup."
    sample_report = get_sample_report_from_vectorstore()
    prompt = f"""
You are an expert investigation report writer. Using the following case details, AI prompts, recommendations, and context from similar past cases, generate a complete, professional, and well-structured investigation report.

**IMPORTANT:**
- Strictly follow the format, structure, and style of the sample report provided below (if available). Do NOT invent new sections or change the order/headers. Only add more detail, depth, analysis, and completeness within the boundaries of the sample report's format.
- If no sample report is provided, use a standard investigation report format (Title, Case Summary, Arguments/Claims, Evidence, Checklist/Prompts, Recommendations/Conclusions, and any other relevant sections), but be as detailed and analytical as possible.
- Reference relevant past cases (precedents) from the provided context where appropriate (e.g., "As in Case 1, ...").
- Clearly cite or mention the source/case number when referencing past cases.
- Be actionable, formal, and as detailed and comprehensive as possible. Do not omit any relevant section or point.
"""
    if sample_report:
        prompt += f"\n\nBelow is an example of a previously accepted investigation report. You MUST match the formatting, section headers, and style as closely as possible in your generated report. Only add more detail and analysis, do not change the structure.\n\n--- Sample Report Start ---\n{sample_report}\n--- Sample Report End ---\n"
    prompt += f"\nCase Summary:\n{case_summary}\n\nArguments/Claims:\n{arguments}\n\nEvidence:\n{evidence}\n\nAI Prompts/Checklist:\n"
    if isinstance(io_responses, dict) and io_responses:
        for k, v in io_responses.items():
            prompt += f"- {k}: {v}\n"
    elif isinstance(io_responses, list) and io_responses:
        for p in io_responses:
            prompt += f"- {p}\n"
    prompt += "\nRecommendations/Conclusions:\n"
    if isinstance(recommendations, list):
        for r in recommendations:
            prompt += f"- {r}\n"
    else:
        prompt += str(recommendations)
    if similar_cases:
        prompt += "\n\nRelevant Past Cases (Precedents):\n"
        for i, case in enumerate(similar_cases, 1):
            content = case.get("summary", "")
            findings = ""
            for section in ["Findings:", "Conclusion:", "Recommendations:"]:
                if section in content:
                    findings = content.split(section, 1)[-1].split("\n", 1)[0][:300]
                    break
            if not findings:
                findings = content[:500]
            prompt += f"\nCase {i} (Source: {case.get('source', 'Unknown')}):\n{findings}\n"
    prompt += "\n\nWrite the report in a clear, formal style, and reference past cases where relevant. Be as detailed, analytical, and exhaustive as possible, but do NOT deviate from the sample report's format if provided."
    try:
        report = rag_system.llm.invoke(prompt)
        return report
    except Exception as e:
        logger.error(f"LLM error in generate_report: {e}")
        return "Error generating report with LLM."

def export_report_file(report_text, filetype="txt"):
    """Save the report to a temporary file and return the path for download."""
    ext = ".pdf" if filetype == "pdf" else ".txt"
    filename = f"investigation_report_{uuid.uuid4().hex[:8]}{ext}"
    temp_dir = tempfile.gettempdir()
    file_path = os.path.join(temp_dir, filename)
    if filetype == "pdf":
        try:
            from fpdf import FPDF
            pdf = FPDF()
            pdf.add_page()
            pdf.set_auto_page_break(auto=True, margin=15)
            pdf.set_font("Arial", size=12)
            for line in report_text.split('\n'):
                # multi_cell wraps long lines; classic FPDF fonts are latin-1 only.
                pdf.multi_cell(0, 10, txt=line.encode('latin-1', 'replace').decode('latin-1'))
            pdf.output(file_path)
        except Exception as e:
            logger.error(f"PDF export error: {e}")
            # Fall back to plain text so the user still gets a download.
            file_path = os.path.splitext(file_path)[0] + ".txt"
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(report_text)
    else:
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(report_text)
    return file_path

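# ---------------------------------------------------------------------------
# ChromaDB management: list indexed chunks and delete them from the UI.
# ---------------------------------------------------------------------------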
def list_chroma_files():
    """List indexed chunks with their source metadata (one row per chunk)."""
    files = []
    if rag_system.vectorstore is not None:
        try:
            docs = rag_system.vectorstore.get(ids=None, include=["metadatas"])
            for i, meta in enumerate(docs.get('metadatas', [])):
                files.append({
                    'index': i,
                    'filename': meta.get('source', meta.get('file_path', 'Unknown')) if isinstance(meta, dict) else 'Unknown',
                    'file_path': meta.get('file_path', '') if isinstance(meta, dict) else '',
                    'columns': meta.get('columns', meta.get('columns_str', '')) if isinstance(meta, dict) else '',
                    'rows': meta.get('rows', '') if isinstance(meta, dict) else '',
                    'slides': meta.get('slides', '') if isinstance(meta, dict) else '',
                })
        except Exception as e:
            logger.error(f"Error listing ChromaDB files: {e}")
    return files

def delete_chroma_file(index):
    """Delete every indexed chunk belonging to the entry at `index`, plus the
    source file on disk if its path is known."""
    try:
        if rag_system.vectorstore is None:
            return False
        data = rag_system.vectorstore.get(ids=None, include=["metadatas"])
        ids = data.get('ids', [])
        metadatas = data.get('metadatas', [])
        if not (0 <= index < len(ids)):
            return False
        meta = metadatas[index] if isinstance(metadatas[index], dict) else {}
        source = meta.get('source')
        # Remove all chunks from the same source file, not just the one chunk,
        # so "delete file" matches what the UI promises.
        if source:
            doomed = [doc_id for doc_id, m in zip(ids, metadatas)
                      if isinstance(m, dict) and m.get('source') == source]
        else:
            doomed = [ids[index]]
        rag_system.vectorstore.delete(doomed)
        rag_system.vectorstore.persist()
        logger.info(f"Deleted {len(doomed)} chunk(s) for '{source}' from ChromaDB.")
        file_path = meta.get('file_path', '')
        if file_path and os.path.exists(file_path):
            os.remove(file_path)
            logger.info(f"Deleted file from disk: {file_path}")
        rag_system.load_vectorstore()
        return True
    except Exception as e:
        logger.error(f"Error deleting file: {e}")
        return False

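# ---------------------------------------------------------------------------
# Gradio UI: a five-tab workflow from data upload through report export.
# ---------------------------------------------------------------------------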
with gr.Blocks(title="CSV Multi-Document RAG with Investigation Report Companion", theme=gr.themes.Soft()) as demo:
    with gr.Row():
        with gr.Column(scale=1, min_width=250):
            gr.Markdown("""
            # 🕵️‍♂️ Investigation Report Companion

            ## How to Use
            1. **Upload Data Files** (CSV, Excel, Word, PDF, PowerPoint, or Text)
            2. **Enter or Upload Case Details** (manual or DOCX/PDF)
            3. **Review AI Prompts & Recommendations**
            4. **Generate & Export Report**

            ---
            **Tips:**
            - Hover over any field for more info.
            - You can reset the workflow at any time.
            - All steps are saved as you go.

            ---
            **About:**
            - Powered by a local HuggingFace LLM & ChromaDB
            - For professional investigation report writing
            """)
            reset_btn = gr.Button("🔄 Reset All", variant="secondary")
        with gr.Column(scale=4):
            step_indicator = gr.Markdown("""
            **Step 1 of 4: Upload Data Files**
            """)
            with gr.Tabs() as main_tabs:
                with gr.TabItem("1️⃣ Data Upload"):
                    gr.Markdown("## 📁 Upload Data Files")
                    gr.Markdown("**Accepted formats:** CSV, Excel, Word, PDF, PowerPoint, or Text. You can upload multiple files at once.")
                    file_input = gr.File(
                        file_count="multiple",
                        file_types=[".csv", ".xls", ".xlsx", ".docx", ".pdf", ".pptx", ".txt", ".doc", ".ppt"],
                        label="Upload Data Files"
                    )
                    process_btn = gr.Button("Process Files", variant="primary")
                    file_info_output = gr.JSON(label="Processed Files", visible=True)
                    status_output = gr.Textbox(
                        label="Status",
                        interactive=False,
                        lines=2
                    )
with gr.TabItem("2️⃣ Case Details"): |
|
gr.Markdown("## 📝 Enter or Upload Case Details") |
|
gr.Markdown("You can either manually enter details or upload a DOCX/PDF file to auto-fill.\n_Tip: Use the autofill button to extract details from your document._") |
|
case_details_file = gr.File( |
|
file_count="single", |
|
file_types=[".docx", ".pdf"], |
|
label="Upload Case Details (DOCX or PDF)" |
|
) |
|
autofill_btn = gr.Button("Extract & Autofill from File", variant="secondary") |
|
case_summary_input = gr.Textbox(label="Case Summary", lines=3, placeholder="e.g. Theft of equipment from storage room on 12 March 2024. Suspect: John Doe.", info="Summarize the case background and context.") |
|
arguments_input = gr.Textbox(label="Arguments/Claims", lines=3, placeholder="e.g. The suspect was last seen near the storage room. No forced entry detected.", info="List the main arguments or claims.") |
|
evidence_input = gr.Textbox(label="Evidence (describe or paste)", lines=3, placeholder="e.g. CCTV footage, witness statements, inventory logs.", info="Describe or paste key evidence.") |
|
|
|
feedback_case_details = gr.Textbox(label="Feedback for Autofill (optional)", lines=2, placeholder="e.g. Focus on financial evidence.") |
|
regenerate_case_btn = gr.Button("Regenerate Autofill with Feedback", variant="secondary") |
|
|
|
analyze_btn = gr.Button("Analyze Case Details", variant="primary") |
|
with gr.TabItem("3️⃣ AI Prompts & Recommendations"): |
|
gr.Markdown("## 🤖 AI Prompts / Checklist") |
|
gr.Markdown("_Review the AI's checklist. These help ensure your report is complete._") |
|
prompts_output = gr.Textbox(label="AI Prompts/Checklist", lines=8, interactive=False) |
|
|
|
gr.Markdown("## 🧠 Case-Based Recommendations") |
|
gr.Markdown("_AI-generated recommendations based on your case and similar past cases._") |
|
recommendations_output = gr.Textbox(label="Case-Based Recommendations", lines=8, interactive=False) |
|
|
|
feedback_prompts = gr.Textbox(label="Feedback for Prompts/Recommendations (optional)", lines=2, placeholder="e.g. Suggest more actionable recommendations.") |
|
regenerate_prompts_btn = gr.Button("Regenerate Prompts & Recommendations with Feedback", variant="secondary") |
|
with gr.TabItem("4️⃣ Report Generation & Export"): |
|
gr.Markdown("## 📄 Investigation Report Preview") |
|
gr.Markdown("_Review your draft report below. You can go back and edit previous steps if needed._") |
|
report_output = gr.Textbox(label="Draft Report", lines=15, interactive=False) |
|
generate_report_btn = gr.Button("Generate Report", variant="primary") |
|
gr.Markdown("### ⬇️ Export Report") |
|
with gr.Row(): |
|
export_txt_btn = gr.Button("Export as TXT", variant="secondary") |
|
export_pdf_btn = gr.Button("Export as PDF", variant="secondary") |
|
download_file = gr.File(label="Download Report File") |
|
gr.Markdown("---") |
|
gr.Markdown("**Tip:** You can always go back to previous tabs to update your data or responses.") |
|
|
|
feedback_report = gr.Textbox(label="Feedback for Report (optional)", lines=2, placeholder="e.g. Make the report more concise.") |
|
regenerate_report_btn = gr.Button("Regenerate Report with Feedback", variant="secondary") |
|
with gr.TabItem("5️⃣ Manage Uploaded Files"): |
|
gr.Markdown("## 📂 Manage Uploaded Files") |
|
gr.Markdown("View all files in the persistent database. Select and delete files as needed.") |
|
files_list = gr.Dataframe( |
|
headers=["Index", "Filename", "Columns", "Rows", "Slides"], |
|
datatype=["number", "str", "str", "str", "str"], |
|
label="Files in Database", |
|
interactive=False |
|
) |
|
delete_index = gr.Number(label="Index to Delete", precision=0) |
|
delete_btn = gr.Button("Delete Selected File", variant="stop") |
|
delete_status = gr.Textbox(label="Delete Status", interactive=False) |
|
|
|
gr.Markdown("---") |
|
gr.Markdown("## ❓ Ask Questions about Your Data") |
|
gr.Markdown("_Ask the AI about your uploaded data at any time._") |
|
question_input = gr.Textbox( |
|
label="Enter your question about the CSV data", |
|
placeholder="e.g. What are the main trends in the data? What is the average value of column X?", |
|
lines=3, |
|
info="Type your question and click 'Ask Question'." |
|
) |
|
ask_btn = gr.Button("Ask Question", variant="primary") |
|
answer_output = gr.Textbox( |
|
label="RAG Response", |
|
lines=10, |
|
interactive=False |
|
) |
|
|
|
|
|
    def reset_all():
        # None clears the File components; empty strings clear the textboxes.
        return (
            None, None, "", None, "", "", "", "", "", "", None
        )

    reset_btn.click(
        fn=reset_all,
        inputs=[],
        outputs=[file_input, file_info_output, status_output, case_details_file, case_summary_input, arguments_input, evidence_input, prompts_output, recommendations_output, report_output, download_file]
    )

    process_btn.click(
        fn=process_csv_files,
        inputs=[file_input],
        outputs=[status_output, file_info_output]
    )

    def autofill_case_details(file, feedback=""):
        """Extract Case Summary / Arguments / Evidence from an uploaded file,
        optionally steering the LLM with user feedback. One handler serves both
        the initial autofill and the regenerate-with-feedback button."""
        if not file:
            return "", "", ""
        text, _ = extract_text_from_file(file)
        if not text:
            return "", "", ""

        if rag_system.llm:
            prompt = f"""
You are an expert investigation assistant. Given the following document, extract and summarize:
### Case Summary
A concise, detailed, and comprehensive overview of the case background and context. Do not omit any relevant facts.
### Arguments/Claims
List all key points, claims, or arguments made. Be exhaustive and do not miss any argument.
### Evidence
List or describe all main pieces of evidence mentioned. Be as detailed and complete as possible.

Document:
{text}
"""
            if feedback:
                prompt += f"\nAdditional user feedback/instructions: {feedback}\n"
            prompt += """
Format your response as:
### Case Summary
<summary here>
### Arguments/Claims
<arguments here>
### Evidence
<evidence here>
"""
            try:
                llm_response = rag_system.llm.invoke(prompt)
                summary_match = re.search(r"### Case Summary\s*(.*?)\s*### Arguments/Claims", llm_response, re.DOTALL | re.IGNORECASE)
                arguments_match = re.search(r"### Arguments/Claims\s*(.*?)\s*### Evidence", llm_response, re.DOTALL | re.IGNORECASE)
                evidence_match = re.search(r"### Evidence\s*(.*)", llm_response, re.DOTALL | re.IGNORECASE)
                summary = summary_match.group(1).strip() if summary_match else ""
                arguments = arguments_match.group(1).strip() if arguments_match else ""
                evidence = evidence_match.group(1).strip() if evidence_match else ""
                return summary, arguments, evidence
            except Exception as e:
                logger.error(f"LLM autofill error: {e}")
                return "", "", ""

        # No LLM available: fall back to a rough paragraph split.
        parts = text.split("\n\n")
        summary = parts[0].strip() if len(parts) > 0 else text[:300]
        arguments = parts[1].strip() if len(parts) > 1 else ""
        evidence = parts[2].strip() if len(parts) > 2 else ""
        return summary, arguments, evidence

    autofill_btn.click(
        fn=autofill_case_details,
        inputs=[case_details_file],
        outputs=[case_summary_input, arguments_input, evidence_input]
    )

    # Same handler; the extra input supplies the feedback argument.
    regenerate_case_btn.click(
        fn=autofill_case_details,
        inputs=[case_details_file, feedback_case_details],
        outputs=[case_summary_input, arguments_input, evidence_input]
    )

    def analyze_case_details_gradio(case_summary, arguments, evidence, file_info):
        prompts = analyze_case_details(case_summary, arguments, evidence, file_info)
        similar_cases = find_similar_cases(case_summary, arguments, evidence, rag_system.vectorstore)
        prompts_text = "\n\n".join([f"- {p}" for p in prompts]) if isinstance(prompts, list) else str(prompts)
        recommendations = generate_recommendations(similar_cases, prompts, {}, case_summary, arguments, evidence)
        recommendations_text = "\n\n".join([f"- {r}" for r in recommendations]) if isinstance(recommendations, list) else str(recommendations)
        return prompts_text, recommendations_text

    analyze_btn.click(
        fn=analyze_case_details_gradio,
        inputs=[case_summary_input, arguments_input, evidence_input, file_info_output],
        outputs=[prompts_output, recommendations_output]
    )

    def analyze_case_details_gradio_with_feedback(case_summary, arguments, evidence, file_info, feedback):
        prompts = analyze_case_details(case_summary, arguments, evidence, file_info)
        similar_cases = find_similar_cases(case_summary, arguments, evidence, rag_system.vectorstore)
        if feedback:
            prompts = [f"(User feedback: {feedback})"] + prompts
        prompts_text = "\n\n".join([f"- {p}" for p in prompts]) if isinstance(prompts, list) else str(prompts)
        recommendations = generate_recommendations(similar_cases, prompts, {}, case_summary, arguments, evidence)
        recommendations_text = "\n\n".join([f"- {r}" for r in recommendations]) if isinstance(recommendations, list) else str(recommendations)
        return prompts_text, recommendations_text

    regenerate_prompts_btn.click(
        fn=analyze_case_details_gradio_with_feedback,
        inputs=[case_summary_input, arguments_input, evidence_input, file_info_output, feedback_prompts],
        outputs=[prompts_output, recommendations_output]
    )

    def generate_report_gradio(case_summary, arguments, evidence, recommendations):
        io_dict = {}
        recs = [r.strip('-•* ').strip() for r in recommendations.split('\n') if r.strip()] if isinstance(recommendations, str) else recommendations
        similar_cases = find_similar_cases(case_summary, arguments, evidence, rag_system.vectorstore)
        return generate_report(case_summary, arguments, evidence, io_dict, recs, similar_cases)

    generate_report_btn.click(
        fn=generate_report_gradio,
        inputs=[case_summary_input, arguments_input, evidence_input, recommendations_output],
        outputs=[report_output]
    )

    def generate_report_gradio_with_feedback(case_summary, arguments, evidence, recommendations, feedback):
        io_dict = {}
        recs = [r.strip('-•* ').strip() for r in recommendations.split('\n') if r.strip()] if isinstance(recommendations, str) else recommendations
        similar_cases = find_similar_cases(case_summary, arguments, evidence, rag_system.vectorstore)
        return generate_report(
            case_summary,
            arguments,
            evidence,
            io_dict,
            recs,
            similar_cases
        ) + (f"\n\n[User Feedback: {feedback}]" if feedback else "")

    regenerate_report_btn.click(
        fn=generate_report_gradio_with_feedback,
        inputs=[case_summary_input, arguments_input, evidence_input, recommendations_output, feedback_report],
        outputs=[report_output]
    )

    def export_txt(report_text):
        return export_report_file(report_text, filetype="txt")

    def export_pdf(report_text):
        try:
            import fpdf  # noqa: F401 -- just checking the dependency is present
        except ImportError:
            return None
        return export_report_file(report_text, filetype="pdf")

    export_txt_btn.click(
        fn=export_txt,
        inputs=[report_output],
        outputs=[download_file]
    )
    export_pdf_btn.click(
        fn=export_pdf,
        inputs=[report_output],
        outputs=[download_file]
    )

    ask_btn.click(
        fn=ask_question,
        inputs=[question_input],
        outputs=[answer_output]
    )
    question_input.submit(
        fn=ask_question,
        inputs=[question_input],
        outputs=[answer_output]
    )

    def refresh_files_list():
        files = list_chroma_files()
        if not files:
            logger.warning("No files found in ChromaDB after upload or deletion.")
            return [["No files found", "", "", "", ""]]
        return [[f['index'], f['filename'], str(f.get('columns', '')), str(f.get('rows', '')), str(f.get('slides', ''))] for f in files]

    # Populate the table on page load; assigning files_list.value after the
    # component is built would not refresh an already-rendered UI.
    demo.load(fn=refresh_files_list, outputs=[files_list])

    def handle_delete_file(index):
        try:
            idx = int(index)
        except Exception:
            return "Invalid index.", refresh_files_list()
        success = delete_chroma_file(idx)
        status = "File deleted." if success else "Failed to delete file."
        # Return the refreshed table so the Dataframe updates in the browser.
        return status, refresh_files_list()

    delete_btn.click(
        fn=handle_delete_file,
        inputs=[delete_index],
        outputs=[delete_status, files_list]
    )

if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        debug=True
    )