Spaces:
Running
Running
import json
import logging
from typing import Type, Optional

from elasticsearch import Elasticsearch
from langchain.callbacks.manager import (
    AsyncCallbackManagerForToolRun,
    CallbackManagerForToolRun,
)
from langchain.tools.base import BaseTool
from pydantic import BaseModel, Field

from ask_candid.base.config.connections import SEMANTIC_ELASTIC_QA
# Configure root logging at import time and create the logger used by this module.
logging.basicConfig(level="INFO")
logger = logging.getLogger("elasticsearch_playground")

# Shared Elasticsearch client, built once at import time from the
# SEMANTIC_ELASTIC_QA connection settings.
es = Elasticsearch(
    api_key=SEMANTIC_ELASTIC_QA.api_key,
    cloud_id=SEMANTIC_ELASTIC_QA.cloud_id,
    request_timeout=180,  # three minutes, in seconds
    verify_certs=True,
)
class IndexDetailsInput(BaseModel):
    """Argument schema for the index-details tool: one required index name."""

    # Required field (no default value); the description is passed to Field.
    index_name: str = Field(
        default=...,
        description="The name of the index for which the details are to be retrieved",
    )
class IndexDetailsTool(BaseTool):
    """Tool for getting information about a single ElasticSearch index.

    Given an index name, the tool queries the module-level ``es`` client for
    the index's aliases, field mappings and settings, and returns all three
    combined in one JSON string.
    """

    name: str = "elastic_index_show_details"
    description: str = (
        "Input is an index name, output is a JSON-based string with the aliases, mappings containing the field names, and settings of an index."
    )
    args_schema: Optional[Type[BaseModel]] = IndexDetailsInput

    def _run(
        self,
        index_name: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Get information about a single Elasticsearch index.

        Args:
            index_name: Name of the index to describe.
            run_manager: Optional callback manager; not used by this method.

        Returns:
            A JSON object string with the keys ``alias``, ``field_mappings``
            and ``settings``. If any step fails, the error is logged and an
            empty string is returned instead.
        """
        try:
            alias = es.indices.get_alias(index=index_name)
            field_mappings = es.indices.get_field_mapping(index=index_name, fields="*")
            field_settings = es.indices.get_settings(index=index_name)
            # Each response is indexed by the name that was passed in; a
            # missing key raises KeyError, which is handled below.
            # NOTE(review): if callers may pass an alias or a wildcard
            # pattern, the responses are presumably keyed by the concrete
            # index names instead - confirm whether "" is acceptable then.
            details = {
                "alias": alias[index_name],
                "field_mappings": field_mappings[index_name],
                "settings": field_settings[index_name],
            }
            # Serialize as real JSON, as the tool description promises;
            # str() on a dict yields a Python repr, which is not valid JSON.
            return json.dumps(details)
        except Exception as e:
            # Tool boundary: log the failure with its traceback and hand the
            # caller an empty result rather than raising.
            logger.exception(
                "Could not fetch index information for %s: %s", index_name, e
            )
            return ""

    async def _arun(
        self,
        index_name: str = "",
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> str:
        """Async entry point; always raises because only ``_run`` is implemented.

        Raises:
            NotImplementedError: On every call.
        """
        raise NotImplementedError("IndexDetailsTool does not support async operations")