Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3 | |
""" | |
Test script for the FAISS vector database created by loader.py. | |
Runs a predefined set of semantic queries against the index and checks that an expected string appears in the returned documents.
""" | |
import os | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
from langchain_community.vectorstores import FAISS | |
# Configuration
# Path on disk that the saved FAISS index is loaded from.
FAISS_INDEX_PATH: str = "faiss_index"
# Embedding model name; must be the same model that was used to build the index.
EMBEDDINGS_MODEL_NAME: str = "sentence-transformers/all-MiniLM-L6-v2"
def load_vector_db():
    """
    Load the FAISS vector store saved at FAISS_INDEX_PATH.

    Returns:
        The loaded FAISS vector store, or None when the index path does not
        exist or loading raises (a message is printed in either case).
    """
    if os.path.exists(FAISS_INDEX_PATH):
        try:
            # The embedding model has to match the one the index was built with.
            embedder = HuggingFaceEmbeddings(model_name=EMBEDDINGS_MODEL_NAME)
            # NOTE(review): allow_dangerous_deserialization=True opts in to
            # deserializing the stored index data; only load indexes that
            # come from a trusted source.
            store = FAISS.load_local(
                FAISS_INDEX_PATH,
                embedder,
                allow_dangerous_deserialization=True,
            )
            print(f"Successfully loaded FAISS index from {FAISS_INDEX_PATH}")
            return store
        except Exception as exc:
            print(f"Error loading FAISS index: {exc}")
    else:
        print(f"Error: FAISS index not found at {FAISS_INDEX_PATH}")
        print("Please run loader.py first to create the vector database.")
    return None
def search_documents(vector_db, query, k=3):
    """
    Run a scored similarity search for ``query`` against the vector store.

    Args:
        vector_db: The loaded FAISS vector store
        query: The search query string
        k: Number of top results to return

    Returns:
        Whatever ``similarity_search_with_score`` returns (a list of
        (document, score) pairs), or an empty list if the search raises.
    """
    try:
        return vector_db.similarity_search_with_score(query, k=k)
    except Exception as exc:
        # Best-effort: report the failure and fall through to "no results".
        print(f"Error during search: {exc}")
    return []
def find_string_in_results(docs_with_scores, search_string):
    """
    Find case-insensitive occurrences of a string in the search results.

    Every occurrence is reported, including overlapping ones, because the
    scan resumes one character after each hit.

    Args:
        docs_with_scores: List of (document, score) tuples from similarity search
        search_string: String to search for in the documents

    Returns:
        List of dicts, one per occurrence, with the keys 'result_index'
        (1-based position of the document in docs_with_scores), 'source',
        'similarity_score', 'context' (up to 100 characters either side of
        the hit) and 'position' (offset of the hit in the document text).
        Empty when search_string is empty or nothing matches.
    """
    # An empty needle "matches" at every offset, which would produce one
    # meaningless entry per character; treat it as "nothing to find".
    if not search_string:
        return []

    # Loop-invariant: lowercase the needle once rather than per document.
    needle = search_string.lower()
    matches = []
    for result_index, (doc, score) in enumerate(docs_with_scores, 1):
        text = doc.page_content
        # NOTE: offsets are found in the lowercased text and applied to the
        # original text; the two agree whenever lower() preserves length.
        haystack = text.lower()
        pos = haystack.find(needle)
        while pos != -1:
            # Extract context around the match (100 chars before and after)
            context_start = max(0, pos - 100)
            context_end = min(len(text), pos + len(search_string) + 100)
            matches.append({
                'result_index': result_index,
                'source': doc.metadata.get('source', 'Unknown'),
                'similarity_score': score,
                'context': text[context_start:context_end],
                'position': pos,
            })
            pos = haystack.find(needle, pos + 1)
    return matches
def print_search_results(docs_with_scores):
    """
    Print a banner followed by one formatted entry per search result.

    Each entry shows the 1-based result number, the similarity score, the
    document's 'source' metadata ('Unknown' when absent) and the first 200
    characters of its content.

    Args:
        docs_with_scores: List of (document, score) tuples from similarity search
    """
    banner = '=' * 60
    result_count = len(docs_with_scores)
    print(f"\n{banner}")
    print(f"SEARCH RESULTS ({result_count} results)")
    print(f"{banner}")
    for rank, (doc, score) in enumerate(docs_with_scores, start=1):
        print(f"\n--- Result {rank} (Similarity Score: {score:.4f}) ---")
        print(f"Source: {doc.metadata.get('source', 'Unknown')}")
        print(f"Content Preview: {doc.page_content[:200]}...")
        print("-" * 50)
def print_string_matches(matches, search_string):
    """
    Print the string-search matches, or a "no matches" notice when empty.

    Args:
        matches: List of match dicts with the keys 'result_index', 'source',
            'similarity_score' and 'context'
        search_string: The string that was searched for (used in headings)
    """
    if matches:
        banner = '=' * 60
        print(f"\n{banner}")
        print(f"STRING SEARCH RESULTS for '{search_string}' ({len(matches)} matches)")
        print(f"{banner}")
        for hit in matches:
            print(f"\nβ Match found in Result #{hit['result_index']}")
            print(f"Source: {hit['source']}")
            print(f"Similarity Score: {hit['similarity_score']:.4f}")
            print(f"Context: ...{hit['context']}...")
            print("-" * 50)
    else:
        print(f"\nβ No matches found for '{search_string}' in the search results.")
# Test cases configuration
# Each pair is (question sent to the similarity search, string that must
# appear in at least one of the returned documents).
_QUESTION_EXPECTATION_PAIRS = (
    ("What is the management IP address of DCX-L2LEAF1A?", "172.20.20.57"),
    ("What VLANs are on DCX-L2LEAF1A?", "VRF10_VLAN11"),
    ("What spanning tree mode is configured?", "mstp"),
    ("What is the NTP server configured?", "0.pool.ntp.org"),
    ("What VRF is used for management?", "MGMT"),
    ("What is the default gateway for management?", "172.20.20.1"),
    ("What ethernet interfaces are on DCX-L2LEAF1A?", "Ethernet1"),
    ("What port-channel interfaces exist?", "Port-Channel1"),
    ("What is the TerminAttr daemon configuration?", "apiserver.arista.io"),
    ("What local users are configured?", "admin"),
    ("What's the description of Ethernet5 on DCX-L2LEAF1A?", "DCX-leaf1-server1_iLO"),
    ("What channel group is configured on DCX-L2LEAF1A Ethernet1?", "channel-group 1"),
    ("What VLAN access mode is on DCX-L2LEAF1A Ethernet5?", "access vlan 11"),
    ("What is the DNS server configured?", "8.8.8.8"),
    ("What protocol is used for management API on DCX-L2LEAF1A?", "protocol https"),
)

# Expanded into the dict form the test runner reads.
TEST_CASES = [
    {"question": question, "expected_string": expected}
    for question, expected in _QUESTION_EXPECTATION_PAIRS
]
def run_automated_tests(vector_db, test_cases=None):
    """
    Run the question/expected-string test cases against the vector store.

    For each case the question is sent through search_documents() with k=10
    and the case passes when find_string_in_results() finds the expected
    string in any returned document. Progress and a summary are printed.

    Args:
        vector_db: The loaded FAISS vector store
        test_cases: Optional list of dicts with the keys 'question' and
            'expected_string'. Defaults to the module-level TEST_CASES.

    Returns:
        True when no test case failed (including when there are no test
        cases at all), False otherwise.
    """
    if test_cases is None:
        test_cases = TEST_CASES

    print("\nπ§ͺ Running Automated FAISS Database Tests")
    print("=" * 60)
    total_tests = len(test_cases)
    passed_tests = 0
    failed_tests = 0
    for i, test_case in enumerate(test_cases, 1):
        question = test_case["question"]
        expected_string = test_case["expected_string"]
        print(f"\nπ Test {i}/{total_tests}: {question}")
        print(f"Expected to find: '{expected_string}'")
        print("-" * 50)
        try:
            # Perform semantic search (increase k to get more results)
            docs_with_scores = search_documents(vector_db, question, k=10)
            if not docs_with_scores:
                print("β FAIL: No search results found")
                failed_tests += 1
                continue
            # Search for the expected string in results
            matches = find_string_in_results(docs_with_scores, expected_string)
            if matches:
                first_match = matches[0]
                print(f"β PASS: Found '{expected_string}' in search results")
                print(f" Found in: {first_match['source']}")
                print(f" Similarity Score: {first_match['similarity_score']:.4f}")
                print(f" Context: ...{first_match['context'][:100]}...")
                passed_tests += 1
            else:
                print(f"β FAIL: '{expected_string}' not found in search results")
                print(" Search results sources (top 5):")
                for doc, score in docs_with_scores[:5]:
                    print(f" - {doc.metadata.get('source', 'Unknown')} (score: {score:.4f})")
                # Debug: show content preview of top result (the list is
                # known to be non-empty here, see the guard above).
                top_doc = docs_with_scores[0][0]
                print(f" Top result content preview: {top_doc.page_content[:200]}...")
                failed_tests += 1
        except Exception as e:
            # One broken case must not abort the whole run; count it as failed.
            print(f"β ERROR: {e}")
            failed_tests += 1

    # Print summary
    print("\n" + "=" * 60)
    print("π TEST SUMMARY")
    print("=" * 60)
    print(f"Total Tests: {total_tests}")
    print(f"β Passed: {passed_tests}")
    print(f"β Failed: {failed_tests}")
    # Guard the division: an empty case list would otherwise raise
    # ZeroDivisionError after all the work is already done.
    success_rate = (passed_tests / total_tests) * 100 if total_tests else 0.0
    print(f"Success Rate: {success_rate:.1f}%")
    if failed_tests > 0:
        print(f"\nβ οΈ {failed_tests} test(s) failed. Check the results above.")
        return False
    print("\nπ All tests passed!")
    return True
def main():
    """
    Load the vector database, run the automated tests and exit.

    Raises:
        SystemExit: always. The exit code is 0 when every test passed and 1
            when the database could not be loaded or any test failed.
    """
    print("π Loading FAISS Vector Database...")
    # Load the vector database
    vector_db = load_vector_db()
    if vector_db is None:
        # load_vector_db() has already printed the reason. Nothing was
        # tested, so report failure instead of a silent zero exit status.
        raise SystemExit(1)
    # Run automated tests
    success = run_automated_tests(vector_db)
    # Exit with appropriate code. SystemExit is raised directly because the
    # exit() helper is injected by the site module and is not guaranteed to
    # exist in every interpreter configuration.
    if not success:
        raise SystemExit(1)
    print("\nβ All tests completed successfully!")
    raise SystemExit(0)


if __name__ == "__main__":
    main()