File size: 3,079 Bytes
4549091 e775e54 4549091 e775e54 4549091 a190574 e775e54 4549091 e775e54 4549091 e775e54 4549091 e775e54 a190574 4549091 e775e54 4549091 e775e54 4549091 e775e54 4549091 e775e54 4549091 e775e54 4549091 e775e54 4549091 a190574 4549091 a190574 4549091 e775e54 4549091 a190574 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
from fastapi import FastAPI, HTTPException, Request, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
import datetime
import json
import os
from pathlib import Path
# Application object that the route decorators further down attach to.
app = FastAPI()

# Enable CORS. Origins, methods and headers are all set to the "*" wildcard,
# so the middleware is configured without any restriction list.
_cors_options = {
    "allow_origins": ["*"],
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
# A single SMS entry inside a backup request. The fields are written to the
# backup file as-is via ``.dict()``.
class SMSMessage(BaseModel):
    # Originator of the message (presumably a phone number or sender id --
    # confirm with the client that produces these payloads).
    sender: str
    # Message body.
    text: str
    # Integer used as the sort key when messages are retrieved, newest first.
    # NOTE(review): unit (seconds vs. milliseconds) is not visible here.
    timestamp: int
# Request body accepted by POST /backup.
class BackupRequest(BaseModel):
    # Identifier the backup is stored under; it becomes the prefix of the
    # backup file name and is the lookup key for GET /retrieve.
    phone: str
    # Messages to persist; may be empty, in which case an empty list is saved.
    messages: List[SMSMessage]
# Backups are kept under /tmp, which has write permissions. The directory is
# created at import time (including missing parents) and an already existing
# directory is not an error.
DATA_DIR = "/tmp/sms_data"
Path(DATA_DIR).mkdir(parents=True, exist_ok=True)
@app.post("/backup")
async def backup_sms(request: BackupRequest):
    """Persist one batch of SMS messages as a JSON file under ``DATA_DIR``.

    The file is named ``<phone>_<YYYYmmdd_HHMMSS>.json``. If a file with that
    name already exists (a second backup for the same phone within the same
    second), a numeric suffix ``_1``, ``_2``, ... is appended instead of
    overwriting the earlier backup.

    Args:
        request: Parsed request body holding the phone identifier and the
            messages to store.

    Returns:
        A dict with ``status``, ``saved_count`` (number of messages written)
        and ``filename`` (path of the file that was created).

    Raises:
        HTTPException: 400 when ``phone`` contains a path separator or a NUL
            character; 500 (with the underlying error text as detail) when
            building or writing the backup fails.
    """
    phone = request.phone

    # ``phone`` is client-controlled and ends up inside a file path. Refuse
    # anything that could address a location outside DATA_DIR.
    if any(ch in phone for ch in ("/", "\\", "\0")):
        raise HTTPException(status_code=400, detail="Invalid phone number")

    try:
        # Create filename with phone number and timestamp
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        stem = f"{DATA_DIR}/{phone}_{timestamp}"
        payload = {
            "phone": phone,
            "timestamp": timestamp,
            "messages": [msg.dict() for msg in request.messages],
        }

        # Exclusive-create ("x") so an existing backup is never truncated;
        # on a name clash move on to the next suffixed name.
        filename = f"{stem}.json"
        attempt = 0
        while True:
            try:
                with open(filename, "x", encoding="utf-8") as f:
                    json.dump(payload, f)
                break
            except FileExistsError:
                attempt += 1
                filename = f"{stem}_{attempt}.json"

        return {
            "status": "success",
            "saved_count": len(request.messages),
            "filename": filename,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/retrieve")
async def retrieve_sms(
    phone: str = Query(..., description="Phone number to retrieve messages for"),
    limit: Optional[int] = Query(50, description="Maximum number of messages to retrieve"),
):
    """Return the stored messages for ``phone``, newest first.

    Every regular file in ``DATA_DIR`` whose name starts with ``<phone>_`` is
    read, the ``messages`` lists are merged, sorted by ``timestamp`` in
    descending order and cut to ``limit`` entries.

    Args:
        phone: Identifier the backups were stored under.
        limit: Maximum number of messages to return. ``0`` or ``None`` means
            no limit.

    Returns:
        A dict with ``status``, ``phone``, ``count`` and ``messages``.

    Raises:
        HTTPException: 400 when ``limit`` is negative; 404 when no backup file
            exists for ``phone``; 500 (with the underlying error text as
            detail) when listing or reading the backup files fails.
    """
    # A negative limit would slice from the end and silently drop the oldest
    # messages, so reject it up front.
    if limit is not None and limit < 0:
        raise HTTPException(status_code=400, detail="limit must not be negative")

    try:
        # Find all files for this phone number
        prefix = f"{phone}_"
        with os.scandir(DATA_DIR) as entries:
            matching_files = [
                entry.path
                for entry in entries
                if entry.name.startswith(prefix) and entry.is_file()
            ]

        if not matching_files:
            raise HTTPException(status_code=404, detail="No messages found for this phone number")

        # Newest file first; the final sort is stable, so this decides the
        # order of messages that share a timestamp.
        matching_files.sort(reverse=True)

        # Read all messages
        all_messages = []
        for filename in matching_files:
            with open(filename, "r", encoding="utf-8") as f:
                data = json.load(f)
            all_messages.extend(data["messages"])

        # Sort by timestamp (newest first), then apply the limit
        all_messages.sort(key=lambda x: x["timestamp"], reverse=True)
        limited_messages = all_messages[:limit] if limit else all_messages

        return {
            "status": "success",
            "phone": phone,
            "count": len(limited_messages),
            "messages": limited_messages,
        }
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) must reach the client
        # unchanged instead of being rewrapped as a 500 below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/")
async def health_check():
    """Report service liveness together with the state of the storage directory.

    Returns:
        A dict with a constant ``status`` of ``"healthy"``, the current local
        time in ISO format, the configured storage path and whether that path
        is writable by this process.
    """
    checked_at = datetime.datetime.now().isoformat()
    can_write = os.access(DATA_DIR, os.W_OK)
    return dict(
        status="healthy",
        timestamp=checked_at,
        storage_path=DATA_DIR,
        writable=can_write,
    )