Spaces:
Runtime error
Runtime error
File size: 3,849 Bytes
d129093 cbf8e75 d129093 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
from fastapi import FastAPI
from pydantic import BaseModel
import requests
import os
from datetime import datetime, timedelta
from groq import Groq
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Configuration
GITHUB_TOKEN = "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"
GROQ_API_KEY = "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"
REPOSITORIES = [
"falcosecurity/rules",
"SigmaHQ/sigma",
"reversinglabs/reversinglabs-yara-rules",
"elastic/detection-rules",
"sublime-security/sublime-rules",
"Yamato-Security/hayabusa-rules",
"anvilogic-forge/armory",
"chainguard-dev/osquery-defense-kit",
"splunk/security_content",
"Neo23x0/signature-base",
"SlimKQL/Hunting-Queries-Detection-Rules"
]
DAYS_BACK = 7
# GitHub API base URL
GITHUB_API_URL = "https://api.github.com"
# Groq client setup
groq_client = Groq(api_key=GROQ_API_KEY)
# FastAPI app
app = FastAPI(docs_url=None, redoc_url=None)
class RepositoryDetails(BaseModel):
repo_name: str
repo_url: str
changes: str
description: str
context: str
def fetch_repository_changes(repo: str, days_back: int) -> list[str]:
since_date = (datetime.now() - timedelta(days=days_back)).isoformat()
headers = {
"Authorization": f"token {GITHUB_TOKEN}",
"Accept": "application/vnd.github.v3+json"
}
commits_url = f"{GITHUB_API_URL}/repos/{repo}/commits"
commits_params = {"since": since_date}
commits_response = requests.get(commits_url, headers=headers, params=commits_params)
commits = commits_response.json()
prs_url = f"{GITHUB_API_URL}/repos/{repo}/pulls"
prs_params = {"state": "all", "sort": "updated", "direction": "desc"}
prs_response = requests.get(prs_url, headers=headers, params=prs_params)
prs = prs_response.json()
changes = []
for commit in commits:
changes.append(f"Commit: {commit['commit']['message']}")
for pr in prs:
changes.append(f"PR: {pr['title']} - {pr['body']}")
return changes
def summarize_changes_with_deepseek(repo: str, changes: list[str]) -> dict:
prompt = f"""
The following changes were made to detection rules in the GitHub repository {repo}.
Provide a detailed description of the changes and explain the context of why these changes are required:
{changes}
"""
response = groq_client.chat.completions.create(
model="deepseek-chat",
messages=[{"role": "user", "content": prompt}],
max_tokens=500,
temperature=0.7
)
summary = response.choices[0].message.content
description = summary.split("Description:")[1].split("Context:")[0].strip()
context = summary.split("Context:")[1].strip()
return {
"description": description,
"context": context
}
@app.get("/monitor", response_model=list[RepositoryDetails])
async def monitor_repositories():
results = []
for repo in REPOSITORIES:
changes = fetch_repository_changes(repo, DAYS_BACK)
if changes:
summary = summarize_changes_with_deepseek(repo, changes)
results.append(RepositoryDetails(
repo_name=f"{repo} (+{len(changes)}, ✎{len(changes)})",
repo_url=f"https://github.com/{repo}",
changes="\n".join(changes),
description=summary["description"],
context=summary["context"]
))
else:
results.append(RepositoryDetails(
repo_name=f"{repo} (No changes)",
repo_url=f"https://github.com/{repo}",
changes="No changes detected in the last 7 days.",
description="No changes detected.",
context="No context available."
))
return results |