import re
from datetime import datetime

from langchain.chains import RetrievalQA
from langchain_community.llms import HuggingFacePipeline
from transformers import pipeline

from modules import parser, vectorizer

# A log line is considered timestamped when it starts with "YYYY-MM-DD HH:MM:SS".
_TIMESTAMP_RE = re.compile(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})")
_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def _parse_bound(value):
    """Parse a time-range bound into a naive ``datetime``.

    The bound is converted with ``str()`` and parsed as ISO 8601. Log
    timestamps are parsed without any zone information, so an offset on the
    bound is discarded: comparing an aware bound with a naive log timestamp
    would raise ``TypeError``.

    NOTE(review): this treats the bound as wall-clock time in the same zone
    as the logs -- confirm against how callers build ``start_time`` and
    ``end_time``.
    """
    return datetime.fromisoformat(str(value)).replace(tzinfo=None)


def filter_logs_by_time(logs_text, start_time, end_time) -> str:
    """Keep only the log lines whose leading timestamp lies in a time range.

    Args:
        logs_text: Raw log text, one entry per line.
        start_time: Inclusive lower bound; anything whose ``str()`` is an
            ISO 8601 date-time. Falsy disables filtering.
        end_time: Inclusive upper bound, same format as ``start_time``.
            Falsy disables filtering.

    Returns:
        ``logs_text`` unchanged when either bound is missing. Otherwise the
        matching lines joined with ``"\\n"``. Lines that do not start with a
        ``YYYY-MM-DD HH:MM:SS`` timestamp are dropped.

    Raises:
        ValueError: If a bound is set but is not a valid ISO 8601 string.
    """
    if not start_time or not end_time:
        # Filtering only makes sense with both ends of the range.
        return logs_text

    start = _parse_bound(start_time)
    end = _parse_bound(end_time)

    kept_lines = []
    for line in logs_text.splitlines():
        match = _TIMESTAMP_RE.match(line)
        if match is None:
            continue
        try:
            timestamp = datetime.strptime(match.group(1), _TIMESTAMP_FORMAT)
        except ValueError:
            # The digits have the right shape but are not a real date
            # (e.g. month 13); skip the line instead of aborting the run.
            continue
        if start <= timestamp <= end:
            kept_lines.append(line)

    return "\n".join(kept_lines)


def run_analysis(uploaded_files, text_input, query, quick_action, temperature, start_time, end_time):
    """Answer a question about the supplied logs with a retrieval-QA chain.

    Uploaded and pasted logs are combined, optionally restricted to a time
    range, indexed through ``vectorizer`` and queried with a GPT-2
    text-generation pipeline.

    Args:
        uploaded_files: Uploaded log files handed to
            ``parser.parse_uploaded_files``; may be empty or ``None``.
        text_input: Log text pasted by the user; may be empty or ``None``.
        query: Free-text question; takes precedence over ``quick_action``.
        quick_action: Fallback question used when ``query`` is blank.
        temperature: Sampling temperature. Values above zero enable
            sampling; zero or ``None`` keeps greedy decoding.
        start_time: Optional lower bound passed to ``filter_logs_by_time``.
        end_time: Optional upper bound passed to ``filter_logs_by_time``.

    Returns:
        A 4-tuple ``(result, bar_data, pie_data, alerts)``. When the input is
        unusable, ``result`` is an error message and the other three items
        are ``None``.
    """
    logs_text = ""

    # Combine uploaded and pasted logs.
    if uploaded_files:
        logs_text += parser.parse_uploaded_files(uploaded_files)
    if text_input:
        logs_text += "\n" + text_input

    if not logs_text.strip():
        return "❌ No logs provided.", None, None, None

    # Restrict the logs to the requested time range (no-op without both bounds).
    logs_text = filter_logs_by_time(logs_text, start_time, end_time)

    # A typed query wins; otherwise fall back to the quick action.
    query_text = query.strip() if query else ""
    if not query_text and quick_action:
        query_text = quick_action

    if not query_text:
        return "❌ No query or quick action selected.", None, None, None

    if not logs_text.strip():
        # The time filter removed every line; report that to the user rather
        # than handing empty text to the vectorizer.
        return "❌ No log entries found in the selected time range.", None, None, None

    # Index the logs for retrieval.
    docs = vectorizer.prepare_documents(logs_text)
    vectordb = vectorizer.create_vectorstore(docs)

    # Hugging Face generation only honours ``temperature`` when sampling is
    # enabled, so turn sampling on for positive temperatures and leave the
    # argument out entirely for greedy decoding.
    # NOTE(review): ``max_length`` counts prompt tokens as well; a long
    # retrieved context may need ``max_new_tokens`` instead -- confirm.
    generation_kwargs = {"max_length": 512}
    if temperature is not None and temperature > 0:
        generation_kwargs.update(do_sample=True, temperature=temperature)

    pipe = pipeline("text-generation", model="gpt2", **generation_kwargs)
    llm = HuggingFacePipeline(pipeline=pipe)

    qa = RetrievalQA.from_chain_type(llm=llm, retriever=vectordb.as_retriever())
    result = qa.run(query_text)

    # Dummy charts and alerts for testing.
    bar_data = {"Hour": ["14:00", "15:00"], "Count": [8, 4]}
    pie_data = {"Event Type": ["Blocked", "Scan"], "Count": [8, 4]}
    alerts = [
        ("CRITICAL", "8 blocked SSH attempts from 192.168.1.5"),
        ("WARNING", "4 port scanning alerts from 10.0.0.8"),
    ]

    return result, bar_data, pie_data, alerts