Spaces:
Sleeping
Sleeping
import gradio as gr | |
import requests | |
import json | |
import uuid | |
from dotenv import load_dotenv | |
import os | |
import http.server | |
import socketserver | |
import functools | |
import threading | |
import time | |
import base64 | |
import xml.etree.ElementTree as ET | |
from datetime import datetime | |
load_dotenv() | |
#carregando variaveis de ambiente | |
api_key = os.getenv("API_KEY_ZAP") | |
TOKEN_SENSEDIA = os.getenv("TOKEN_SENSEDIA") | |
FARADAY_TOKEN = os.getenv("FARADAY_TOKEN") | |
POWER_TOKEN = os.getenv("POWER_TOKEN") | |
HOST_ZAP = os.getenv("HOST_ZAP") | |
HOST_FARADAY = os.getenv("HOST_FARADAY") | |
AI_API_URL = os.getenv("AI_API_URL") | |
IA_TOKEN = os.getenv("IA_TOKEN") | |
FARADAY_API_TOKEN = os.getenv("FARADAY_API_TOKEN") | |
API_TOKEN_HF = os.getenv("API_TOKEN_HF") | |
cabecario = ''' | |
███████╗ █████╗ ██████╗ █████╗ ██████╗ █████╗ ██╗ ██╗ ██╗ █████╗ | |
██╔════╝██╔══██╗██╔══██╗██╔══██╗██╔══██╗██╔══██╗╚██╗ ██╔╝ ██║██╔══██╗ | |
█████╗ ███████║██████╔╝███████║██║ ██║███████║ ╚████╔╝ ██║███████║ | |
██╔══╝ ██╔══██║██╔══██╗██╔══██║██║ ██║██╔══██║ ╚██╔╝ ██║██╔══██║ | |
██║ ██║ ██║██║ ██║██║ ██║██████╔╝██║ ██║ ██║ ██║██║ ██║ | |
╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═╝╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝ | |
''' | |
print(cabecario) | |
PORT = 8080 | |
DIRECTORY = "dash" | |
Handler = functools.partial(http.server.SimpleHTTPRequestHandler, directory=DIRECTORY) | |
def get_xml_text(element, tag, default=''): | |
"""Helper function to safely get text from XML elements""" | |
found = element.find(tag) | |
return found.text if found is not None else default | |
def start_server(): | |
with socketserver.TCPServer(("", PORT), Handler) as httpd: | |
print(f"🚀 Servidor rodando em http://localhost:{PORT}/") | |
httpd.serve_forever() | |
server_thread = threading.Thread(target=start_server, daemon=True) | |
server_thread.start() | |
print("🔧 Configurando ambiente...") | |
def new_session(): | |
session_id = str(uuid.uuid4()) | |
headers = { | |
"Authorization": API_TOKEN_HF | |
} | |
new_session = f"{HOST_ZAP}/JSON/core/action/newSession/?apikey={api_key}&name={session_id}&overwrite=true" | |
response_session = requests.get(new_session, headers=headers, verify=False) | |
print(response_session) | |
return response_session | |
title_html = f'<h1 style="display: flex; align-items: center;">' \ | |
f' <img src="https://i.imgur.com/XKc295l.png" alt="Logo" style="height: 40px; margin-right: 10px;" />' \ | |
f' ⚡ Michael Faraday - AI Security⚡' \ | |
f'</h1>' | |
def upload_swagger(file, target_url, base_url): | |
new_session() | |
headers = { | |
"Authorization": API_TOKEN_HF | |
} | |
url = f"{HOST_ZAP}/OTHER/core/other/fileUpload/" | |
file_name = str(uuid.uuid4()) | |
file_name = f"{file_name}.json" | |
if file is None: | |
return "Erro: Nenhum arquivo selecionado." | |
with open(file.name, "rb") as f: | |
files = {'fileContents': (file_name, f, 'application/json')} | |
data = {'apikey': api_key, 'fileName': file_name} | |
response = requests.post(url, files=files, headers=headers, data=data) | |
try: | |
response_json = response.json() | |
uploaded_path = response_json.get("Uploaded", "") | |
if not uploaded_path: | |
return "Erro: Caminho do arquivo não encontrado na resposta." | |
except json.JSONDecodeError: | |
return "Erro ao processar resposta do upload." | |
# Segunda requisição - Importar arquivo | |
import_url = f"{HOST_ZAP}/JSON/openapi/action/importFile/?apikey={api_key}&file={uploaded_path}&target={target_url}&contextId=&userId=" | |
response_import = requests.get(import_url, headers=headers, verify=False) | |
# Terceira requisição - Definir baseUrl | |
base_url_encoded = requests.utils.quote(base_url, safe='') | |
set_global_var_url = f"{HOST_ZAP}/JSON/script/action/setGlobalVar/?apikey={api_key}&varKey=baseUrl&varValue={base_url_encoded}" | |
response_base_url = requests.get(set_global_var_url, headers=headers, verify=False) | |
# Quarta requisição - Executar script OWASP | |
run_script_url = f"{HOST_ZAP}/JSON/script/action/runStandAloneScript/?apikey={api_key}&scriptName=owasp.js" | |
response_script = requests.get(run_script_url, headers=headers, verify=False) | |
# Quinta requisição - Gerar relatório | |
report_url = f'{HOST_ZAP}/JSON/reports/action/generate/?apikey={api_key}&title=raizen&template=traditional-xml&sites={target_url}&reportDir=%2Fhome%2Fzap%2F.ZAP' | |
report_response = requests.get(report_url, headers=headers, verify=False) | |
try: | |
report_json = report_response.json() | |
report_path = report_json.get("generate", "") | |
if not report_path: | |
return "Erro ao gerar relatório." | |
except json.JSONDecodeError: | |
return "Erro ao processar resposta do relatório." | |
# Sexta requisição - Resumos das vulnerabilidades | |
sum_url = f'{HOST_ZAP}/JSON/alert/view/alertsSummary/?apikey={api_key}&baseurl=' | |
sum_response = requests.get(sum_url, headers=headers, verify=False).json() | |
total_high = sum_response["alertsSummary"]["High"] | |
total_low = sum_response["alertsSummary"]["Low"] | |
total_medium = sum_response["alertsSummary"]["Medium"] | |
resume_vulns = f"Total de vulnerabilidades: Graves: {total_high}, Medias: {total_medium} e Baixas: {total_low}" | |
#print(resume_vulns) | |
# 1 - AGENTE DE IA | |
ZAP_API_URL = f"{HOST_ZAP}/JSON/alert/view/alerts/" | |
params = { | |
"apikey": api_key, | |
"baseurl": "", | |
"start": "", | |
"count": "", | |
"riskId": "", | |
"contextName": "" | |
} | |
response = requests.get(ZAP_API_URL, headers=headers, params=params) | |
if response.status_code == 200: | |
data = response.json() | |
# 2. Filtrar apenas risk e description e remover descrições iguais | |
alerts = data.get("alerts", []) | |
# Usando um set para garantir que as descrições sejam únicas | |
unique_alerts = set() # Armazenar descrições únicas | |
for alert in alerts: | |
risk = alert.get('risk', 'Desconhecido') | |
description = alert.get('description', 'Sem descrição') | |
# Adiciona a descrição ao set (o set garante unicidade) | |
unique_alerts.add(f"Risco: {risk} - {description}") | |
# Convertendo o set de volta para lista, para exibição | |
security_reports = list(unique_alerts) | |
if security_reports: | |
report_content = "\n".join(security_reports) | |
report_content = report_content + "\n"+resume_vulns | |
else: | |
report_content = "Nenhum alerta de segurança encontrado." | |
# 3. Enviar para agente de IA | |
payload = { | |
"messages": [ | |
{ | |
"role": "system", | |
"content": "Sempre responda em português. Você é um agente de IA chamado Michael Faraday, você recebe reports de segurança e os resume e detalha de forma clara e objetiva com base no OWASP e pode até propor soluções. Use emoji onde der para realçar a resposta." | |
}, | |
{ | |
"role": "user", | |
"content": report_content | |
} | |
], | |
"token_api": IA_TOKEN | |
} | |
headers2 = {"Content-Type": "application/json"} | |
print("Enviando para a IA....") | |
ai_response = requests.post(AI_API_URL, data=json.dumps(payload), headers=headers2) | |
if ai_response.status_code == 200: | |
ai_result = ai_response.json() | |
ai_message = ai_result["response"]["response"] | |
else: | |
ai_message = "Erro ao conectar com o agente de IA." | |
# 4. Retornar mensagem para exibição no chat | |
markdown_message = f"### Resumo do Agente de IA\n\n{ai_message}" | |
else: | |
markdown_message = "Erro ao obter alertas de segurança da API ZAP." | |
# Sexta requisição - Download do relatório | |
report_file_name = report_path.split("/")[-1] | |
download_url = f"{HOST_ZAP}/OTHER/core/other/fileDownload/?apikey={api_key}&fileName={report_file_name}" | |
report_download_response = requests.get(download_url, headers=headers, verify=False) | |
if report_download_response.status_code == 200: | |
# Salvar o relatório em um arquivo temporário | |
import tempfile | |
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".xml") | |
temp_file.write(report_download_response.content) | |
temp_file.close() | |
return [markdown_message, temp_file.name] | |
else: | |
return [markdown_message + "\n\n❌ Erro ao baixar o relatório.", None] | |
image_url = "https://i.imgur.com/XKc295l.png" | |
title_html = f'<h1 style="display: flex; align-items: center;">' \ | |
f' <img src="https://i.imgur.com/XKc295l.png" alt="Logo" style="height: 40px; margin-right: 10px;" />' \ | |
f' ⚡ Michael Faraday - AI Security⚡' \ | |
f'</h1>' | |
# Atualize a interface para lidar com múltiplos retornos | |
with gr.Blocks(theme='ParityError/Interstellar') as demo: | |
gr.Markdown("# ⚡ Michael Faraday - AI Security 💫") | |
gr.Markdown(""" | |
👋 Olá, sou **Michael Faraday**, seu agente de IA especializado em segurança de aplicações! | |
Envie um arquivo Swagger e informe os campos solicitados. Eu irei analisar, detectar vulnerabilidades e sugerir melhorias. | |
""") | |
with gr.Row(): | |
file_input = gr.File(label="Selecione o arquivo Swagger") | |
target_url = gr.Textbox(label="🔗 Target URL") | |
base_url = gr.Textbox(label="✨ Base URL") | |
submit_btn = gr.Button("🧙🏻 Analisar") | |
with gr.Column(): | |
markdown_output = gr.Markdown(label="Resposta do Agente de IA") | |
file_output = gr.File(label="Relatório de Segurança", visible=True) | |
submit_btn.click( | |
fn=upload_swagger, | |
inputs=[file_input, target_url, base_url], | |
outputs=[markdown_output, file_output] | |
) | |
print(f"🚀 Agente rodando em http://127.0.0.1:7860") | |
demo.launch(share=False) |