import json
import logging
from typing import Optional, Type

from elasticsearch import Elasticsearch
from langchain.callbacks.manager import (
    AsyncCallbackManagerForToolRun,
    CallbackManagerForToolRun,
)
from langchain.tools.base import BaseTool
from pydantic import BaseModel, Field

from ask_candid.base.config.connections import SEMANTIC_ELASTIC_QA

logging.basicConfig(level="INFO")
logger = logging.getLogger("elasticsearch_playground")

# Module-level client shared by every tool invocation. It is built at import
# time from the project's SEMANTIC_ELASTIC_QA connection settings, with a
# 3-minute request timeout.
es = Elasticsearch(
    cloud_id=SEMANTIC_ELASTIC_QA.cloud_id,
    api_key=SEMANTIC_ELASTIC_QA.api_key,
    verify_certs=True,
    request_timeout=60 * 3,
)


class IndexDetailsInput(BaseModel):
    """Input for the list index details tool."""

    index_name: str = Field(
        ...,
        description="The name of the index for which the details are to be retrieved",
    )


class IndexDetailsTool(BaseTool):
    """Tool for getting information about a single ElasticSearch index."""

    name: str = "elastic_index_show_details"
    description: str = (
        "Input is an index name, output is a JSON-based string with the aliases, "
        "mappings containing the field names, and settings of an index."
    )
    # Input schema for this tool: a single required ``index_name`` string.
    args_schema: Optional[Type[BaseModel]] = IndexDetailsInput

    def _run(
        self,
        index_name: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Get information about a single Elasticsearch index.

        Args:
            index_name: Name of the index to describe.
            run_manager: Optional callback manager; not used by this tool.

        Returns:
            A JSON string with the keys ``"alias"``, ``"field_mappings"`` and
            ``"settings"`` for ``index_name``, or an empty string if anything
            goes wrong (the failure is logged with its traceback).
        """
        try:
            alias = es.indices.get_alias(index=index_name)
            field_mappings = es.indices.get_field_mapping(
                index=index_name, fields="*"
            )
            field_settings = es.indices.get_settings(index=index_name)

            # NOTE(review): the responses are indexed by ``index_name``; if the
            # caller passes an alias or wildcard pattern the responses are
            # presumably keyed by the concrete index names instead, so these
            # lookups would raise KeyError and fall into the handler below --
            # confirm whether that case needs support.
            details = {
                "alias": alias[index_name],
                "field_mappings": field_mappings[index_name],
                "settings": field_settings[index_name],
            }
            # Serialise as real JSON, as the tool description promises;
            # ``str(dict)`` would emit a Python repr (single quotes,
            # True/False/None) that JSON parsers reject.
            return json.dumps(details)
        except Exception as e:
            # Best-effort tool boundary: report the failure in the logs and
            # hand back an empty result rather than propagating the error.
            logger.exception(
                "Could not fetch index information for %s: %s", index_name, e
            )
            return ""

    async def _arun(
        self,
        index_name: str = "",
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> str:
        """Async entry point; not supported by this tool.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("IndexDetailsTool does not support async operations")