File size: 2,468 Bytes
a0e37e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bea5044
a0e37e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bea5044
 
 
a0e37e2
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from typing import Type, Optional
import logging

from pydantic import BaseModel, Field

from elasticsearch import Elasticsearch

from langchain.callbacks.manager import (
    AsyncCallbackManagerForToolRun,
    CallbackManagerForToolRun,
)
from langchain.tools.base import BaseTool
from ask_candid.base.config.connections import SEMANTIC_ELASTIC_QA


logging.basicConfig(level="INFO")
logger = logging.getLogger("elasticsearch_playground")
es = Elasticsearch(
    cloud_id=SEMANTIC_ELASTIC_QA.cloud_id,
    api_key=SEMANTIC_ELASTIC_QA.api_key,
    verify_certs=True,
    request_timeout=60 * 3,
)


class IndexDetailsInput(BaseModel):
    """Input for the list index details tool."""

    index_name: str = Field(
        ...,
        description="The name of the index for which the details are to be retrieved",
    )


class IndexDetailsTool(BaseTool):
    """Tool for getting information about a single ElasticSearch index."""

    name: str = "elastic_index_show_details"  # Added type annotation
    description: str = (
        "Input is an index name, output is a JSON-based string with the aliases, mappings containing the field names, and settings of an index."
    )
    args_schema: Optional[Type[BaseModel]] = (
        IndexDetailsInput  # Ensure this is above the methods
    )

    def _run(
        self,
        index_name: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Get information about a single Elasticsearch index."""
        try:
            # Ensure that `es` is correctly initialized before calling this method
            alias = es.indices.get_alias(index=index_name)
            field_mappings = es.indices.get_field_mapping(index=index_name, fields="*")
            field_settings = es.indices.get_settings(index=index_name)
            return str(
                {
                    "alias": alias[index_name],
                    "field_mappings": field_mappings[index_name],
                    "settings": field_settings[index_name],
                }
            )
        except Exception as e:
            logger.exception(
                "Could not fetch index information for %s: %s", index_name, e
            )
            return ""

    async def _arun(
        self,
        index_name: str = "",
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> str:
        raise NotImplementedError("IndexDetailsTool does not support async operations")