Spaces:
Running
Running
File size: 3,038 Bytes
d9f1916 f8cd8cc 3a2b389 2fb3bf7 000642c d9f1916 f8cd8cc d9f1916 f8cd8cc 2fb3bf7 f8cd8cc 2fb3bf7 3a2b389 2fb3bf7 000642c d9f1916 000642c 2fb3bf7 000642c 2fb3bf7 000642c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
from enum import Enum
from typing import Optional

import httpx
import requests
from fastapi import FastAPI
from fastapi.responses import JSONResponse, FileResponse
from pydantic import BaseModel, Field
from transformers import pipeline

from phishing_datasets import submit_entry
from url_tools import extract_urls, resolve_short_url
from urlscan_client import UrlscanClient
# FastAPI application instance; the route decorators in this module register
# their handlers on it.
app = FastAPI()
# Shared urlscan client; `predict` calls its `scan()` once per URL found in a
# message.
urlscan = UrlscanClient()
class MessageModel(BaseModel):
    """Message part of a query.

    Attributes:
        text: Message body; ``predict`` extracts URLs from it and feeds it to
            the text classifier.
    """

    text: str
class QueryModel(BaseModel):
    """Query part of a ``/predict`` request.

    Attributes:
        sender: Sender of the message, as supplied by the client.
        message: The message to evaluate.
    """

    sender: str
    message: MessageModel
class AppModel(BaseModel):
    """App part of a ``/predict`` request.

    Attributes:
        version: Version string supplied by the client.
    """

    version: str
class InputModel(BaseModel):
    """Request body accepted by the ``/predict`` endpoint.

    Attributes:
        version: Value of the ``_version`` key of the request JSON, or
            ``None`` when the key is absent.
        query: Sender and message to evaluate.
        app: Information about the calling app.
    """

    # Pydantic does not turn underscore-prefixed annotations into fields, so
    # a bare `_version: int` never receives the value sent by the client.
    # Expose it under a public name and map the wire key through an alias.
    # It is optional so that requests omitting the key keep validating, as
    # they did while the key was ignored.
    version: Optional[int] = Field(default=None, alias="_version")
    query: QueryModel
    app: AppModel
class ActionModel(Enum):
    """Action to take on a queried message.

    Members:
        NONE: Insufficient information to determine an action to take. In a
            query response, has the effect of allowing the message to be
            shown normally.
        ALLOW: Allow the message to be shown normally.
        JUNK: Prevent the message from being shown normally, filtered as a
            Junk message.
        PROMOTION: Prevent the message from being shown normally, filtered
            as a Promotional message.
        TRANSACTION: Prevent the message from being shown normally, filtered
            as a Transactional message.
    """

    NONE = 0  # no decision
    ALLOW = 1  # show normally
    JUNK = 2  # filter as junk
    PROMOTION = 3  # filter as promotional
    TRANSACTION = 4  # filter as transactional
class SubActionModel(Enum):
    """Sub-action accompanying an ``ActionModel``; only ``NONE`` is defined."""

    NONE = 0
class OutputModel(BaseModel):
    """Response body returned by the ``/predict`` endpoint.

    Attributes:
        action: Action chosen for the message.
        sub_action: Sub-action chosen for the message.
    """

    action: ActionModel
    sub_action: SubActionModel
# Text-classification pipeline, created once when the module is imported, using
# the "mrm8488/bert-tiny-finetuned-sms-spam-detection" model. `predict` treats a
# first result labelled 'LABEL_1' as junk.
pipe = pipeline(task="text-classification", model="mrm8488/bert-tiny-finetuned-sms-spam-detection")
@app.get("/.well-known/apple-app-site-association", include_in_schema=False)
def get_well_known_aasa():
    """Serve the apple-app-site-association document.

    Returns:
        A JSON response whose ``messagefilter.apps`` list holds two app
        identifiers.
    """
    app_ids = [
        "X9NN3FSS3T.com.lela.Serenity.SerenityMessageFilterExtension",
        "X9NN3FSS3T.com.lela.Serenity",
    ]
    association = {"messagefilter": {"apps": app_ids}}
    return JSONResponse(content=association, media_type="application/json")
# Crawlers request "/robots.txt" (plural), so the file is also published at
# that path. The original "/robot.txt" route is kept for existing callers.
@app.get("/robots.txt", include_in_schema=False)
@app.get("/robot.txt", include_in_schema=False)
def get_robot_txt():
    """Serve the local ``robot.txt`` file.

    Returns:
        A file response for ``robot.txt``, resolved relative to the process
        working directory.
    """
    return FileResponse("robot.txt")
@app.post("/predict")
def predict(model: InputModel) -> OutputModel:
    """Decide whether an incoming message should be filtered as junk.

    Each URL found in the message text is scanned with urlscan first. If a
    scan reports verdicts with a positive overall score, the message is
    recorded with ``submit_entry`` and reported as JUNK. Otherwise the text
    classifier runs, and a first result labelled ``LABEL_1`` is handled the
    same way. Any other outcome yields ``ActionModel.NONE``.

    Args:
        model: Parsed request body holding the sender and the message text.

    Returns:
        An ``OutputModel`` whose action is ``JUNK`` or ``NONE``; the
        sub-action is always ``SubActionModel.NONE``.
    """
    sender = model.query.sender
    text = model.query.message.text

    # Scan one URL at a time and stop at the first hit, instead of scanning
    # every URL up front and only then looking at the verdicts.
    for url in extract_urls(text):
        # NOTE(review): assumes `scan` returns a dict-like result or None;
        # confirm against UrlscanClient.
        result = urlscan.scan(url) or {}
        # `or {}` also covers keys that are present but set to null.
        overall = (result.get('verdicts') or {}).get('overall') or {}
        print(f"Checking verdict: {overall}")
        # A missing or null score used to raise TypeError in `None > 0`;
        # treat it as 0 so it simply does not count as a match.
        score = overall.get('score') or 0
        if overall.get('hasVerdicts') and score > 0:
            print("Match found. Submitting entry and returning JUNK.")
            submit_entry(sender, text)
            return OutputModel(
                action=ActionModel.JUNK,
                sub_action=SubActionModel.NONE,
            )

    # No URL was flagged: fall back to the text classifier.
    label = pipe(text)
    if label[0]['label'] == 'LABEL_1':
        submit_entry(sender, text)
        return OutputModel(
            action=ActionModel.JUNK,
            sub_action=SubActionModel.NONE,
        )

    return OutputModel(
        action=ActionModel.NONE,
        sub_action=SubActionModel.NONE,
    )
class ReportModel(BaseModel):
    """Request body accepted by the ``/report`` endpoint.

    Attributes:
        sender: Sender of the reported message.
        message: Text of the reported message.
    """

    sender: str
    message: str
@app.post("/report")
def report(model: ReportModel):
    """Record a reported message.

    Passes the reported sender and message text to ``submit_entry``. The
    handler returns nothing.

    Args:
        model: Parsed request body with the sender and message to record.
    """
    # The stray trailing `|` after this call was removed: it left an
    # incomplete binary expression, which is a syntax error.
    submit_entry(model.sender, model.message)