|
from fastapi import FastAPI, HTTPException, Request, Query |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from pydantic import BaseModel |
|
from typing import List, Optional |
|
import datetime |
|
import json |
|
import os |
|
from pathlib import Path |
|
|
|
app = FastAPI() |
|
|
|
|
|
app.add_middleware( |
|
CORSMiddleware, |
|
allow_origins=["*"], |
|
allow_methods=["*"], |
|
allow_headers=["*"], |
|
) |
|
|
|
class SMSMessage(BaseModel): |
|
sender: str |
|
text: str |
|
timestamp: int |
|
|
|
class BackupRequest(BaseModel): |
|
phone: str |
|
messages: List[SMSMessage] |
|
|
|
|
|
DATA_DIR = "/tmp/sms_data" |
|
os.makedirs(DATA_DIR, exist_ok=True) |
|
|
|
@app.post("/backup") |
|
async def backup_sms(request: BackupRequest): |
|
try: |
|
|
|
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") |
|
filename = f"{DATA_DIR}/{request.phone}_{timestamp}.json" |
|
|
|
|
|
with open(filename, "w") as f: |
|
json.dump({ |
|
"phone": request.phone, |
|
"timestamp": timestamp, |
|
"messages": [msg.dict() for msg in request.messages] |
|
}, f) |
|
|
|
return { |
|
"status": "success", |
|
"saved_count": len(request.messages), |
|
"filename": filename |
|
} |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
@app.get("/retrieve") |
|
async def retrieve_sms( |
|
phone: str = Query(..., description="Phone number to retrieve messages for"), |
|
limit: Optional[int] = Query(50, description="Maximum number of messages to retrieve") |
|
): |
|
try: |
|
|
|
matching_files = [] |
|
for entry in os.scandir(DATA_DIR): |
|
if entry.name.startswith(f"{phone}_") and entry.is_file(): |
|
matching_files.append(entry.path) |
|
|
|
if not matching_files: |
|
raise HTTPException(status_code=404, detail="No messages found for this phone number") |
|
|
|
|
|
matching_files.sort(reverse=True) |
|
|
|
|
|
all_messages = [] |
|
for filename in matching_files: |
|
with open(filename, "r") as f: |
|
data = json.load(f) |
|
all_messages.extend(data["messages"]) |
|
|
|
|
|
all_messages.sort(key=lambda x: x["timestamp"], reverse=True) |
|
limited_messages = all_messages[:limit] if limit else all_messages |
|
|
|
return { |
|
"status": "success", |
|
"phone": phone, |
|
"count": len(limited_messages), |
|
"messages": limited_messages |
|
} |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
@app.get("/") |
|
async def health_check(): |
|
return { |
|
"status": "healthy", |
|
"timestamp": datetime.datetime.now().isoformat(), |
|
"storage_path": DATA_DIR, |
|
"writable": os.access(DATA_DIR, os.W_OK) |
|
} |