Spaces:
Sleeping
Sleeping
File size: 10,772 Bytes
10647a8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 |
import gradio as gr
import requests
import json
import uuid
from dotenv import load_dotenv
import os
import http.server
import socketserver
import functools
import threading
import time
import base64
import xml.etree.ElementTree as ET
from datetime import datetime
load_dotenv()
#carregando variaveis de ambiente
api_key = os.getenv("API_KEY_ZAP")
TOKEN_SENSEDIA = os.getenv("TOKEN_SENSEDIA")
FARADAY_TOKEN = os.getenv("FARADAY_TOKEN")
POWER_TOKEN = os.getenv("POWER_TOKEN")
HOST_ZAP = os.getenv("HOST_ZAP")
HOST_FARADAY = os.getenv("HOST_FARADAY")
AI_API_URL = os.getenv("AI_API_URL")
IA_TOKEN = os.getenv("IA_TOKEN")
FARADAY_API_TOKEN = os.getenv("FARADAY_API_TOKEN")
API_TOKEN_HF = os.getenv("API_TOKEN_HF")
cabecario = '''
███████╗ █████╗ ██████╗ █████╗ ██████╗ █████╗ ██╗ ██╗ ██╗ █████╗
██╔════╝██╔══██╗██╔══██╗██╔══██╗██╔══██╗██╔══██╗╚██╗ ██╔╝ ██║██╔══██╗
█████╗ ███████║██████╔╝███████║██║ ██║███████║ ╚████╔╝ ██║███████║
██╔══╝ ██╔══██║██╔══██╗██╔══██║██║ ██║██╔══██║ ╚██╔╝ ██║██╔══██║
██║ ██║ ██║██║ ██║██║ ██║██████╔╝██║ ██║ ██║ ██║██║ ██║
╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═╝╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝
'''
print(cabecario)
PORT = 8080
DIRECTORY = "dash"
Handler = functools.partial(http.server.SimpleHTTPRequestHandler, directory=DIRECTORY)
def get_xml_text(element, tag, default=''):
"""Helper function to safely get text from XML elements"""
found = element.find(tag)
return found.text if found is not None else default
def start_server():
with socketserver.TCPServer(("", PORT), Handler) as httpd:
print(f"🚀 Servidor rodando em http://localhost:{PORT}/")
httpd.serve_forever()
server_thread = threading.Thread(target=start_server, daemon=True)
server_thread.start()
print("🔧 Configurando ambiente...")
def new_session():
session_id = str(uuid.uuid4())
headers = {
"Authorization": API_TOKEN_HF
}
new_session = f"{HOST_ZAP}/JSON/core/action/newSession/?apikey={api_key}&name={session_id}&overwrite=true"
response_session = requests.get(new_session, headers=headers, verify=False)
print(response_session)
return response_session
title_html = f'<h1 style="display: flex; align-items: center;">' \
f' <img src="https://i.imgur.com/XKc295l.png" alt="Logo" style="height: 40px; margin-right: 10px;" />' \
f' ⚡ Michael Faraday - AI Security⚡' \
f'</h1>'
def upload_swagger(file, target_url, base_url):
new_session()
headers = {
"Authorization": API_TOKEN_HF
}
url = f"{HOST_ZAP}/OTHER/core/other/fileUpload/"
file_name = str(uuid.uuid4())
file_name = f"{file_name}.json"
if file is None:
return "Erro: Nenhum arquivo selecionado."
with open(file.name, "rb") as f:
files = {'fileContents': (file_name, f, 'application/json')}
data = {'apikey': api_key, 'fileName': file_name}
response = requests.post(url, files=files, headers=headers, data=data)
try:
response_json = response.json()
uploaded_path = response_json.get("Uploaded", "")
if not uploaded_path:
return "Erro: Caminho do arquivo não encontrado na resposta."
except json.JSONDecodeError:
return "Erro ao processar resposta do upload."
# Segunda requisição - Importar arquivo
import_url = f"{HOST_ZAP}/JSON/openapi/action/importFile/?apikey={api_key}&file={uploaded_path}&target={target_url}&contextId=&userId="
response_import = requests.get(import_url, headers=headers, verify=False)
# Terceira requisição - Definir baseUrl
base_url_encoded = requests.utils.quote(base_url, safe='')
set_global_var_url = f"{HOST_ZAP}/JSON/script/action/setGlobalVar/?apikey={api_key}&varKey=baseUrl&varValue={base_url_encoded}"
response_base_url = requests.get(set_global_var_url, headers=headers, verify=False)
# Quarta requisição - Executar script OWASP
run_script_url = f"{HOST_ZAP}/JSON/script/action/runStandAloneScript/?apikey={api_key}&scriptName=owasp.js"
response_script = requests.get(run_script_url, headers=headers, verify=False)
# Quinta requisição - Gerar relatório
report_url = f'{HOST_ZAP}/JSON/reports/action/generate/?apikey={api_key}&title=raizen&template=traditional-xml&sites={target_url}&reportDir=%2Fhome%2Fzap%2F.ZAP'
report_response = requests.get(report_url, headers=headers, verify=False)
try:
report_json = report_response.json()
report_path = report_json.get("generate", "")
if not report_path:
return "Erro ao gerar relatório."
except json.JSONDecodeError:
return "Erro ao processar resposta do relatório."
# Sexta requisição - Resumos das vulnerabilidades
sum_url = f'{HOST_ZAP}/JSON/alert/view/alertsSummary/?apikey={api_key}&baseurl='
sum_response = requests.get(sum_url, headers=headers, verify=False).json()
total_high = sum_response["alertsSummary"]["High"]
total_low = sum_response["alertsSummary"]["Low"]
total_medium = sum_response["alertsSummary"]["Medium"]
resume_vulns = f"Total de vulnerabilidades: Graves: {total_high}, Medias: {total_medium} e Baixas: {total_low}"
#print(resume_vulns)
# 1 - AGENTE DE IA
ZAP_API_URL = f"{HOST_ZAP}/JSON/alert/view/alerts/"
params = {
"apikey": api_key,
"baseurl": "",
"start": "",
"count": "",
"riskId": "",
"contextName": ""
}
response = requests.get(ZAP_API_URL, headers=headers, params=params)
if response.status_code == 200:
data = response.json()
# 2. Filtrar apenas risk e description e remover descrições iguais
alerts = data.get("alerts", [])
# Usando um set para garantir que as descrições sejam únicas
unique_alerts = set() # Armazenar descrições únicas
for alert in alerts:
risk = alert.get('risk', 'Desconhecido')
description = alert.get('description', 'Sem descrição')
# Adiciona a descrição ao set (o set garante unicidade)
unique_alerts.add(f"Risco: {risk} - {description}")
# Convertendo o set de volta para lista, para exibição
security_reports = list(unique_alerts)
if security_reports:
report_content = "\n".join(security_reports)
report_content = report_content + "\n"+resume_vulns
else:
report_content = "Nenhum alerta de segurança encontrado."
# 3. Enviar para agente de IA
payload = {
"messages": [
{
"role": "system",
"content": "Sempre responda em português. Você é um agente de IA chamado Michael Faraday, você recebe reports de segurança e os resume e detalha de forma clara e objetiva com base no OWASP e pode até propor soluções. Use emoji onde der para realçar a resposta."
},
{
"role": "user",
"content": report_content
}
],
"token_api": IA_TOKEN
}
headers2 = {"Content-Type": "application/json"}
print("Enviando para a IA....")
ai_response = requests.post(AI_API_URL, data=json.dumps(payload), headers=headers2)
if ai_response.status_code == 200:
ai_result = ai_response.json()
ai_message = ai_result["response"]["response"]
else:
ai_message = "Erro ao conectar com o agente de IA."
# 4. Retornar mensagem para exibição no chat
markdown_message = f"### Resumo do Agente de IA\n\n{ai_message}"
else:
markdown_message = "Erro ao obter alertas de segurança da API ZAP."
# Sexta requisição - Download do relatório
report_file_name = report_path.split("/")[-1]
download_url = f"{HOST_ZAP}/OTHER/core/other/fileDownload/?apikey={api_key}&fileName={report_file_name}"
report_download_response = requests.get(download_url, headers=headers, verify=False)
if report_download_response.status_code == 200:
# Salvar o relatório em um arquivo temporário
import tempfile
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".xml")
temp_file.write(report_download_response.content)
temp_file.close()
return [markdown_message, temp_file.name]
else:
return [markdown_message + "\n\n❌ Erro ao baixar o relatório.", None]
image_url = "https://i.imgur.com/XKc295l.png"
title_html = f'<h1 style="display: flex; align-items: center;">' \
f' <img src="https://i.imgur.com/XKc295l.png" alt="Logo" style="height: 40px; margin-right: 10px;" />' \
f' ⚡ Michael Faraday - AI Security⚡' \
f'</h1>'
# Atualize a interface para lidar com múltiplos retornos
with gr.Blocks(theme='ParityError/Interstellar') as demo:
gr.Markdown("# ⚡ Michael Faraday - AI Security 💫")
gr.Markdown("""
👋 Olá, sou **Michael Faraday**, seu agente de IA especializado em segurança de aplicações!
Envie um arquivo Swagger e informe os campos solicitados. Eu irei analisar, detectar vulnerabilidades e sugerir melhorias.
""")
with gr.Row():
file_input = gr.File(label="Selecione o arquivo Swagger")
target_url = gr.Textbox(label="🔗 Target URL")
base_url = gr.Textbox(label="✨ Base URL")
submit_btn = gr.Button("🧙🏻 Analisar")
with gr.Column():
markdown_output = gr.Markdown(label="Resposta do Agente de IA")
file_output = gr.File(label="Relatório de Segurança", visible=True)
submit_btn.click(
fn=upload_swagger,
inputs=[file_input, target_url, base_url],
outputs=[markdown_output, file_output]
)
print(f"🚀 Agente rodando em http://127.0.0.1:7860")
demo.launch(share=False) |