Spaces:
Runtime error
Runtime error
from fastapi import FastAPI | |
from pydantic import BaseModel | |
import requests | |
import os | |
from datetime import datetime, timedelta | |
from groq import Groq | |
from dotenv import load_dotenv | |
# Load environment variables | |
load_dotenv() | |
# Configuration | |
GITHUB_TOKEN = "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" | |
GROQ_API_KEY = "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" | |
REPOSITORIES = [ | |
"falcosecurity/rules", | |
"SigmaHQ/sigma", | |
"reversinglabs/reversinglabs-yara-rules", | |
"elastic/detection-rules", | |
"sublime-security/sublime-rules", | |
"Yamato-Security/hayabusa-rules", | |
"anvilogic-forge/armory", | |
"chainguard-dev/osquery-defense-kit", | |
"splunk/security_content", | |
"Neo23x0/signature-base", | |
"SlimKQL/Hunting-Queries-Detection-Rules" | |
] | |
DAYS_BACK = 7 | |
# GitHub API base URL | |
GITHUB_API_URL = "https://api.github.com" | |
# Groq client setup | |
groq_client = Groq(api_key=GROQ_API_KEY) | |
# FastAPI app | |
app = FastAPI(docs_url=None, redoc_url=None) | |
class RepositoryDetails(BaseModel): | |
repo_name: str | |
repo_url: str | |
changes: str | |
description: str | |
context: str | |
def fetch_repository_changes(repo: str, days_back: int) -> list[str]: | |
since_date = (datetime.now() - timedelta(days=days_back)).isoformat() | |
headers = { | |
"Authorization": f"token {GITHUB_TOKEN}", | |
"Accept": "application/vnd.github.v3+json" | |
} | |
commits_url = f"{GITHUB_API_URL}/repos/{repo}/commits" | |
commits_params = {"since": since_date} | |
commits_response = requests.get(commits_url, headers=headers, params=commits_params) | |
commits = commits_response.json() | |
prs_url = f"{GITHUB_API_URL}/repos/{repo}/pulls" | |
prs_params = {"state": "all", "sort": "updated", "direction": "desc"} | |
prs_response = requests.get(prs_url, headers=headers, params=prs_params) | |
prs = prs_response.json() | |
changes = [] | |
for commit in commits: | |
changes.append(f"Commit: {commit['commit']['message']}") | |
for pr in prs: | |
changes.append(f"PR: {pr['title']} - {pr['body']}") | |
return changes | |
def summarize_changes_with_deepseek(repo: str, changes: list[str]) -> dict: | |
prompt = f""" | |
The following changes were made to detection rules in the GitHub repository {repo}. | |
Provide a detailed description of the changes and explain the context of why these changes are required: | |
{changes} | |
""" | |
response = groq_client.chat.completions.create( | |
model="deepseek-chat", | |
messages=[{"role": "user", "content": prompt}], | |
max_tokens=500, | |
temperature=0.7 | |
) | |
summary = response.choices[0].message.content | |
description = summary.split("Description:")[1].split("Context:")[0].strip() | |
context = summary.split("Context:")[1].strip() | |
return { | |
"description": description, | |
"context": context | |
} | |
async def monitor_repositories(): | |
results = [] | |
for repo in REPOSITORIES: | |
changes = fetch_repository_changes(repo, DAYS_BACK) | |
if changes: | |
summary = summarize_changes_with_deepseek(repo, changes) | |
results.append(RepositoryDetails( | |
repo_name=f"{repo} (+{len(changes)}, β{len(changes)})", | |
repo_url=f"https://github.com/{repo}", | |
changes="\n".join(changes), | |
description=summary["description"], | |
context=summary["context"] | |
)) | |
else: | |
results.append(RepositoryDetails( | |
repo_name=f"{repo} (No changes)", | |
repo_url=f"https://github.com/{repo}", | |
changes="No changes detected in the last 7 days.", | |
description="No changes detected.", | |
context="No context available." | |
)) | |
return results |