import asyncio import base64 from datetime import datetime, timezone, timedelta import jwt import threading import time from typing import List import requests from config import APP_ID, APP_PRIVATE_KEY installation_tokens = {} token_lock = threading.Lock() def generate_jwt(): """Generate a JWT signed with GitHub App private key.""" now = int(time.time()) payload = { "iat": now, "exp": now + (10 * 60), "iss": APP_ID, } encoded_jwt = jwt.encode(payload, APP_PRIVATE_KEY, algorithm="RS256") return encoded_jwt def github_request(method, url, headers=None, **kwargs): if headers is None: jwt_token = generate_jwt() headers = { "Authorization": f"Bearer {jwt_token}", "Accept": "application/vnd.github.v3+json", } while True: response = requests.request(method, url, headers=headers, **kwargs) remaining = response.headers.get("X-RateLimit-Remaining") reset_time = response.headers.get("X-RateLimit-Reset") if remaining is None or reset_time is None: return response remaining = int(remaining) reset_time = int(reset_time) print(f"[GitHub] Remaining: {remaining}, Reset: {reset_time}") if response.status_code == 403 and "rate limit" in response.text.lower(): wait = reset_time - int(time.time()) + 5 print(f"Hit rate limit. Sleeping for {wait} seconds.") time.sleep(max(wait, 0)) continue if remaining <= 2: wait = reset_time - int(time.time()) + 5 print(f"Approaching rate limit ({remaining} left). Sleeping for {wait} seconds.") time.sleep(max(wait, 0)) continue return response def get_installation_id(owner, repo): """Fetch the installation ID for the app on a repo.""" url = f"https://api.github.com/repos/{owner}/{repo}/installation" response = github_request("GET", url) if response.status_code == 200: data = response.json() return data["id"] else: raise Exception(f"Failed to get installation ID for {owner}/{repo}: {response.status_code}. Please [install the GitHub App](https://github.com/apps/opensorus) on this repository before using the agent.") def get_installation_token(installation_id): """Return a valid installation token, fetch new if expired or missing.""" with token_lock: token_info = installation_tokens.get(installation_id) if token_info and token_info["expires_at"] > datetime.now(timezone.utc) + timedelta(seconds=30): return token_info["token"] url = f"https://api.github.com/app/installations/{installation_id}/access_tokens" response = github_request("POST", url) if response.status_code != 201: raise Exception(f"Failed to fetch installation token: {response.status_code}. Please [install the GitHub App](https://github.com/apps/opensorus) on this repository before using the agent.") token_data = response.json() token = token_data["token"] expires_at = datetime.strptime(token_data["expires_at"], "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc) installation_tokens[installation_id] = {"token": token, "expires_at": expires_at} return token async def fetch_repo_files(owner: str, repo: str, ref: str = "main") -> List[str]: """ Lists all files in the repository by recursively fetching the Git tree from GitHub API. Returns a list of file paths. """ installation_id = get_installation_id(owner, repo) token = get_installation_token(installation_id) url = f"https://api.github.com/repos/{owner}/{repo}/git/trees/{ref}?recursive=1" headers = { "Authorization": f"Bearer {token}", "Accept": "application/vnd.github.v3+json" } response = await asyncio.to_thread(github_request, "GET", url, headers=headers) if response.status_code != 200: raise Exception(f"Failed to fetch repository files: {response.status_code}. Please ensure the branch name is correct and files exist in this branch.") tree = response.json().get("tree", []) file_paths = [item["path"] for item in tree if item["type"] == "blob"] return file_paths async def fetch_file_content(owner: str, repo: str, path: str, ref: str = "main") -> str: """ Fetches the content of a file from the GitHub repository. """ installation_id = get_installation_id(owner, repo) token = await asyncio.to_thread(get_installation_token, installation_id) url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}?ref={ref}" headers = { "Authorization": f"Bearer {token}", "Accept": "application/vnd.github.v3+json" } response = await asyncio.to_thread(github_request, "GET", url, headers=headers) if response.status_code != 200: raise Exception(f"Failed to fetch file content {path}: {response.status_code} {response.text}") content_json = response.json() content = base64.b64decode(content_json["content"]).decode("utf-8", errors="ignore") return content