|
import datetime
import json
import os
import uuid
from typing import List

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
|
|
|
# Application instance that the route decorators in this module register on.
app = FastAPI()

# CORS policy: requests from any origin, with any HTTP method and any request
# header, are accepted.
# NOTE(review): the wildcard origin looks intended for development or a
# mobile client — confirm before exposing this service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
class SmsItem(BaseModel):
    """A single SMS record inside a request payload.

    The attribute names double as the JSON keys of the payload, so they are
    kept exactly as the client sends them (including ``type``, which only
    shadows the builtin inside this class body).
    """

    # NOTE(review): presumably the phone number / sender ID of the other
    # party — verify against the client.
    address: str
    # Text of the message.
    body: str
    # NOTE(review): integer timestamp; the unit (seconds vs. milliseconds) is
    # not established in this file — confirm with the client.
    date: int
    # NOTE(review): message category as a free-form string (inbox/sent?) —
    # the allowed values are not defined here; confirm.
    type: str
|
|
|
class SmsRequest(BaseModel):
    """Request body carrying a batch of SMS records under ``sms_list``."""

    # Records to persist; the schema places no minimum on the list length.
    sms_list: List[SmsItem]
|
|
|
|
|
# Directory that holds the JSON backup files.  The default is the original
# hard-coded location; set the SMS_DATA_DIR environment variable to store the
# backups elsewhere (contents of /tmp are typically not kept across reboots).
# An unset or empty variable falls back to the default.
DATA_DIR = os.environ.get("SMS_DATA_DIR") or "/tmp/sms_data"

# Create the directory at import time so the endpoints can write to it
# immediately; exist_ok makes repeated imports/restarts harmless.
os.makedirs(DATA_DIR, exist_ok=True)
|
|
|
@app.post("/save-sms")
async def save_sms(request: SmsRequest):
    """Write the submitted SMS batch to a new JSON backup file in ``DATA_DIR``.

    Args:
        request: Validated request body; every item of ``request.sms_list``
            is serialized into one JSON array.

    Returns:
        A dict with ``status`` (``"success"``), a ``message`` stating how many
        SMS were saved, and ``filename``, the path of the backup written.

    Raises:
        HTTPException: Status 500 carrying the underlying error text when the
            batch cannot be serialized or the file cannot be written.
    """
    # Microsecond resolution plus a random suffix: with a second-resolution
    # name, two requests arriving in the same second overwrote each other's
    # backup.  The timestamp stays first so sorting by name still orders the
    # backups by creation time.
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    filename = f"{DATA_DIR}/backup_{timestamp}_{uuid.uuid4().hex[:8]}.json"
    # The staging name does not end in ".json", so a reader that only picks
    # up "*.json" files never sees a half-written backup.
    staging = f"{filename}.tmp"

    try:
        records = [
            # pydantic v2 renamed .dict() to .model_dump(); support both.
            sms.model_dump() if hasattr(sms, "model_dump") else sms.dict()
            for sms in request.sms_list
        ]
        # Re-create the directory in case it was removed after startup.
        os.makedirs(DATA_DIR, exist_ok=True)
        with open(staging, "w", encoding="utf-8") as f:
            json.dump(records, f, indent=2)
        # Atomic rename: the backup only appears under its final name once
        # it has been written completely.
        os.replace(staging, filename)
    except Exception as e:
        # Best-effort cleanup of the partial staging file; it may not exist
        # if the failure happened before it was opened.
        try:
            os.remove(staging)
        except OSError:
            pass
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {
        "status": "success",
        "message": f"{len(request.sms_list)} SMS saved",
        "filename": filename,
    }
|
|
|
@app.get("/get-sms")
async def get_sms():
    """Return every SMS record stored in the JSON backup files of ``DATA_DIR``.

    Files whose name ends in ``.json`` are read in sorted filename order and
    their contents are concatenated into one list.

    Returns:
        A dict with ``status`` (``"success"``), ``total_sms`` (number of
        records returned) and ``sms_list`` (the records themselves).

    Raises:
        HTTPException: Status 500 carrying the underlying error text when a
            backup file cannot be read or parsed, or does not contain a JSON
            array.
    """
    try:
        # A missing storage directory means nothing has been stored; report
        # an empty result instead of a server error.
        names = sorted(os.listdir(DATA_DIR)) if os.path.isdir(DATA_DIR) else []

        sms_list = []
        for name in names:
            if not name.endswith(".json"):
                continue
            with open(os.path.join(DATA_DIR, name), "r", encoding="utf-8") as f:
                data = json.load(f)
            # list.extend() on a dict would silently add its keys; reject
            # anything that is not an array of records.
            if not isinstance(data, list):
                raise ValueError(f"{name}: expected a JSON array of SMS records")
            sms_list.extend(data)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {
        "status": "success",
        "total_sms": len(sms_list),
        "sms_list": sms_list,
    }
|
|
|
@app.get("/")
async def health_check():
    """Report that the service is up, with the server time and storage path.

    Returns:
        A dict with ``status`` (``"healthy"``), ``timestamp`` (current local
        server time in ISO 8601 format) and ``sms_storage`` (``DATA_DIR``).
    """
    now_iso = datetime.datetime.now().isoformat()
    return {"status": "healthy", "timestamp": now_iso, "sms_storage": DATA_DIR}
|
|