|
import logging |
|
from elasticsearch import Elasticsearch, exceptions |
|
from typing import Dict, Any |
|
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# NOTE(review): this configures the root logger at DEBUG as an import-time
# side effect. It is kept as-is because importers may rely on it; consider
# moving it to the application entry point.
logging.basicConfig(level=logging.DEBUG)

# Value written to the "dims" setting of the dense_vector "embedding" field
# in the retrieval index mapping.
embedding_dimension = 1536
|
|
|
def create_mapping(
    properties: Dict[str, Any],
    *,
    number_of_shards: int = 1,
    number_of_replicas: int = 1,
) -> Dict[str, Any]:
    """Build an index-creation body from field properties and index settings.

    Args:
        properties: Field definitions placed under ``mappings.properties``.
            The dict is embedded as-is (not copied).
        number_of_shards: Value for ``settings.number_of_shards``.
            Defaults to 1.
        number_of_replicas: Value for ``settings.number_of_replicas``.
            Defaults to 1.

    Returns:
        A dict with a ``"settings"`` section and a ``"mappings"`` section.
    """
    return {
        "settings": {
            "number_of_shards": number_of_shards,
            "number_of_replicas": number_of_replicas,
        },
        "mappings": {"properties": properties},
    }
|
|
|
|
|
def retrieval_index() -> Dict[str, Any]:
    """Return the index definition used for retrieval indices.

    The field properties are declared here and wrapped by ``create_mapping``,
    which supplies the index settings.
    """
    # Vector field sized by the module-level embedding_dimension constant.
    embedding_field = {
        "type": "dense_vector",
        "dims": embedding_dimension,
    }

    field_properties = {
        "chunk_id": {"type": "keyword"},
        "chunk": {"type": "text"},
        "embedding": embedding_field,
        "certification": {"type": "keyword"},
        "source_file": {"type": "keyword"},
        "timestamp": {"type": "date"},
    }
    return create_mapping(field_properties)
|
|
|
|
|
def create_elasticsearch_index(es_client: Elasticsearch, index_name: str) -> bool:
    """
    Ensure an Elasticsearch index exists, creating it with the retrieval mapping if needed.

    Args:
        es_client (Elasticsearch): The Elasticsearch client instance.
        index_name (str): The name of the index to create.

    Returns:
        bool: True if the index already existed or was created successfully,
            False if any exception was raised while checking or creating it.
    """
    try:
        if es_client.indices.exists(index=index_name):
            logger.warning(f"Index '{index_name}' already exists. Skipping creation.")
            return True

        # Build the mapping only once we know the index has to be created.
        mapping = retrieval_index()
        es_client.indices.create(index=index_name, body=mapping)
        logger.info(f"Index '{index_name}' created successfully.")
        return True
    except Exception as e:
        # Deliberate best-effort boundary: callers get a boolean instead of an
        # exception. logger.exception logs at ERROR level and attaches the
        # traceback, so the root cause is not lost.
        logger.exception(f"Unexpected error while creating index '{index_name}': {e}")
        return False