import gradio as gr import requests import json import uuid from dotenv import load_dotenv import os import http.server import socketserver import functools import threading import time import base64 import xml.etree.ElementTree as ET from datetime import datetime load_dotenv() #carregando variaveis de ambiente api_key = os.getenv("API_KEY_ZAP") TOKEN_SENSEDIA = os.getenv("TOKEN_SENSEDIA") FARADAY_TOKEN = os.getenv("FARADAY_TOKEN") POWER_TOKEN = os.getenv("POWER_TOKEN") HOST_ZAP = os.getenv("HOST_ZAP") HOST_FARADAY = os.getenv("HOST_FARADAY") AI_API_URL = os.getenv("AI_API_URL") IA_TOKEN = os.getenv("IA_TOKEN") FARADAY_API_TOKEN = os.getenv("FARADAY_API_TOKEN") API_TOKEN_HF = os.getenv("API_TOKEN_HF") cabecario = ''' ███████╗ █████╗ ██████╗ █████╗ ██████╗ █████╗ ██╗ ██╗ ██╗ █████╗ ██╔════╝██╔══██╗██╔══██╗██╔══██╗██╔══██╗██╔══██╗╚██╗ ██╔╝ ██║██╔══██╗ █████╗ ███████║██████╔╝███████║██║ ██║███████║ ╚████╔╝ ██║███████║ ██╔══╝ ██╔══██║██╔══██╗██╔══██║██║ ██║██╔══██║ ╚██╔╝ ██║██╔══██║ ██║ ██║ ██║██║ ██║██║ ██║██████╔╝██║ ██║ ██║ ██║██║ ██║ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═╝╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝ ''' print(cabecario) PORT = 8080 DIRECTORY = "dash" Handler = functools.partial(http.server.SimpleHTTPRequestHandler, directory=DIRECTORY) def get_xml_text(element, tag, default=''): """Helper function to safely get text from XML elements""" found = element.find(tag) return found.text if found is not None else default def start_server(): with socketserver.TCPServer(("", PORT), Handler) as httpd: print(f"🚀 Servidor rodando em http://localhost:{PORT}/") httpd.serve_forever() server_thread = threading.Thread(target=start_server, daemon=True) server_thread.start() print("🔧 Configurando ambiente...") def new_session(): session_id = str(uuid.uuid4()) headers = { "Authorization": API_TOKEN_HF } new_session = f"{HOST_ZAP}/JSON/core/action/newSession/?apikey={api_key}&name={session_id}&overwrite=true" response_session = requests.get(new_session, headers=headers, verify=False) print(response_session) return response_session title_html = f'

' \ f' Logo' \ f' ⚡ Michael Faraday - AI Security⚡' \ f'

' def upload_swagger(file, target_url, base_url): new_session() headers = { "Authorization": API_TOKEN_HF } url = f"{HOST_ZAP}/OTHER/core/other/fileUpload/" file_name = str(uuid.uuid4()) file_name = f"{file_name}.json" if file is None: return "Erro: Nenhum arquivo selecionado." with open(file.name, "rb") as f: files = {'fileContents': (file_name, f, 'application/json')} data = {'apikey': api_key, 'fileName': file_name} response = requests.post(url, files=files, headers=headers, data=data) try: response_json = response.json() uploaded_path = response_json.get("Uploaded", "") if not uploaded_path: return "Erro: Caminho do arquivo não encontrado na resposta." except json.JSONDecodeError: return "Erro ao processar resposta do upload." # Segunda requisição - Importar arquivo import_url = f"{HOST_ZAP}/JSON/openapi/action/importFile/?apikey={api_key}&file={uploaded_path}&target={target_url}&contextId=&userId=" response_import = requests.get(import_url, headers=headers, verify=False) # Terceira requisição - Definir baseUrl base_url_encoded = requests.utils.quote(base_url, safe='') set_global_var_url = f"{HOST_ZAP}/JSON/script/action/setGlobalVar/?apikey={api_key}&varKey=baseUrl&varValue={base_url_encoded}" response_base_url = requests.get(set_global_var_url, headers=headers, verify=False) # Quarta requisição - Executar script OWASP run_script_url = f"{HOST_ZAP}/JSON/script/action/runStandAloneScript/?apikey={api_key}&scriptName=owasp.js" response_script = requests.get(run_script_url, headers=headers, verify=False) # Quinta requisição - Gerar relatório report_url = f'{HOST_ZAP}/JSON/reports/action/generate/?apikey={api_key}&title=raizen&template=traditional-xml&sites={target_url}&reportDir=%2Fhome%2Fzap%2F.ZAP' report_response = requests.get(report_url, headers=headers, verify=False) try: report_json = report_response.json() report_path = report_json.get("generate", "") if not report_path: return "Erro ao gerar relatório." except json.JSONDecodeError: return "Erro ao processar resposta do relatório." # Sexta requisição - Resumos das vulnerabilidades sum_url = f'{HOST_ZAP}/JSON/alert/view/alertsSummary/?apikey={api_key}&baseurl=' sum_response = requests.get(sum_url, headers=headers, verify=False).json() total_high = sum_response["alertsSummary"]["High"] total_low = sum_response["alertsSummary"]["Low"] total_medium = sum_response["alertsSummary"]["Medium"] resume_vulns = f"Total de vulnerabilidades: Graves: {total_high}, Medias: {total_medium} e Baixas: {total_low}" #print(resume_vulns) # 1 - AGENTE DE IA ZAP_API_URL = f"{HOST_ZAP}/JSON/alert/view/alerts/" params = { "apikey": api_key, "baseurl": "", "start": "", "count": "", "riskId": "", "contextName": "" } response = requests.get(ZAP_API_URL, headers=headers, params=params) if response.status_code == 200: data = response.json() # 2. Filtrar apenas risk e description e remover descrições iguais alerts = data.get("alerts", []) # Usando um set para garantir que as descrições sejam únicas unique_alerts = set() # Armazenar descrições únicas for alert in alerts: risk = alert.get('risk', 'Desconhecido') description = alert.get('description', 'Sem descrição') # Adiciona a descrição ao set (o set garante unicidade) unique_alerts.add(f"Risco: {risk} - {description}") # Convertendo o set de volta para lista, para exibição security_reports = list(unique_alerts) if security_reports: report_content = "\n".join(security_reports) report_content = report_content + "\n"+resume_vulns else: report_content = "Nenhum alerta de segurança encontrado." # 3. Enviar para agente de IA payload = { "messages": [ { "role": "system", "content": "Sempre responda em português. Você é um agente de IA chamado Michael Faraday, você recebe reports de segurança e os resume e detalha de forma clara e objetiva com base no OWASP e pode até propor soluções. Use emoji onde der para realçar a resposta." }, { "role": "user", "content": report_content } ], "token_api": IA_TOKEN } headers2 = {"Content-Type": "application/json"} print("Enviando para a IA....") ai_response = requests.post(AI_API_URL, data=json.dumps(payload), headers=headers2) if ai_response.status_code == 200: ai_result = ai_response.json() ai_message = ai_result["response"]["response"] else: ai_message = "Erro ao conectar com o agente de IA." # 4. Retornar mensagem para exibição no chat markdown_message = f"### Resumo do Agente de IA\n\n{ai_message}" else: markdown_message = "Erro ao obter alertas de segurança da API ZAP." # Sexta requisição - Download do relatório report_file_name = report_path.split("/")[-1] download_url = f"{HOST_ZAP}/OTHER/core/other/fileDownload/?apikey={api_key}&fileName={report_file_name}" report_download_response = requests.get(download_url, headers=headers, verify=False) if report_download_response.status_code == 200: # Salvar o relatório em um arquivo temporário import tempfile temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".xml") temp_file.write(report_download_response.content) temp_file.close() return [markdown_message, temp_file.name] else: return [markdown_message + "\n\n❌ Erro ao baixar o relatório.", None] image_url = "https://i.imgur.com/XKc295l.png" title_html = f'

' \ f' Logo' \ f' ⚡ Michael Faraday - AI Security⚡' \ f'

' # Atualize a interface para lidar com múltiplos retornos with gr.Blocks(theme='ParityError/Interstellar') as demo: gr.Markdown("# ⚡ Michael Faraday - AI Security 💫") gr.Markdown(""" 👋 Olá, sou **Michael Faraday**, seu agente de IA especializado em segurança de aplicações! Envie um arquivo Swagger e informe os campos solicitados. Eu irei analisar, detectar vulnerabilidades e sugerir melhorias. """) with gr.Row(): file_input = gr.File(label="Selecione o arquivo Swagger") target_url = gr.Textbox(label="🔗 Target URL") base_url = gr.Textbox(label="✨ Base URL") submit_btn = gr.Button("🧙🏻 Analisar") with gr.Column(): markdown_output = gr.Markdown(label="Resposta do Agente de IA") file_output = gr.File(label="Relatório de Segurança", visible=True) submit_btn.click( fn=upload_swagger, inputs=[file_input, target_url, base_url], outputs=[markdown_output, file_output] ) print(f"🚀 Agente rodando em http://127.0.0.1:7860") demo.launch(share=False)