Spaces:
Runtime error
Runtime error
| from datetime import datetime | |
| import logging,os | |
| import fastapi | |
| from fastapi import Body, Depends | |
| import uvicorn | |
| from fastapi import BackgroundTasks,HTTPException , status | |
| from fastapi.responses import JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi import FastAPI as Response | |
| from sse_starlette.sse import EventSourceResponse | |
| from starlette.responses import StreamingResponse | |
| from starlette.requests import Request | |
| from pydantic import BaseModel, Extra | |
| from enum import Enum | |
| from typing import List, Dict, Any, Generator, Optional, cast, Callable | |
| from chromaIntf import ChromaIntf | |
| import baseInfra.dropbox_handler as dbh | |
| import traceback | |
| from pathlib import Path | |
| logger=logging.getLogger("root") | |
| chromaIntf=ChromaIntf() | |
| class PathRequest(BaseModel): | |
| dir: str = "/" | |
| class MetaD(BaseModel): | |
| timestamp: Optional[str]= datetime.now().isoformat() | |
| class Config: | |
| allow_population_by_field_name = True | |
| extra = Extra.allow | |
| class DocWithMeta(BaseModel): | |
| text: str = "" | |
| metadata: Optional[MetaD] = MetaD() | |
| async def catch_exceptions_middleware( | |
| request: Request, call_next: Callable[[Request], Any] | |
| ) -> Response: | |
| try: | |
| #print("In exception cater middleware") | |
| #print(request.headers) | |
| #print(await request.body()) | |
| return await call_next(request) | |
| except Exception as e: | |
| print(repr(e)) | |
| print("from the catch exception") | |
| traceback.print_exc() | |
| return JSONResponse(content={"error": repr(e)}, status_code=500) | |
| app = fastapi.FastAPI(title="Maya Persistence") | |
| app.middleware("http")(catch_exceptions_middleware) | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| api_base="/api/v1" | |
| async def get_matching_docs(inStr: str, kwargs: Dict [Any, Any] ,background_tasks:BackgroundTasks) -> Any: | |
| """ | |
| Gets the query embeddings and uses metadata appropriately and gets the matching docs for query | |
| TODO: Add parameter for type of query and number of docs to return | |
| TODO: Add parameter to return the source information as well | |
| """ | |
| logger.info("================================================") | |
| logger.info("Received new request at /getMatchingDocs:"+inStr) | |
| #retVal=await chromaIntf.getRelevantDocs(inStr,kwargs) | |
| background_tasks.add_task(chromaIntf.getRelevantDocs,inStr,kwargs) | |
| retVal="added to queue" | |
| logger.info("Returning :"+retVal) | |
| logger.info("================================================") | |
| return retVal | |
| async def get_matching_docs_sync(inStr: str, kwargs: Dict [Any, Any]) -> Any: | |
| """ | |
| Gets the query embeddings and uses metadata appropriately and gets the matching docs for query | |
| This is synchronous version | |
| TODO: Add parameter for type of query and number of docs to return | |
| TODO: Add parameter to return the source information as well | |
| """ | |
| logger.info("================================================") | |
| logger.info("Received new request at /getMatchingDocsSync:"+inStr) | |
| retVal=await chromaIntf.getRelevantDocs(inStr,kwargs) | |
| #background_tasks.add_task(chromaIntf.getRelevantDocs,inStr,kwargs) | |
| #retVal="added to queue" | |
| logger.info(f"Returning: {len(retVal)} results") | |
| logger.info("================================================") | |
| return retVal | |
| async def add_text_document(inDoc: DocWithMeta ) -> Any: | |
| """ | |
| Add text and metadata to the db | |
| """ | |
| logger.info("================================================") | |
| logger.info("Received new request at /addTextDocuments:"+inDoc.text) | |
| retVal= await chromaIntf.addText(inDoc.text,inDoc.metadata) | |
| logger.info(f"Returning: {len(retVal)} results") | |
| logger.info("================================================") | |
| return retVal | |
| async def persist_db(): | |
| return await chromaIntf.persist() | |
| async def reset_db(): | |
| return await dbh.restoreFolder("db") | |
| def walk(path: PathRequest): | |
| print("Received walk request for",path) | |
| dirs=[] | |
| fileList=[] | |
| try: | |
| for root, items, files in os.walk(path.dir,topdown=True): | |
| for item in items: | |
| dirs.append(item) | |
| for filea in files: | |
| fileList.append(filea) | |
| except Exception: | |
| print("got exception",Exception) | |
| print("dirs ",dirs) | |
| print("files ",fileList) | |
| response= JSONResponse(content= {"dirs":dirs,"files":fileList}) | |
| return response | |
| async def list_docs(): | |
| return await chromaIntf.listDocs() | |
| print(__name__) | |
| if __name__ == '__main__' or __name__ == "src.main": | |
| uvicorn.run(f"{Path(__file__).stem}:app", host="0.0.0.0", port=8000,workers=1) | |
| #uvicorn.run(app, host="0.0.0.0", port=8000) |