import asyncio
import json
import time
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from ankigen_core.agents.performance import (
    CacheConfig,
    PerformanceConfig,
    CacheEntry,
    MemoryCache,
    BatchProcessor,
    RequestDeduplicator,
    PerformanceOptimizer,
    PerformanceMonitor,
    get_performance_optimizer,
    get_performance_monitor,
    cache_response,
    rate_limit,
    generate_card_cache_key,
    generate_judgment_cache_key,
)
from ankigen_core.models import Card, CardFront, CardBack
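
# NOTE: the async tests below are written as bare `async def` functions. They assume
# pytest-asyncio (or an equivalent plugin) is configured with asyncio_mode = "auto";
# if the project uses strict mode instead, each async test needs an explicit
# @pytest.mark.asyncio marker.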
|
|
|
|
|
|
|
def test_cache_config_defaults():
    """Test CacheConfig default values"""
    config = CacheConfig()

    assert config.enable_caching is True
    assert config.cache_ttl == 3600
    assert config.max_cache_size == 1000
    assert config.cache_backend == "memory"
    assert config.cache_directory is None


def test_cache_config_file_backend():
    """Test CacheConfig with file backend"""
    config = CacheConfig(cache_backend="file")

    assert config.cache_directory == "cache/agents"


def test_performance_config_defaults():
    """Test PerformanceConfig default values"""
    config = PerformanceConfig()

    assert config.enable_batch_processing is True
    assert config.max_batch_size == 10
    assert config.batch_timeout == 2.0
    assert config.enable_parallel_execution is True
    assert config.max_concurrent_requests == 5
    assert config.enable_request_deduplication is True
    assert config.enable_response_caching is True
    assert isinstance(config.cache_config, CacheConfig)
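
# NOTE: the CacheEntry/MemoryCache tests below freeze the clock via patch("time.time", ...).
# This assumes the performance module reads the clock as time.time(); if it imports the
# function directly (from time import time), the patch target would instead have to be
# ankigen_core.agents.performance.time.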
|
|
|
|
|
|
|
def test_cache_entry_creation():
    """Test CacheEntry creation"""
    # Freeze time so the default last_accessed matches created_at
    with patch('time.time', return_value=1000.0):
        entry = CacheEntry(value="test", created_at=1000.0)

    assert entry.value == "test"
    assert entry.created_at == 1000.0
    assert entry.access_count == 0
    assert entry.last_accessed == 1000.0


def test_cache_entry_expiration():
    """Test CacheEntry expiration"""
    entry = CacheEntry(value="test", created_at=1000.0)

    # 200 seconds elapsed: still within the 300-second TTL
    with patch('time.time', return_value=1200.0):
        assert entry.is_expired(ttl=300) is False

    # 1000 seconds elapsed: well past the TTL
    with patch('time.time', return_value=2000.0):
        assert entry.is_expired(ttl=300) is True


def test_cache_entry_touch():
    """Test CacheEntry touch method"""
    entry = CacheEntry(value="test", created_at=1000.0)
    initial_count = entry.access_count

    with patch('time.time', return_value=1500.0):
        entry.touch()

    assert entry.access_count == initial_count + 1
    assert entry.last_accessed == 1500.0
|
|
|
|
|
|
|
@pytest.fixture
def memory_cache():
    """Memory cache for testing"""
    config = CacheConfig(max_cache_size=3, cache_ttl=300)
    return MemoryCache(config)
|
|
|
|
|
async def test_memory_cache_set_and_get(memory_cache):
    """Test basic cache set and get operations"""
    await memory_cache.set("key1", "value1")

    result = await memory_cache.get("key1")
    assert result == "value1"


async def test_memory_cache_miss(memory_cache):
    """Test cache miss"""
    result = await memory_cache.get("nonexistent")
    assert result is None


async def test_memory_cache_expiration(memory_cache):
    """Test cache entry expiration"""
    with patch('time.time', return_value=1000.0):
        await memory_cache.set("key1", "value1")

    # 1000 seconds later the entry has outlived its 300-second TTL
    with patch('time.time', return_value=2000.0):
        result = await memory_cache.get("key1")
        assert result is None
|
|
|
|
|
async def test_memory_cache_lru_eviction(memory_cache):
    """Test LRU eviction when cache is full"""
    # Fill the cache to its max size of 3
    await memory_cache.set("key1", "value1")
    await memory_cache.set("key2", "value2")
    await memory_cache.set("key3", "value3")

    # Touch key1 so it becomes the most recently used entry
    await memory_cache.get("key1")

    # Adding a fourth entry should evict the least recently used one (key2)
    await memory_cache.set("key4", "value4")

    # key1 survived because it was recently accessed
    assert await memory_cache.get("key1") == "value1"

    # key2 was evicted, and the new entry is present
    assert await memory_cache.get("key2") is None
    assert await memory_cache.get("key4") == "value4"
|
|
|
|
|
async def test_memory_cache_remove(memory_cache):
    """Test cache entry removal"""
    await memory_cache.set("key1", "value1")

    removed = await memory_cache.remove("key1")
    assert removed is True

    result = await memory_cache.get("key1")
    assert result is None

    # Removing a missing key reports failure
    removed = await memory_cache.remove("nonexistent")
    assert removed is False


async def test_memory_cache_clear(memory_cache):
    """Test cache clearing"""
    await memory_cache.set("key1", "value1")
    await memory_cache.set("key2", "value2")

    await memory_cache.clear()

    assert await memory_cache.get("key1") is None
    assert await memory_cache.get("key2") is None


def test_memory_cache_stats(memory_cache):
    """Test cache statistics"""
    stats = memory_cache.get_stats()

    assert "entries" in stats
    assert "max_size" in stats
    assert "total_accesses" in stats
    assert "hit_rate" in stats
|
|
|
|
|
|
|
@pytest.fixture
def batch_processor():
    """Batch processor for testing"""
    config = PerformanceConfig(max_batch_size=3, batch_timeout=0.1)
    return BatchProcessor(config)
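
# The small max_batch_size (3) lets the size-trigger test below fill a batch quickly,
# and the short batch_timeout (0.1 s) keeps any timeout-driven flush from slowing the
# suite down.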
|
|
|
|
|
async def test_batch_processor_immediate_processing_when_disabled():
    """Test immediate processing when batching is disabled"""
    config = PerformanceConfig(enable_batch_processing=False)
    processor = BatchProcessor(config)

    mock_func = AsyncMock(return_value=["result"])

    result = await processor.add_request("batch1", {"data": "test"}, mock_func)

    # With batching disabled, the request is processed immediately as a one-item batch
    assert result == ["result"]
    mock_func.assert_called_once_with([{"data": "test"}])


async def test_batch_processor_batch_size_trigger(batch_processor):
    """Test batch processing triggered by size limit"""
    mock_func = AsyncMock(return_value=["result1", "result2", "result3"])

    # Submit enough requests to hit max_batch_size (3)
    tasks = []
    for i in range(3):
        task = asyncio.create_task(batch_processor.add_request(
            "batch1", {"data": f"test{i}"}, mock_func
        ))
        tasks.append(task)

    results = await asyncio.gather(*tasks)

    # All three requests were served by a single batched call
    assert len(results) == 3
    mock_func.assert_called_once()
|
|
|
|
|
|
|
@pytest.fixture
def request_deduplicator():
    """Request deduplicator for testing"""
    return RequestDeduplicator()


async def test_request_deduplicator_unique_requests(request_deduplicator):
    """Test deduplicator with unique requests"""
    mock_func = AsyncMock(side_effect=lambda x: f"result_for_{x['id']}")

    result1 = await request_deduplicator.deduplicate_request(
        {"id": "1", "data": "test1"}, mock_func
    )
    result2 = await request_deduplicator.deduplicate_request(
        {"id": "2", "data": "test2"}, mock_func
    )

    # Distinct requests are not deduplicated: each gets its own result and its own call
    assert result1 == "result_for_1"
    assert result2 == "result_for_2"
    assert mock_func.call_count == 2


async def test_request_deduplicator_duplicate_requests(request_deduplicator):
    """Test deduplicator with duplicate requests"""
    mock_func = AsyncMock(return_value="shared_result")

    # Issue three identical requests concurrently
    tasks = [
        request_deduplicator.deduplicate_request(
            {"data": "identical"}, mock_func
        )
        for _ in range(3)
    ]

    results = await asyncio.gather(*tasks)

    # Every caller gets the shared result...
    assert all(result == "shared_result" for result in results)

    # ...but the underlying function only ran once
    mock_func.assert_called_once()
|
|
|
|
|
|
|
@pytest.fixture
def performance_optimizer():
    """Performance optimizer for testing"""
    config = PerformanceConfig(
        max_concurrent_requests=2,
        enable_response_caching=True
    )
    return PerformanceOptimizer(config)


async def test_performance_optimizer_caching(performance_optimizer):
    """Test performance optimizer caching"""
    mock_func = AsyncMock(return_value="cached_result")

    def cache_key_gen(data):
        return f"key_{data['id']}"

    # First call executes the underlying function
    result1 = await performance_optimizer.optimize_agent_call(
        "test_agent",
        {"id": "123"},
        mock_func,
        cache_key_gen
    )

    # Second identical call should be served from the cache
    result2 = await performance_optimizer.optimize_agent_call(
        "test_agent",
        {"id": "123"},
        mock_func,
        cache_key_gen
    )

    assert result1 == "cached_result"
    assert result2 == "cached_result"

    # The underlying function ran only once
    mock_func.assert_called_once()
|
|
|
|
|
async def test_performance_optimizer_concurrency_limit(performance_optimizer):
    """Test performance optimizer concurrency limiting"""

    async def slow_func(data):
        await asyncio.sleep(0.1)
        return f"result_{data['id']}"

    # Five concurrent calls against a max_concurrent_requests limit of 2
    tasks = [
        performance_optimizer.optimize_agent_call(
            "test_agent",
            {"id": str(i)},
            slow_func
        )
        for i in range(5)
    ]

    # All calls should still complete despite the throttling
    results = await asyncio.gather(*tasks)
    assert len(results) == 5


def test_performance_optimizer_stats(performance_optimizer):
    """Test performance optimizer statistics"""
    stats = performance_optimizer.get_performance_stats()

    assert "config" in stats
    assert "concurrency" in stats
    assert "cache" in stats

    assert stats["config"]["response_caching"] is True
    assert stats["concurrency"]["max_concurrent"] == 2
|
|
|
|
|
|
|
async def test_performance_monitor():
    """Test performance monitoring"""
    monitor = PerformanceMonitor()

    # Record a few execution times across two operations
    await monitor.record_execution_time("operation1", 1.5)
    await monitor.record_execution_time("operation1", 2.0)
    await monitor.record_execution_time("operation2", 0.5)

    report = monitor.get_performance_report()

    assert "operation1" in report
    assert "operation2" in report

    op1_stats = report["operation1"]
    assert op1_stats["count"] == 2
    assert op1_stats["avg_time"] == 1.75
    assert op1_stats["min_time"] == 1.5
    assert op1_stats["max_time"] == 2.0
|
|
|
|
|
|
|
async def test_cache_response_decorator():
    """Test cache_response decorator"""
    call_count = 0

    @cache_response(lambda x: f"key_{x}", ttl=300)
    async def test_func(param):
        nonlocal call_count
        call_count += 1
        return f"result_{param}"

    # First call executes the wrapped function
    result1 = await test_func("test")
    assert result1 == "result_test"
    assert call_count == 1

    # Second call with the same argument is served from the cache
    result2 = await test_func("test")
    assert result2 == "result_test"
    assert call_count == 1
|
|
|
|
|
async def test_rate_limit_decorator():
    """Test rate_limit decorator"""
    execution_times = []

    @rate_limit(max_concurrent=1)
    async def test_func(delay):
        start_time = time.time()
        await asyncio.sleep(delay)
        end_time = time.time()
        execution_times.append((start_time, end_time))
        return "done"

    # Launch three calls that would overlap without the limit
    tasks = [
        test_func(0.1),
        test_func(0.1),
        test_func(0.1)
    ]

    await asyncio.gather(*tasks)

    # All three calls completed
    assert len(execution_times) == 3

    # With max_concurrent=1 the executions must not overlap
    for i in range(len(execution_times) - 1):
        current_end = execution_times[i][1]
        next_start = execution_times[i + 1][0]

        # Small tolerance for timer jitter
        assert next_start >= current_end - 0.01
|
|
|
|
|
|
|
def test_generate_card_cache_key():
    """Test card cache key generation"""
    key1 = generate_card_cache_key(
        topic="Python",
        subject="programming",
        num_cards=5,
        difficulty="intermediate"
    )

    key2 = generate_card_cache_key(
        topic="Python",
        subject="programming",
        num_cards=5,
        difficulty="intermediate"
    )

    # Identical inputs produce identical keys
    assert key1 == key2

    # Changing an input (the topic) produces a different key
    key3 = generate_card_cache_key(
        topic="Java",
        subject="programming",
        num_cards=5,
        difficulty="intermediate"
    )

    assert key1 != key3
|
|
|
|
|
def test_generate_judgment_cache_key():
    """Test judgment cache key generation"""
    cards = [
        Card(
            front=CardFront(question="What is Python?"),
            back=CardBack(answer="A programming language", explanation="", example=""),
            card_type="basic"
        ),
        Card(
            front=CardFront(question="What is Java?"),
            back=CardBack(answer="A programming language", explanation="", example=""),
            card_type="basic"
        )
    ]

    # Same cards and judgment criterion produce the same key
    key1 = generate_judgment_cache_key(cards, "accuracy")
    key2 = generate_judgment_cache_key(cards, "accuracy")

    assert key1 == key2

    # A different criterion produces a different key
    key3 = generate_judgment_cache_key(cards, "clarity")
    assert key1 != key3
|
|
|
|
|
|
|
def test_get_performance_optimizer_singleton():
    """Test performance optimizer singleton"""
    optimizer1 = get_performance_optimizer()
    optimizer2 = get_performance_optimizer()

    assert optimizer1 is optimizer2


def test_get_performance_monitor_singleton():
    """Test performance monitor singleton"""
    monitor1 = get_performance_monitor()
    monitor2 = get_performance_monitor()

    assert monitor1 is monitor2
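
# Note: these getters appear to return module-level singletons, so any cached or
# recorded state they accumulate persists for the rest of the test session unless it
# is reset explicitly; keep that in mind when adding tests that mutate them.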
|
|
|
|
|
|
|
async def test_full_optimization_pipeline():
    """Test complete optimization pipeline"""
    config = PerformanceConfig(
        enable_batch_processing=True,
        enable_request_deduplication=True,
        enable_response_caching=True,
        max_batch_size=2,
        batch_timeout=0.1
    )

    optimizer = PerformanceOptimizer(config)

    call_count = 0

    async def mock_processor(data):
        nonlocal call_count
        call_count += 1
        return f"result_{call_count}"

    def cache_key_gen(data):
        return f"key_{data['id']}"

    # Issue three identical requests concurrently
    tasks = [
        optimizer.optimize_agent_call(
            "test_agent",
            {"id": "same"},
            mock_processor,
            cache_key_gen
        )
        for _ in range(3)
    ]

    results = await asyncio.gather(*tasks)

    # Every caller receives the same result...
    assert all(result == results[0] for result in results)

    # ...and deduplication/caching ensured the processor ran only once
    assert call_count == 1
|
|
|
|
|
|
|
async def test_memory_cache_error_handling():
    """Test memory cache handling of None values"""
    cache = MemoryCache(CacheConfig())

    # Storing None should not raise; a cached None is indistinguishable from a miss
    await cache.set("key", None)
    result = await cache.get("key")
    assert result is None
|
|
|
|
|
async def test_batch_processor_error_handling():
    """Test batch processor error handling"""
    processor = BatchProcessor(PerformanceConfig())

    async def failing_func(data):
        raise Exception("Processing failed")

    with pytest.raises(Exception, match="Processing failed"):
        await processor.add_request("batch", {"data": "test"}, failing_func)
|
|
|
|
|
async def test_performance_optimizer_error_recovery():
    """Test performance optimizer error recovery"""
    optimizer = PerformanceOptimizer(PerformanceConfig())

    async def sometimes_failing_func(data):
        if data.get("fail"):
            raise Exception("Intentional failure")
        return "success"

    # A successful call goes through normally
    result = await optimizer.optimize_agent_call(
        "test_agent",
        {"id": "1"},
        sometimes_failing_func
    )
    assert result == "success"

    # A failing call propagates its exception instead of being swallowed
    with pytest.raises(Exception, match="Intentional failure"):
        await optimizer.optimize_agent_call(
            "test_agent",
            {"id": "2", "fail": True},
            sometimes_failing_func
        )