Spaces:
Running
Running
| import hashlib | |
| from datetime import datetime, timezone | |
| from http import HTTPStatus | |
| from io import BytesIO | |
| from pathlib import Path | |
| from typing import Annotated | |
| from uuid import UUID | |
| from fastapi import APIRouter, Depends, HTTPException, UploadFile | |
| from fastapi.responses import StreamingResponse | |
| from langflow.api.utils import CurrentActiveUser, DbSession | |
| from langflow.api.v1.schemas import UploadFileResponse | |
| from langflow.services.database.models.flow import Flow | |
| from langflow.services.deps import get_storage_service | |
| from langflow.services.storage.service import StorageService | |
| from langflow.services.storage.utils import build_content_type_from_extension | |
| router = APIRouter(tags=["Files"], prefix="/files") | |
| # Create dep that gets the flow_id from the request | |
| # then finds it in the database and returns it while | |
| # using the current user as the owner | |
| async def get_flow_id( | |
| flow_id: UUID, | |
| current_user: CurrentActiveUser, | |
| session: DbSession, | |
| ): | |
| flow_id_str = str(flow_id) | |
| # AttributeError: 'SelectOfScalar' object has no attribute 'first' | |
| flow = await session.get(Flow, flow_id_str) | |
| if not flow: | |
| raise HTTPException(status_code=404, detail="Flow not found") | |
| if flow.user_id != current_user.id: | |
| raise HTTPException(status_code=403, detail="You don't have access to this flow") | |
| return flow_id_str | |
| async def upload_file( | |
| *, | |
| file: UploadFile, | |
| flow_id: Annotated[UUID, Depends(get_flow_id)], | |
| current_user: CurrentActiveUser, | |
| session: DbSession, | |
| storage_service: Annotated[StorageService, Depends(get_storage_service)], | |
| ) -> UploadFileResponse: | |
| try: | |
| flow_id_str = str(flow_id) | |
| flow = await session.get(Flow, flow_id_str) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) from e | |
| if flow.user_id != current_user.id: | |
| raise HTTPException(status_code=403, detail="You don't have access to this flow") | |
| try: | |
| file_content = await file.read() | |
| timestamp = datetime.now(tz=timezone.utc).astimezone().strftime("%Y-%m-%d_%H-%M-%S") | |
| file_name = file.filename or hashlib.sha256(file_content).hexdigest() | |
| full_file_name = f"{timestamp}_{file_name}" | |
| folder = flow_id_str | |
| await storage_service.save_file(flow_id=folder, file_name=full_file_name, data=file_content) | |
| return UploadFileResponse(flow_id=flow_id_str, file_path=f"{folder}/{full_file_name}") | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) from e | |
| async def download_file( | |
| file_name: str, flow_id: UUID, storage_service: Annotated[StorageService, Depends(get_storage_service)] | |
| ): | |
| flow_id_str = str(flow_id) | |
| extension = file_name.split(".")[-1] | |
| if not extension: | |
| raise HTTPException(status_code=500, detail=f"Extension not found for file {file_name}") | |
| try: | |
| content_type = build_content_type_from_extension(extension) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) from e | |
| if not content_type: | |
| raise HTTPException(status_code=500, detail=f"Content type not found for extension {extension}") | |
| try: | |
| file_content = await storage_service.get_file(flow_id=flow_id_str, file_name=file_name) | |
| headers = { | |
| "Content-Disposition": f"attachment; filename={file_name} filename*=UTF-8''{file_name}", | |
| "Content-Type": "application/octet-stream", | |
| "Content-Length": str(len(file_content)), | |
| } | |
| return StreamingResponse(BytesIO(file_content), media_type=content_type, headers=headers) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) from e | |
| async def download_image(file_name: str, flow_id: UUID): | |
| storage_service = get_storage_service() | |
| extension = file_name.split(".")[-1] | |
| flow_id_str = str(flow_id) | |
| if not extension: | |
| raise HTTPException(status_code=500, detail=f"Extension not found for file {file_name}") | |
| try: | |
| content_type = build_content_type_from_extension(extension) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) from e | |
| if not content_type: | |
| raise HTTPException(status_code=500, detail=f"Content type not found for extension {extension}") | |
| if not content_type.startswith("image"): | |
| raise HTTPException(status_code=500, detail=f"Content type {content_type} is not an image") | |
| try: | |
| file_content = await storage_service.get_file(flow_id=flow_id_str, file_name=file_name) | |
| return StreamingResponse(BytesIO(file_content), media_type=content_type) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) from e | |
| async def download_profile_picture( | |
| folder_name: str, | |
| file_name: str, | |
| ): | |
| try: | |
| storage_service = get_storage_service() | |
| extension = file_name.split(".")[-1] | |
| config_dir = storage_service.settings_service.settings.config_dir | |
| config_path = Path(config_dir) # type: ignore[arg-type] | |
| folder_path = config_path / "profile_pictures" / folder_name | |
| content_type = build_content_type_from_extension(extension) | |
| file_content = await storage_service.get_file(flow_id=folder_path, file_name=file_name) # type: ignore[arg-type] | |
| return StreamingResponse(BytesIO(file_content), media_type=content_type) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) from e | |
| async def list_profile_pictures(): | |
| try: | |
| storage_service = get_storage_service() | |
| config_dir = storage_service.settings_service.settings.config_dir | |
| config_path = Path(config_dir) # type: ignore[arg-type] | |
| people_path = config_path / "profile_pictures/People" | |
| space_path = config_path / "profile_pictures/Space" | |
| people = await storage_service.list_files(flow_id=people_path) # type: ignore[arg-type] | |
| space = await storage_service.list_files(flow_id=space_path) # type: ignore[arg-type] | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) from e | |
| files = [f"People/{i}" for i in people] | |
| files += [f"Space/{i}" for i in space] | |
| return {"files": files} | |
| async def list_files( | |
| flow_id: Annotated[UUID, Depends(get_flow_id)], | |
| storage_service: Annotated[StorageService, Depends(get_storage_service)], | |
| ): | |
| try: | |
| flow_id_str = str(flow_id) | |
| files = await storage_service.list_files(flow_id=flow_id_str) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) from e | |
| return {"files": files} | |
| async def delete_file( | |
| file_name: str, | |
| flow_id: Annotated[UUID, Depends(get_flow_id)], | |
| storage_service: Annotated[StorageService, Depends(get_storage_service)], | |
| ): | |
| try: | |
| flow_id_str = str(flow_id) | |
| await storage_service.delete_file(flow_id=flow_id_str, file_name=file_name) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) from e | |
| return {"message": f"File {file_name} deleted successfully"} | |