import configparser
import logging
import os
import ast
import re

from dotenv import load_dotenv

# Load variables from a local .env file
load_dotenv()
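
# Hedged note (illustration only): the .env file is expected to supply the keys
# that get_auth() reads below, for example:
#   HF_TOKEN=hf_xxxxxxxxxxxx
#   QDRANT_API_KEY=xxxxxxxxxxxx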


def getconfig(configfile_path: str):
    """
    Read the config file.

    Params
    ----------------
    configfile_path: file path of the .cfg file
    """
    config = configparser.ConfigParser()
    try:
        # Use a context manager so the file handle is always closed
        with open(configfile_path) as configfile:
            config.read_file(configfile)
        return config
    except OSError:
        logging.warning("config file not found: %s", configfile_path)


def get_auth(provider: str) -> dict:
    """Get the authentication configuration for the given provider."""
    auth_configs = {
        "huggingface": {"api_key": os.getenv("HF_TOKEN")},
        "qdrant": {"api_key": os.getenv("QDRANT_API_KEY")},
    }

    provider = provider.lower()  # Normalize to lowercase
    if provider not in auth_configs:
        raise ValueError(f"Unsupported provider: {provider}")

    auth_config = auth_configs[provider]
    api_key = auth_config.get("api_key")
    if not api_key:
        logging.warning(
            f"No API key found for provider '{provider}'. "
            "Please set the appropriate environment variable."
        )
        auth_config["api_key"] = None

    return auth_config
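

# Hedged usage sketch (illustration only): look up credentials for the two
# providers supported above; an unknown provider raises ValueError.
def _example_get_auth_usage():
    hf_key = get_auth("HuggingFace")["api_key"]  # provider name is lowercased internally
    qdrant_key = get_auth("qdrant")["api_key"]   # None if QDRANT_API_KEY is unset
    try:
        get_auth("openai")  # not in auth_configs, so this raises
    except ValueError as err:
        logging.info("Unsupported provider as expected: %s", err)
    return hf_key, qdrant_key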


def process_content(content: str) -> str:
    """
    Process and clean malformed content that may contain stringified nested lists.

    The test DB on Qdrant picked up some malformed entries during processing,
    but this cleanup is useful to have regardless.

    Args:
        content: Raw content from the vector store

    Returns:
        Cleaned, readable text content
    """
    if not content:
        return content

    # Check whether the content looks like a stringified list / nested structure
    content_stripped = content.strip()
    if content_stripped.startswith('[') and content_stripped.endswith(']'):
        try:
            # Parse as a literal Python list structure
            parsed_content = ast.literal_eval(content_stripped)
            if isinstance(parsed_content, list):
                # Flatten nested lists and extract meaningful text
                def extract_text_from_nested(obj):
                    if isinstance(obj, list):
                        text_items = []
                        for item in obj:
                            extracted = extract_text_from_nested(item)
                            if extracted and extracted.strip():
                                text_items.append(extracted)
                        return ' '.join(text_items)
                    elif isinstance(obj, str) and obj.strip():
                        return obj.strip()
                    elif isinstance(obj, dict):
                        # Handle dict structures if present
                        text_items = []
                        for key, value in obj.items():
                            if isinstance(value, str) and value.strip():
                                text_items.append(f"{key}: {value}")
                        return ' '.join(text_items)
                    else:
                        return ''

                extracted_text = extract_text_from_nested(parsed_content)
                if extracted_text and len(extracted_text.strip()) > 0:
                    # Collapse extra whitespace and format nicely
                    cleaned_text = re.sub(r'\s+', ' ', extracted_text).strip()
                    logging.debug(f"Successfully processed nested list content: {len(cleaned_text)} chars")
                    return cleaned_text
                else:
                    logging.warning("Parsed list content but no meaningful text found")
                    return content  # Return the original if no meaningful text was extracted
        except (ValueError, SyntaxError) as e:
            logging.debug(f"Content looks like a list but failed to parse: {e}")
            # Fall through to return the original content

    # For regular text content, just collapse whitespace
    return re.sub(r'\s+', ' ', content).strip()
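

# Hedged demo (illustration only): the stringified-list sample below is made up
# to show how process_content flattens nested structures and collapses whitespace.
if __name__ == "__main__":
    sample = "[['Paragraph one.', ['Paragraph   two.']], {'title': 'Report'}]"
    print(process_content(sample))
    # Prints: Paragraph one. Paragraph two. title: Report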