|
import os |
|
import logging |
|
from elasticsearch import Elasticsearch, ConnectionError, AuthenticationException |
|
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# NOTE: configures the root logger at import time (no-op if the root logger
# already has handlers); kept because importers may rely on INFO output.
logging.basicConfig(level=logging.INFO)

# Elasticsearch endpoint, read from the ELASTICSEARCH_HOSTS environment
# variable; falls back to a local single-node default when it is unset.
ES_CLIENT_URL = os.getenv("ELASTICSEARCH_HOSTS", "http://localhost:9200")
|
|
|
class ElasticsearchClientError(Exception):
    """Raised when an Elasticsearch client cannot be created or reached."""
|
|
|
def get_es_client() -> Elasticsearch:
    """Create an Elasticsearch client for ``ES_CLIENT_URL`` and verify it responds.

    Returns:
        The ``Elasticsearch`` client instance, after a successful ``ping()``.

    Raises:
        ElasticsearchClientError: If ``ping()`` reports the cluster as
            unreachable, or if ``ConnectionError`` / ``AuthenticationException``
            is raised while creating or pinging the client (the original
            exception is chained as the cause).
    """
    # Use the module logger instead of print() so the message honours the
    # logging configuration. Kept at DEBUG because a connection URL can
    # embed credentials.
    logger.debug("Connecting to Elasticsearch at %s", ES_CLIENT_URL)

    # Only the calls that can raise the handled exceptions live in the try.
    try:
        es_client = Elasticsearch(hosts=[ES_CLIENT_URL])
        reachable = es_client.ping()
    except (ConnectionError, AuthenticationException) as e:
        error_message = f"Elasticsearch connection error: {e}"
        logger.error(error_message)
        raise ElasticsearchClientError(error_message) from e

    # A falsy ping() result is treated as "cluster unreachable".
    if not reachable:
        error_message = "Elasticsearch cluster is not reachable!"
        logger.error(error_message)
        raise ElasticsearchClientError(error_message)

    logger.info("Successfully connected to Elasticsearch")
    return es_client