|
import os |
|
from io import BytesIO |
|
from typing import List |
|
import funciones.motion as motion |
|
import tempfile |
|
from fastapi import FastAPI, Form |
|
from fastapi import FastAPI, File, UploadFile, HTTPException, status, BackgroundTasks |
|
from fastapi.responses import StreamingResponse, FileResponse, JSONResponse |
|
import herramientas |
|
|
|
app = FastAPI() |
|
|
|
@app.get("/health", |
|
tags=["Monitoreo Server"], |
|
description="Verifica el estado de salud de la API.", |
|
summary="Health Check" |
|
) |
|
async def health_check(): |
|
""" |
|
Este endpoint devuelve una respuesta 200 OK para indicar que la API está funcionando. |
|
""" |
|
return JSONResponse(content={"status": "ok"}, status_code=200) |
|
|
|
@app.post("/echo-image/", |
|
tags=["Monitoreo Server"], |
|
description="Test endpoint para prueba de envío de imagenes.", |
|
summary="Mirror test para envío de imagenes" |
|
) |
|
async def echo_image(image: UploadFile = File(...)): |
|
if not image.content_type.startswith("image/"): |
|
return {"error": "El archivo no es una imagen"} |
|
contents = await image.read() |
|
return StreamingResponse(BytesIO(contents), media_type=image.content_type) |
|
|
|
@app.post("/motion-image/") |
|
async def motion_image(image: UploadFile = File(...), background_tasks: BackgroundTasks = BackgroundTasks()): |
|
""" |
|
Recibe una imagen, la procesa con motion(), le da movimiento y devuelve el resultado. |
|
""" |
|
if not image.content_type.startswith("image/"): |
|
raise HTTPException( |
|
status_code=status.HTTP_400_BAD_REQUEST, |
|
detail="El archivo no es una imagen. Por favor, suba una imagen." |
|
) |
|
|
|
temp_image_path = None |
|
output_file_generated = False |
|
|
|
try: |
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{image.filename}") as tmp_file: |
|
contents = await image.read() |
|
tmp_file.write(contents) |
|
temp_image_path = tmp_file.name |
|
|
|
print(f"Imagen subida guardada temporalmente en: {temp_image_path}") |
|
|
|
|
|
|
|
path_archivo = await motion.motion(temp_image_path) |
|
print("Salí de monomotion con resultado: ", path_archivo) |
|
output_file_generated = True |
|
|
|
|
|
|
|
import mimetypes |
|
mimetypes.add_type("video/mp4", ".mp4") |
|
mime_type, _ = mimetypes.guess_type(path_archivo) |
|
if not mime_type: |
|
mime_type = "application/octet-stream" |
|
|
|
|
|
|
|
background_tasks.add_task(herramientas.delete_file_on_complete, temp_image_path) |
|
|
|
background_tasks.add_task(herramientas.delete_file_on_complete, path_archivo) |
|
|
|
|
|
return FileResponse( |
|
path=path_archivo, |
|
media_type=mime_type, |
|
filename=os.path.basename(path_archivo) |
|
) |
|
|
|
except HTTPException: |
|
|
|
raise |
|
except Exception as e: |
|
|
|
print(f"Error inesperado en /generate-monomotion-image/: {e}") |
|
raise HTTPException( |
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
detail=f"Ha ocurrido un error interno al procesar la imagen con monomotion: {e}" |
|
) |
|
finally: |
|
|
|
|
|
if temp_image_path and os.path.exists(temp_image_path): |
|
try: |
|
os.remove(temp_image_path) |
|
print(f"Archivo temporal de entrada eliminado: {temp_image_path}") |
|
except Exception as cleanup_e: |
|
print(f"Error al eliminar el archivo temporal de entrada {temp_image_path}: {cleanup_e}") |
|
|
|
@app.post("/echo-random-file/") |
|
async def echo_random_file(files: List[UploadFile] = File(...)): |
|
""" |
|
Recibe múltiples archivos, selecciona uno al azar y lo devuelve. |
|
""" |
|
if not files: |
|
raise HTTPException( |
|
status_code=status.HTTP_400_BAD_REQUEST, |
|
detail="No se enviaron archivos." |
|
) |
|
|
|
temp_file_paths = [] |
|
try: |
|
|
|
for uploaded_file in files: |
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{uploaded_file.filename}") as tmp_file: |
|
contents = await uploaded_file.read() |
|
tmp_file.write(contents) |
|
current_temp_path = tmp_file.name |
|
temp_file_paths.append(current_temp_path) |
|
print(f"Guardado temporalmente: {current_temp_path}") |
|
|
|
|
|
|
|
resultado_cinema = await motion.cinema(temp_file_paths) |
|
|
|
print("182.- Resultado es: ", resultado_cinema) |
|
print("Y su tipo es: ", type(resultado_cinema)) |
|
|
|
|
|
if isinstance(resultado_cinema, str): |
|
|
|
|
|
import mimetypes |
|
mime_type, _ = mimetypes.guess_type(resultado_cinema) |
|
if not mime_type: |
|
mime_type = "application/octet-stream" |
|
|
|
|
|
|
|
return FileResponse( |
|
path=resultado_cinema, |
|
media_type=mime_type, |
|
filename=os.path.basename(resultado_cinema) |
|
) |
|
elif isinstance(resultado_cinema, dict) and "error" in resultado_cinema: |
|
|
|
raise HTTPException( |
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
detail=f"Error en la función cinema: {resultado_cinema['error']}" |
|
) |
|
else: |
|
|
|
return resultado_cinema |
|
|
|
except HTTPException: |
|
|
|
raise |
|
except Exception as e: |
|
|
|
print(f"Error inesperado en /process-files-with-cinema/: {e}") |
|
raise HTTPException( |
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
detail=f"Ha ocurrido un error interno al procesar los archivos con cinema: {e}" |
|
) |
|
finally: |
|
|
|
for path in temp_file_paths: |
|
if os.path.exists(path): |
|
try: |
|
|
|
print(f"Archivo temporal eliminado: {path}") |
|
except Exception as cleanup_e: |
|
print(f"Error al eliminar el archivo temporal {path}: {cleanup_e}") |