Adilmar's picture
Create app.py
10647a8 verified
import gradio as gr
import requests
import json
import uuid
from dotenv import load_dotenv
import os
import http.server
import socketserver
import functools
import threading
import time
import base64
import xml.etree.ElementTree as ET
from datetime import datetime
load_dotenv()
#carregando variaveis de ambiente
api_key = os.getenv("API_KEY_ZAP")
TOKEN_SENSEDIA = os.getenv("TOKEN_SENSEDIA")
FARADAY_TOKEN = os.getenv("FARADAY_TOKEN")
POWER_TOKEN = os.getenv("POWER_TOKEN")
HOST_ZAP = os.getenv("HOST_ZAP")
HOST_FARADAY = os.getenv("HOST_FARADAY")
AI_API_URL = os.getenv("AI_API_URL")
IA_TOKEN = os.getenv("IA_TOKEN")
FARADAY_API_TOKEN = os.getenv("FARADAY_API_TOKEN")
API_TOKEN_HF = os.getenv("API_TOKEN_HF")
cabecario = '''
███████╗ █████╗ ██████╗ █████╗ ██████╗ █████╗ ██╗ ██╗ ██╗ █████╗
██╔════╝██╔══██╗██╔══██╗██╔══██╗██╔══██╗██╔══██╗╚██╗ ██╔╝ ██║██╔══██╗
█████╗ ███████║██████╔╝███████║██║ ██║███████║ ╚████╔╝ ██║███████║
██╔══╝ ██╔══██║██╔══██╗██╔══██║██║ ██║██╔══██║ ╚██╔╝ ██║██╔══██║
██║ ██║ ██║██║ ██║██║ ██║██████╔╝██║ ██║ ██║ ██║██║ ██║
╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═╝╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝
'''
print(cabecario)
PORT = 8080
DIRECTORY = "dash"
Handler = functools.partial(http.server.SimpleHTTPRequestHandler, directory=DIRECTORY)
def get_xml_text(element, tag, default=''):
"""Helper function to safely get text from XML elements"""
found = element.find(tag)
return found.text if found is not None else default
def start_server():
with socketserver.TCPServer(("", PORT), Handler) as httpd:
print(f"🚀 Servidor rodando em http://localhost:{PORT}/")
httpd.serve_forever()
server_thread = threading.Thread(target=start_server, daemon=True)
server_thread.start()
print("🔧 Configurando ambiente...")
def new_session():
session_id = str(uuid.uuid4())
headers = {
"Authorization": API_TOKEN_HF
}
new_session = f"{HOST_ZAP}/JSON/core/action/newSession/?apikey={api_key}&name={session_id}&overwrite=true"
response_session = requests.get(new_session, headers=headers, verify=False)
print(response_session)
return response_session
title_html = f'<h1 style="display: flex; align-items: center;">' \
f' <img src="https://i.imgur.com/XKc295l.png" alt="Logo" style="height: 40px; margin-right: 10px;" />' \
f' ⚡ Michael Faraday - AI Security⚡' \
f'</h1>'
def upload_swagger(file, target_url, base_url):
new_session()
headers = {
"Authorization": API_TOKEN_HF
}
url = f"{HOST_ZAP}/OTHER/core/other/fileUpload/"
file_name = str(uuid.uuid4())
file_name = f"{file_name}.json"
if file is None:
return "Erro: Nenhum arquivo selecionado."
with open(file.name, "rb") as f:
files = {'fileContents': (file_name, f, 'application/json')}
data = {'apikey': api_key, 'fileName': file_name}
response = requests.post(url, files=files, headers=headers, data=data)
try:
response_json = response.json()
uploaded_path = response_json.get("Uploaded", "")
if not uploaded_path:
return "Erro: Caminho do arquivo não encontrado na resposta."
except json.JSONDecodeError:
return "Erro ao processar resposta do upload."
# Segunda requisição - Importar arquivo
import_url = f"{HOST_ZAP}/JSON/openapi/action/importFile/?apikey={api_key}&file={uploaded_path}&target={target_url}&contextId=&userId="
response_import = requests.get(import_url, headers=headers, verify=False)
# Terceira requisição - Definir baseUrl
base_url_encoded = requests.utils.quote(base_url, safe='')
set_global_var_url = f"{HOST_ZAP}/JSON/script/action/setGlobalVar/?apikey={api_key}&varKey=baseUrl&varValue={base_url_encoded}"
response_base_url = requests.get(set_global_var_url, headers=headers, verify=False)
# Quarta requisição - Executar script OWASP
run_script_url = f"{HOST_ZAP}/JSON/script/action/runStandAloneScript/?apikey={api_key}&scriptName=owasp.js"
response_script = requests.get(run_script_url, headers=headers, verify=False)
# Quinta requisição - Gerar relatório
report_url = f'{HOST_ZAP}/JSON/reports/action/generate/?apikey={api_key}&title=raizen&template=traditional-xml&sites={target_url}&reportDir=%2Fhome%2Fzap%2F.ZAP'
report_response = requests.get(report_url, headers=headers, verify=False)
try:
report_json = report_response.json()
report_path = report_json.get("generate", "")
if not report_path:
return "Erro ao gerar relatório."
except json.JSONDecodeError:
return "Erro ao processar resposta do relatório."
# Sexta requisição - Resumos das vulnerabilidades
sum_url = f'{HOST_ZAP}/JSON/alert/view/alertsSummary/?apikey={api_key}&baseurl='
sum_response = requests.get(sum_url, headers=headers, verify=False).json()
total_high = sum_response["alertsSummary"]["High"]
total_low = sum_response["alertsSummary"]["Low"]
total_medium = sum_response["alertsSummary"]["Medium"]
resume_vulns = f"Total de vulnerabilidades: Graves: {total_high}, Medias: {total_medium} e Baixas: {total_low}"
#print(resume_vulns)
# 1 - AGENTE DE IA
ZAP_API_URL = f"{HOST_ZAP}/JSON/alert/view/alerts/"
params = {
"apikey": api_key,
"baseurl": "",
"start": "",
"count": "",
"riskId": "",
"contextName": ""
}
response = requests.get(ZAP_API_URL, headers=headers, params=params)
if response.status_code == 200:
data = response.json()
# 2. Filtrar apenas risk e description e remover descrições iguais
alerts = data.get("alerts", [])
# Usando um set para garantir que as descrições sejam únicas
unique_alerts = set() # Armazenar descrições únicas
for alert in alerts:
risk = alert.get('risk', 'Desconhecido')
description = alert.get('description', 'Sem descrição')
# Adiciona a descrição ao set (o set garante unicidade)
unique_alerts.add(f"Risco: {risk} - {description}")
# Convertendo o set de volta para lista, para exibição
security_reports = list(unique_alerts)
if security_reports:
report_content = "\n".join(security_reports)
report_content = report_content + "\n"+resume_vulns
else:
report_content = "Nenhum alerta de segurança encontrado."
# 3. Enviar para agente de IA
payload = {
"messages": [
{
"role": "system",
"content": "Sempre responda em português. Você é um agente de IA chamado Michael Faraday, você recebe reports de segurança e os resume e detalha de forma clara e objetiva com base no OWASP e pode até propor soluções. Use emoji onde der para realçar a resposta."
},
{
"role": "user",
"content": report_content
}
],
"token_api": IA_TOKEN
}
headers2 = {"Content-Type": "application/json"}
print("Enviando para a IA....")
ai_response = requests.post(AI_API_URL, data=json.dumps(payload), headers=headers2)
if ai_response.status_code == 200:
ai_result = ai_response.json()
ai_message = ai_result["response"]["response"]
else:
ai_message = "Erro ao conectar com o agente de IA."
# 4. Retornar mensagem para exibição no chat
markdown_message = f"### Resumo do Agente de IA\n\n{ai_message}"
else:
markdown_message = "Erro ao obter alertas de segurança da API ZAP."
# Sexta requisição - Download do relatório
report_file_name = report_path.split("/")[-1]
download_url = f"{HOST_ZAP}/OTHER/core/other/fileDownload/?apikey={api_key}&fileName={report_file_name}"
report_download_response = requests.get(download_url, headers=headers, verify=False)
if report_download_response.status_code == 200:
# Salvar o relatório em um arquivo temporário
import tempfile
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".xml")
temp_file.write(report_download_response.content)
temp_file.close()
return [markdown_message, temp_file.name]
else:
return [markdown_message + "\n\n❌ Erro ao baixar o relatório.", None]
image_url = "https://i.imgur.com/XKc295l.png"
title_html = f'<h1 style="display: flex; align-items: center;">' \
f' <img src="https://i.imgur.com/XKc295l.png" alt="Logo" style="height: 40px; margin-right: 10px;" />' \
f' ⚡ Michael Faraday - AI Security⚡' \
f'</h1>'
# Atualize a interface para lidar com múltiplos retornos
with gr.Blocks(theme='ParityError/Interstellar') as demo:
gr.Markdown("# ⚡ Michael Faraday - AI Security 💫")
gr.Markdown("""
👋 Olá, sou **Michael Faraday**, seu agente de IA especializado em segurança de aplicações!
Envie um arquivo Swagger e informe os campos solicitados. Eu irei analisar, detectar vulnerabilidades e sugerir melhorias.
""")
with gr.Row():
file_input = gr.File(label="Selecione o arquivo Swagger")
target_url = gr.Textbox(label="🔗 Target URL")
base_url = gr.Textbox(label="✨ Base URL")
submit_btn = gr.Button("🧙🏻 Analisar")
with gr.Column():
markdown_output = gr.Markdown(label="Resposta do Agente de IA")
file_output = gr.File(label="Relatório de Segurança", visible=True)
submit_btn.click(
fn=upload_swagger,
inputs=[file_input, target_url, base_url],
outputs=[markdown_output, file_output]
)
print(f"🚀 Agente rodando em http://127.0.0.1:7860")
demo.launch(share=False)