File size: 16,851 Bytes
56fd459 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 |
# Tests for ankigen_core/agents/performance.py
import pytest
import asyncio
import time
import json
from unittest.mock import AsyncMock, MagicMock, patch
from ankigen_core.agents.performance import (
CacheConfig,
PerformanceConfig,
CacheEntry,
MemoryCache,
BatchProcessor,
RequestDeduplicator,
PerformanceOptimizer,
PerformanceMonitor,
get_performance_optimizer,
get_performance_monitor,
cache_response,
rate_limit,
generate_card_cache_key,
generate_judgment_cache_key
)
from ankigen_core.models import Card, CardFront, CardBack
# Test CacheConfig
def test_cache_config_defaults():
    """A CacheConfig built with no arguments exposes the expected defaults."""
    defaults = CacheConfig()

    # Boolean and None defaults are checked by identity, the rest by value.
    assert defaults.enable_caching is True
    assert defaults.cache_directory is None
    assert (defaults.cache_ttl, defaults.max_cache_size) == (3600, 1000)
    assert defaults.cache_backend == "memory"
def test_cache_config_file_backend():
    """Choosing the file backend yields 'cache/agents' as the cache directory."""
    file_backed = CacheConfig(cache_backend="file")
    expected_directory = "cache/agents"
    assert file_backed.cache_directory == expected_directory
# Test PerformanceConfig
def test_performance_config_defaults():
    """A PerformanceConfig built with no arguments exposes the expected defaults."""
    defaults = PerformanceConfig()

    # Feature toggles must be the literal True, not merely truthy.
    for toggle in (
        "enable_batch_processing",
        "enable_parallel_execution",
        "enable_request_deduplication",
        "enable_response_caching",
    ):
        assert getattr(defaults, toggle) is True

    # Numeric tuning values.
    assert defaults.max_batch_size == 10
    assert defaults.batch_timeout == 2.0
    assert defaults.max_concurrent_requests == 5

    # A nested cache configuration is present by default.
    assert isinstance(defaults.cache_config, CacheConfig)
# Test CacheEntry
def test_cache_entry_creation():
    """A new CacheEntry keeps its value and timestamp and starts with no accesses."""
    frozen_now = 1000.0

    # Freeze the clock so that last_accessed is deterministic.
    with patch("time.time", return_value=frozen_now):
        entry = CacheEntry(value="test", created_at=frozen_now)

        assert entry.value == "test"
        assert entry.created_at == frozen_now
        assert entry.access_count == 0
        assert entry.last_accessed == frozen_now
def test_cache_entry_expiration():
    """CacheEntry.is_expired is False inside the TTL window and True past it.

    The entry is created at t=1000 and checked against ttl=300, so the
    "fresh" probe must fall before t=1300 and the "stale" probe after it.
    The fresh probe previously ran at t=1500, which is already 500 past the
    creation time and therefore outside the TTL it was meant to be inside.
    """
    created_at = 1000.0
    ttl = 300
    entry = CacheEntry(value="test", created_at=created_at)

    # NOTE(review): assumes is_expired() measures age from created_at via
    # time.time() -- confirm against the CacheEntry implementation.

    # Age 200: still inside the TTL window.
    with patch("time.time", return_value=created_at + 200.0):
        assert entry.is_expired(ttl=ttl) is False  # Not expired

    # Age 1000: well past the TTL.
    with patch("time.time", return_value=created_at + 1000.0):
        assert entry.is_expired(ttl=ttl) is True  # Expired
def test_cache_entry_touch():
    """touch() bumps the access counter by one and refreshes last_accessed."""
    entry = CacheEntry(value="test", created_at=1000.0)
    count_before = entry.access_count
    touched_at = 1500.0

    # The patched clock supplies the timestamp that touch() records.
    with patch("time.time", return_value=touched_at):
        entry.touch()

    assert entry.access_count == count_before + 1
    assert entry.last_accessed == touched_at
# Test MemoryCache
@pytest.fixture
def memory_cache():
    """Provide a small MemoryCache (max_cache_size=3, cache_ttl=300)."""
    return MemoryCache(CacheConfig(cache_ttl=300, max_cache_size=3))
async def test_memory_cache_set_and_get(memory_cache):
    """A value stored under a key is returned by a later get for that key."""
    key, value = "key1", "value1"
    await memory_cache.set(key, value)
    assert await memory_cache.get(key) == value
async def test_memory_cache_miss(memory_cache):
    """Looking up a key that was never stored yields None."""
    assert await memory_cache.get("nonexistent") is None
async def test_memory_cache_expiration(memory_cache):
    """An entry read long after the fixture's cache_ttl reads back as None."""
    stored_at = 1000.0
    with patch("time.time", return_value=stored_at):
        await memory_cache.set("key1", "value1")

    # The patched clock now reads 2000, far beyond stored_at + cache_ttl (300).
    with patch("time.time", return_value=2000.0):
        expired_lookup = await memory_cache.get("key1")

    assert expired_lookup is None
async def test_memory_cache_lru_eviction(memory_cache):
    """Inserting into a full cache keeps the recently read and the newest entry."""
    # The fixture allows three entries; these fill it completely.
    for index in (1, 2, 3):
        await memory_cache.set(f"key{index}", f"value{index}")

    # Reading key1 marks it as recently used ...
    await memory_cache.get("key1")
    # ... so the insert that overflows the cache has to evict something else.
    await memory_cache.set("key4", "value4")

    # key1 (recently accessed) and key4 (newest) must both still be present.
    survivors = {"key1": "value1", "key4": "value4"}
    for key, expected in survivors.items():
        assert await memory_cache.get(key) == expected
async def test_memory_cache_remove(memory_cache):
    """remove() returns True for a stored key and False for an unknown one."""
    await memory_cache.set("key1", "value1")

    # Removing an existing entry succeeds and the entry is gone afterwards.
    assert await memory_cache.remove("key1") is True
    assert await memory_cache.get("key1") is None

    # Removing a key that is not in the cache reports False.
    assert await memory_cache.remove("nonexistent") is False
async def test_memory_cache_clear(memory_cache):
    """clear() leaves none of the previously stored keys readable."""
    keys = ("key1", "key2")
    for suffix, key in enumerate(keys, start=1):
        await memory_cache.set(key, f"value{suffix}")

    await memory_cache.clear()

    for key in keys:
        assert await memory_cache.get(key) is None
def test_memory_cache_stats(memory_cache):
    """get_stats() reports every expected statistic key."""
    stats = memory_cache.get_stats()
    for expected_key in ("entries", "max_size", "total_accesses", "hit_rate"):
        assert expected_key in stats
# Test BatchProcessor
@pytest.fixture
def batch_processor():
    """Provide a BatchProcessor with max_batch_size=3 and batch_timeout=0.1."""
    return BatchProcessor(PerformanceConfig(batch_timeout=0.1, max_batch_size=3))
async def test_batch_processor_immediate_processing_when_disabled():
    """With batching disabled, add_request returns the processor's result directly."""
    processor = BatchProcessor(PerformanceConfig(enable_batch_processing=False))
    handler = AsyncMock(return_value=["result"])

    outcome = await processor.add_request("batch1", {"data": "test"}, handler)

    assert outcome == ["result"]
    # The single request reaches the handler wrapped in a one-element list.
    handler.assert_called_once_with([{"data": "test"}])
async def test_batch_processor_batch_size_trigger(batch_processor):
    """Three queued requests (the fixture's max_batch_size) cause one handler call."""
    handler = AsyncMock(return_value=["result1", "result2", "result3"])

    # Schedule exactly as many requests as the fixture's max_batch_size.
    pending = [
        asyncio.create_task(
            batch_processor.add_request("batch1", {"data": f"test{i}"}, handler)
        )
        for i in range(3)
    ]
    outcomes = await asyncio.gather(*pending)

    # Every request gets an answer, yet the handler ran only once.
    assert len(outcomes) == 3
    handler.assert_called_once()
# Test RequestDeduplicator
@pytest.fixture
def request_deduplicator():
    """Provide a newly constructed RequestDeduplicator."""
    deduplicator = RequestDeduplicator()
    return deduplicator
async def test_request_deduplicator_unique_requests(request_deduplicator):
    """Distinct requests each reach the processor and receive their own result.

    The mock's side effect formats only the request's ``id`` field, so the
    expected results are ``result_for_1`` and ``result_for_2``. The earlier
    assertions compared against the repr of the whole request dict, which the
    side effect as written cannot produce from these payloads.
    """
    mock_func = AsyncMock(side_effect=lambda x: f"result_for_{x['id']}")

    # NOTE(review): assumes deduplicate_request() passes the request dict to
    # the processor unchanged -- confirm against RequestDeduplicator.
    result1 = await request_deduplicator.deduplicate_request(
        {"id": "1", "data": "test1"}, mock_func
    )
    result2 = await request_deduplicator.deduplicate_request(
        {"id": "2", "data": "test2"}, mock_func
    )

    assert result1 == "result_for_1"
    assert result2 == "result_for_2"
    # Different payloads must not be collapsed into a single call.
    assert mock_func.call_count == 2
async def test_request_deduplicator_duplicate_requests(request_deduplicator):
    """Identical concurrent requests share one processor call and one result."""
    processor = AsyncMock(return_value="shared_result")

    # Three coroutines for equal payloads, awaited together.
    duplicates = []
    for _ in range(3):
        duplicates.append(
            request_deduplicator.deduplicate_request({"data": "identical"}, processor)
        )
    outcomes = await asyncio.gather(*duplicates)

    # Every caller sees the shared result ...
    for outcome in outcomes:
        assert outcome == "shared_result"
    # ... although the processor ran exactly once.
    processor.assert_called_once()
# Test PerformanceOptimizer
@pytest.fixture
def performance_optimizer():
    """Provide a PerformanceOptimizer with max_concurrent_requests=2 and caching on."""
    settings = PerformanceConfig(
        enable_response_caching=True,
        max_concurrent_requests=2,
    )
    return PerformanceOptimizer(settings)
async def test_performance_optimizer_caching(performance_optimizer):
    """A repeated call with the same cache key does not re-run the function."""
    agent_func = AsyncMock(return_value="cached_result")

    def key_for(payload):
        return f"key_{payload['id']}"

    # Two back-to-back calls with equal arguments: the first executes the
    # function, the second is expected to be served from the cache.
    outcomes = []
    for _ in range(2):
        outcomes.append(
            await performance_optimizer.optimize_agent_call(
                "test_agent", {"id": "123"}, agent_func, key_for
            )
        )

    assert outcomes == ["cached_result", "cached_result"]
    agent_func.assert_called_once()
async def test_performance_optimizer_concurrency_limit(performance_optimizer):
    """More calls than the fixture's concurrency limit still all complete."""

    async def slow_func(data):
        # Sleep briefly so that the calls overlap in time.
        await asyncio.sleep(0.1)
        return f"result_{data['id']}"

    # Five calls against the fixture's limit of two concurrent requests.
    calls = []
    for i in range(5):
        calls.append(
            performance_optimizer.optimize_agent_call(
                "test_agent", {"id": str(i)}, slow_func
            )
        )

    outcomes = await asyncio.gather(*calls)
    assert len(outcomes) == 5
def test_performance_optimizer_stats(performance_optimizer):
    """get_performance_stats() exposes config, concurrency and cache sections."""
    stats = performance_optimizer.get_performance_stats()

    for section in ("config", "concurrency", "cache"):
        assert section in stats

    # These values mirror what the fixture configured.
    assert stats["config"]["response_caching"] is True
    assert stats["concurrency"]["max_concurrent"] == 2
# Test PerformanceMonitor
async def test_performance_monitor():
    """Recorded execution times are aggregated per operation in the report."""
    monitor = PerformanceMonitor()

    samples = [("operation1", 1.5), ("operation1", 2.0), ("operation2", 0.5)]
    for operation, duration in samples:
        await monitor.record_execution_time(operation, duration)

    report = monitor.get_performance_report()
    for operation in ("operation1", "operation2"):
        assert operation in report

    # operation1 was recorded twice: 1.5 and 2.0.
    op1 = report["operation1"]
    assert op1["count"] == 2
    assert op1["avg_time"] == 1.75
    assert op1["min_time"] == 1.5
    assert op1["max_time"] == 2.0
# Test decorators
async def test_cache_response_decorator():
    """A function wrapped in cache_response runs only once for a repeated key."""
    invocations = []

    @cache_response(lambda x: f"key_{x}", ttl=300)
    async def test_func(param):
        # Record every real execution so cache hits can be told apart.
        invocations.append(param)
        return f"result_{param}"

    # First call executes the wrapped function.
    assert await test_func("test") == "result_test"
    assert len(invocations) == 1

    # Second call returns the same value without another execution.
    assert await test_func("test") == "result_test"
    assert len(invocations) == 1
async def test_rate_limit_decorator():
    """rate_limit(max_concurrent=1) makes concurrent calls run one after another.

    Each call records its start and end instants. With a limit of one, no
    call may start noticeably before the previously recorded call has ended.
    """
    execution_times = []

    @rate_limit(max_concurrent=1)
    async def test_func(delay):
        # time.monotonic() cannot go backwards, unlike the wall clock, so
        # comparing intervals stays valid if the system clock is adjusted
        # while the test is running.
        start_time = time.monotonic()
        await asyncio.sleep(delay)
        end_time = time.monotonic()
        execution_times.append((start_time, end_time))
        return "done"

    # Start multiple calls at once.
    await asyncio.gather(test_func(0.1), test_func(0.1), test_func(0.1))

    assert len(execution_times) == 3

    # Compare each recorded interval with the one recorded right after it.
    for (_, current_end), (next_start, _) in zip(
        execution_times, execution_times[1:]
    ):
        # Allow a small overlap due to timing precision.
        assert next_start >= current_end - 0.01
# Test utility functions
def test_generate_card_cache_key():
    """Equal parameters give equal card cache keys; a different topic does not."""
    base_params = {
        "topic": "Python",
        "subject": "programming",
        "num_cards": 5,
        "difficulty": "intermediate",
    }

    key1 = generate_card_cache_key(**base_params)
    key2 = generate_card_cache_key(**base_params)
    # Same parameters -> same key.
    assert key1 == key2

    # Changing only the topic -> different key.
    key3 = generate_card_cache_key(**{**base_params, "topic": "Java"})
    assert key1 != key3
def test_generate_judgment_cache_key():
    """Judgment cache keys repeat for equal input and vary with the judgment type."""
    questions = ("What is Python?", "What is Java?")
    cards = [
        Card(
            front=CardFront(question=question),
            back=CardBack(answer="A programming language", explanation="", example=""),
            card_type="basic",
        )
        for question in questions
    ]

    accuracy_key = generate_judgment_cache_key(cards, "accuracy")
    repeated_key = generate_judgment_cache_key(cards, "accuracy")
    # Same cards and judgment type -> same key.
    assert accuracy_key == repeated_key

    # Same cards, different judgment type -> different key.
    clarity_key = generate_judgment_cache_key(cards, "clarity")
    assert accuracy_key != clarity_key
# Test global instances
def test_get_performance_optimizer_singleton():
    """Two calls to get_performance_optimizer() return the very same object."""
    first, second = get_performance_optimizer(), get_performance_optimizer()
    assert first is second
def test_get_performance_monitor_singleton():
    """Two calls to get_performance_monitor() return the very same object."""
    first, second = get_performance_monitor(), get_performance_monitor()
    assert first is second
# Integration tests
async def test_full_optimization_pipeline():
    """With batching, deduplication and caching on, equal concurrent calls run once."""
    optimizer = PerformanceOptimizer(
        PerformanceConfig(
            enable_batch_processing=True,
            enable_request_deduplication=True,
            enable_response_caching=True,
            max_batch_size=2,
            batch_timeout=0.1,
        )
    )
    seen_payloads = []

    async def mock_processor(data):
        # The result embeds the running call number, so a second execution
        # would produce a visibly different value.
        seen_payloads.append(data)
        return f"result_{len(seen_payloads)}"

    def key_for(payload):
        return f"key_{payload['id']}"

    # Three calls with equal payloads, awaited together.
    calls = []
    for _ in range(3):
        calls.append(
            optimizer.optimize_agent_call(
                "test_agent", {"id": "same"}, mock_processor, key_for
            )
        )
    results = await asyncio.gather(*calls)

    # Every caller receives the same result ...
    reference = results[0]
    for result in results:
        assert result == reference
    # ... and the processor was executed a single time.
    assert len(seen_payloads) == 1
# Error handling tests
async def test_memory_cache_error_handling():
    """Storing None under a key is accepted and the key reads back as None."""
    cache = MemoryCache(CacheConfig())
    await cache.set("key", None)
    assert await cache.get("key") is None
async def test_batch_processor_error_handling():
    """An exception raised by the processing function surfaces from add_request."""

    async def failing_func(data):
        raise Exception("Processing failed")

    default_processor = BatchProcessor(PerformanceConfig())

    with pytest.raises(Exception, match="Processing failed"):
        await default_processor.add_request("batch", {"data": "test"}, failing_func)
async def test_performance_optimizer_error_recovery():
    """optimize_agent_call returns results on success and propagates failures."""

    async def sometimes_failing_func(data):
        # Fail only when the payload carries a truthy "fail" flag.
        if not data.get("fail"):
            return "success"
        raise Exception("Intentional failure")

    optimizer = PerformanceOptimizer(PerformanceConfig())

    # A payload without the flag goes through normally.
    outcome = await optimizer.optimize_agent_call(
        "test_agent", {"id": "1"}, sometimes_failing_func
    )
    assert outcome == "success"

    # A payload with the flag makes the function raise; the error must surface.
    with pytest.raises(Exception, match="Intentional failure"):
        await optimizer.optimize_agent_call(
            "test_agent", {"id": "2", "fail": True}, sometimes_failing_func
        )