"""Threat-intelligence pipeline built from CrewAI agents and served via FastAPI."""

import logging
import os
import re

import requests
from crewai import Agent, Crew, Task
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI()

# Name of the environment variable that supplies the AlienVault OTX API key,
# so the secret never has to live in source control.
OTX_API_KEY_ENV = "OTX_API_KEY"

# Seconds to wait for OTX; requests has no default timeout and would
# otherwise block the worker indefinitely on a stalled connection.
OTX_TIMEOUT_SECONDS = 10

# indicator_type comes from the HTTP client and is interpolated into the URL
# path, so only a conservative character set is accepted (no "/", "..", "?").
_INDICATOR_TYPE_RE = re.compile(r"[A-Za-z0-9_-]+")


# Define agents using CrewAI
class ThreatIntelligenceCrew:
    """Five CrewAI agents that collect, analyze, correlate, report and advise.

    The methods ``fetch_iocs``, ``analyze_iocs``, ``correlate_threats``,
    ``generate_report`` and ``recommend_actions`` are handed to the agents as
    their tools and can also be called directly.
    """

    # Example mapping of threat-actor name -> IP addresses attributed to it.
    THREAT_ACTORS = {
        "APT28": ["1.2.3.4", "5.6.7.8"],
        "Lazarus Group": ["9.10.11.12"],
    }

    def __init__(self, api_key):
        """Create the agents.

        Args:
            api_key: AlienVault OTX API key, sent in the ``X-OTX-API-KEY``
                header by ``fetch_iocs``.
        """
        self.api_key = api_key

        # NOTE(review): the tools below are plain bound methods. Some CrewAI
        # versions appear to require tool objects (e.g. BaseTool instances)
        # instead -- confirm against the installed CrewAI version.

        # Define agents
        self.data_collector = Agent(
            role="Data Collector",
            goal="Fetch threat data from external sources",
            backstory="Specializes in collecting IOCs from threat intelligence feeds.",
            tools=[self.fetch_iocs],  # Custom tool for fetching IOCs
        )
        self.analyst = Agent(
            role="Threat Analyst",
            goal="Analyze collected data for suspicious activity",
            backstory="Expert in identifying patterns and anomalies in threat data.",
            tools=[self.analyze_iocs],  # Custom tool for analyzing IOCs
        )
        self.correlator = Agent(
            role="Threat Correlator",
            goal="Correlate IOCs with known threat actors",
            backstory="Specializes in linking IOCs to advanced threat groups.",
            tools=[self.correlate_threats],  # Custom tool for correlation
        )
        self.reporter = Agent(
            role="Threat Reporter",
            goal="Generate actionable threat intelligence reports",
            backstory="Expert in creating clear and concise reports for security teams.",
            tools=[self.generate_report],  # Custom tool for reporting
        )
        self.responder = Agent(
            role="Response Advisor",
            goal="Recommend mitigation actions based on threats",
            backstory="Specializes in providing actionable recommendations to mitigate risks.",
            tools=[self.recommend_actions],  # Custom tool for response
        )

    # Custom tool: Fetch IOCs from AlienVault OTX
    def fetch_iocs(self, indicator_type="ipv4"):
        """Fetch recent indicators of the given type from AlienVault OTX.

        Args:
            indicator_type: Path segment naming the indicator type. Must
                consist only of letters, digits, ``_`` and ``-``.

        Returns:
            The decoded JSON body on HTTP 200; otherwise a dict with a single
            ``"error"`` key. Network failures and undecodable bodies are
            reported the same way rather than raised.
        """
        if not _INDICATOR_TYPE_RE.fullmatch(str(indicator_type)):
            return {"error": "Invalid indicator type"}

        url = f"https://otx.alienvault.com/api/v1/indicators/{indicator_type}/recent"
        headers = {"X-OTX-API-KEY": self.api_key}
        try:
            response = requests.get(url, headers=headers, timeout=OTX_TIMEOUT_SECONDS)
        except requests.RequestException:
            logger.exception("Request to AlienVault OTX failed")
            return {"error": "Failed to fetch IOCs"}

        if response.status_code != 200:
            return {"error": "Failed to fetch IOCs"}

        try:
            return response.json()
        except ValueError:
            # A 200 response whose body is not valid JSON.
            logger.exception("AlienVault OTX returned a non-JSON body")
            return {"error": "Failed to fetch IOCs"}

    # Custom tool: Analyze IOCs
    def analyze_iocs(self, iocs, threshold=5):
        """Select the IOCs whose pulse count exceeds ``threshold``.

        Args:
            iocs: Dict as returned by ``fetch_iocs``; entries are read from
                its ``"results"`` key. An error dict yields an empty list.
            threshold: An IOC is kept when ``pulse_info.count`` is strictly
                greater than this value (example threshold, default 5).

        Returns:
            List of the IOC dicts that passed the threshold, in input order.
        """
        # "or" guards cover keys that are present but null in the payload.
        results = iocs.get("results") or []
        return [
            ioc
            for ioc in results
            if ((ioc.get("pulse_info") or {}).get("count") or 0) > threshold
        ]

    # Custom tool: Correlate threats
    def correlate_threats(self, iocs):
        """Map each IOC whose indicator is a known actor IP to that actor.

        Args:
            iocs: Iterable of IOC dicts; the ``"indicator"`` key is compared
                against ``THREAT_ACTORS``.

        Returns:
            Dict of indicator -> threat-actor name for the matches only.
        """
        # Reverse index so each IOC is a single lookup instead of a scan of
        # every actor's list. Later actors overwrite earlier ones for a
        # shared IP, matching the original nested-loop behavior.
        actor_by_ip = {}
        for actor, ips in self.THREAT_ACTORS.items():
            for ip in ips:
                actor_by_ip[ip] = actor

        correlated_threats = {}
        for ioc in iocs:
            ip = ioc.get("indicator")
            if ip in actor_by_ip:
                correlated_threats[ip] = actor_by_ip[ip]
        return correlated_threats

    # Custom tool: Generate report
    def generate_report(self, suspicious_iocs, correlated_threats):
        """Bundle the analysis results with a one-sentence summary.

        Args:
            suspicious_iocs: List produced by ``analyze_iocs``.
            correlated_threats: Dict produced by ``correlate_threats``.

        Returns:
            Dict with keys ``"suspicious_iocs"``, ``"correlated_threats"``
            and ``"summary"``.
        """
        return {
            "suspicious_iocs": suspicious_iocs,
            "correlated_threats": correlated_threats,
            "summary": (
                f"Found {len(suspicious_iocs)} suspicious IOCs, "
                f"with {len(correlated_threats)} linked to known threat actors."
            ),
        }

    # Custom tool: Recommend actions
    def recommend_actions(self, correlated_threats):
        """Return one "Block IP ..." recommendation per correlated threat.

        Args:
            correlated_threats: Dict of IP -> threat-actor name.

        Returns:
            List of recommendation strings, in the dict's iteration order.
        """
        return [
            f"Block IP {ip} (linked to {actor})"
            for ip, actor in correlated_threats.items()
        ]

    # Define tasks for the crew
    def create_tasks(self, indicator_type):
        """Build the five tasks, one per agent, in pipeline order.

        Args:
            indicator_type: Indicator type named in the fetch task description.

        Returns:
            List ``[fetch, analyze, correlate, report, respond]`` of tasks.
        """
        fetch_task = Task(
            description=f"Fetch IOCs of type {indicator_type} from AlienVault OTX",
            agent=self.data_collector,
            expected_output="A list of IOCs in JSON format.",
        )
        analyze_task = Task(
            description="Analyze the fetched IOCs for suspicious activity",
            agent=self.analyst,
            expected_output="A list of suspicious IOCs.",
        )
        correlate_task = Task(
            description="Correlate suspicious IOCs with known threat actors",
            agent=self.correlator,
            expected_output="A dictionary mapping IOCs to threat actors.",
        )
        report_task = Task(
            description="Generate a threat intelligence report",
            agent=self.reporter,
            expected_output="A JSON report with suspicious IOCs, correlated threats, and a summary.",
        )
        respond_task = Task(
            description="Recommend mitigation actions based on the report",
            agent=self.responder,
            expected_output="A list of recommended actions.",
        )
        return [fetch_task, analyze_task, correlate_task, report_task, respond_task]

    # Execute the crew
    def run_crew(self, indicator_type):
        """Assemble a Crew from the agents and tasks and run it.

        Args:
            indicator_type: Forwarded to ``create_tasks``.

        Returns:
            Whatever ``Crew.kickoff()`` returns.
        """
        tasks = self.create_tasks(indicator_type)
        crew = Crew(
            agents=[
                self.data_collector,
                self.analyst,
                self.correlator,
                self.reporter,
                self.responder,
            ],
            tasks=tasks,
        )
        return crew.kickoff()


# FastAPI endpoint
class ThreatIntelRequest(BaseModel):
    """Request body for ``POST /threat-intel``."""

    indicator_type: str = "ipv4"


@app.post("/threat-intel")
def threat_intel(request: ThreatIntelRequest):
    """Run the threat-intelligence crew for the requested indicator type.

    The OTX API key is read from the ``OTX_API_KEY`` environment variable.

    Raises:
        HTTPException: Status 500 when the API key is not configured or the
            crew run fails.
    """
    api_key = os.environ.get(OTX_API_KEY_ENV)
    if not api_key:
        raise HTTPException(
            status_code=500,
            detail=f"{OTX_API_KEY_ENV} environment variable is not set",
        )

    try:
        # Initialize the crew
        crew = ThreatIntelligenceCrew(api_key=api_key)
        # Run the crew and get results
        return crew.run_crew(request.indicator_type)
    except HTTPException:
        # Already a well-formed HTTP error; do not re-wrap it as a 500.
        raise
    except Exception as e:
        logger.exception("Threat-intel crew run failed")
        raise HTTPException(status_code=500, detail=str(e)) from e


# Run the FastAPI app
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)