# rag_hydro/elastic/indexing.py
# Anas Bader
# redo
# 4cbe4e9
import logging
from elasticsearch import Elasticsearch, exceptions
from typing import Dict, Any
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# NOTE(review): this runs at import time and configures the root logger at
# DEBUG level (basicConfig is a no-op if the root logger already has handlers).
# Confirm this side effect is intended for every importer of this module.
logging.basicConfig(level=logging.DEBUG)
# Value used for "dims" of the "embedding" dense_vector field in
# retrieval_index(). Presumably matches the output size of the embedding
# model in use — TODO confirm.
embedding_dimension = 1536
def create_mapping(properties: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap field definitions in an index body with fixed settings.

    Args:
        properties: Field definitions placed, as-is (not copied), under
            ``mappings.properties`` of the returned body.

    Returns:
        A dict with a ``"settings"`` entry (one shard, one replica) and a
        ``"mappings"`` entry holding ``properties``.
    """
    index_settings = {"number_of_shards": 1, "number_of_replicas": 1}
    field_mappings = {"properties": properties}
    return {"settings": index_settings, "mappings": field_mappings}
def retrieval_index() -> Dict[str, Any]:
    """Build the Elasticsearch index body used for retrieval indices.

    The field definitions are passed through ``create_mapping``, so the
    result carries that helper's settings alongside the mappings.

    Returns:
        The index body declaring the fields ``chunk_id``, ``chunk``,
        ``embedding``, ``certification``, ``source_file`` and ``timestamp``.
    """
    # The vector size comes from the module-level embedding_dimension.
    embedding_field = {"type": "dense_vector", "dims": embedding_dimension}
    field_definitions = {
        "chunk_id": {"type": "keyword"},
        "chunk": {"type": "text"},
        "embedding": embedding_field,
        "certification": {"type": "keyword"},
        "source_file": {"type": "keyword"},
        "timestamp": {"type": "date"},
    }
    return create_mapping(field_definitions)
def create_elasticsearch_index(es_client: Elasticsearch, index_name: str) -> bool:
    """
    Ensure an Elasticsearch index with the retrieval mapping exists.

    If the index is already present it is left untouched; otherwise it is
    created with the body returned by ``retrieval_index()``.

    Args:
        es_client (Elasticsearch): The Elasticsearch client instance.
        index_name (str): The name of the index to create.

    Returns:
        bool: True if the index already existed or was created successfully,
        False if an ``Exception`` was raised while checking for or creating
        the index. Such exceptions are logged and not re-raised.
    """
    try:
        if es_client.indices.exists(index=index_name):
            logger.warning("Index '%s' already exists. Skipping creation.", index_name)
            return True
        # Build the mapping only when the index actually has to be created.
        es_client.indices.create(index=index_name, body=retrieval_index())
        logger.info("Index '%s' created successfully.", index_name)
        return True
    except Exception as e:
        # Deliberate catch-all: the function reports failure through its
        # boolean result. logger.exception logs at ERROR level and attaches
        # the traceback, which a plain logger.error call would drop.
        logger.exception(
            "Unexpected error while creating index '%s': %s", index_name, e
        )
        return False