Performance Optimization Quick Start Guide
Get started with Claude Force performance optimizations in minutes!
This guide provides step-by-step instructions to implement the high-impact optimizations identified in the Performance Analysis and Optimization Plan.
Quick Wins (Start Here!)
Priority 1: Enable Async Execution (2-3 hours)
Impact: 50-80% faster workflows
Difficulty: Medium
Time: 2-3 hours
Step 1: Install Dependencies
# anthropic>=0.40.0 is already installed; add async file I/O.
# Quote the requirement so the shell does not treat ">" as a redirect.
pip install "aiofiles>=23.0.0"
Step 2: Create Minimal Async Wrapper
Create claude_force/simple_async.py:
"""
Simple async wrapper for quick wins.
"""
import asyncio
import os
import time

from anthropic import AsyncAnthropic


class SimpleAsyncOrchestrator:
    """Minimal async wrapper for parallel execution."""

    def __init__(self):
        self.client = AsyncAnthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

    async def execute_task(self, prompt: str, model: str = "claude-3-5-haiku-20241022"):
        """Execute a single task."""
        response = await self.client.messages.create(
            model=model,
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}]
        )
        return response.content[0].text

    async def execute_parallel(self, prompts: list[str]):
        """Execute multiple prompts in parallel."""
        results = await asyncio.gather(*[
            self.execute_task(prompt)
            for prompt in prompts
        ])
        return results


# Usage example
async def main():
    orchestrator = SimpleAsyncOrchestrator()
    prompts = [
        "Explain Python lists",
        "Explain Python dicts",
        "Explain Python sets"
    ]
    start = time.time()
    results = await orchestrator.execute_parallel(prompts)
    elapsed = time.time() - start
    print(f"Completed {len(results)} tasks in {elapsed:.2f}s")
    print(f"Throughput: {len(results) / elapsed:.2f} tasks/second")


if __name__ == "__main__":
    asyncio.run(main())
Step 3: Test It!
# Run the async example
python claude_force/simple_async.py
Expected Output:
Completed 3 tasks in 3.50s
Throughput: 0.86 tasks/second
Compare to sequential (9-12s for 3 tasks) → 2-3x faster!
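To see where the speedup comes from without spending API credits, the comparison can be reproduced with simulated latency. The sketch below is an illustration only: `fake_api_call` is a hypothetical stand-in that sleeps instead of calling the API, and the 0.05s latency is an arbitrary assumption.

```python
import asyncio
import time


async def fake_api_call(prompt: str, latency: float = 0.05) -> str:
    """Hypothetical stand-in for a network call: sleeps instead of calling an API."""
    await asyncio.sleep(latency)
    return f"response to: {prompt}"


async def run_sequential(prompts: list[str]) -> float:
    """Await each call in turn; total time is roughly the sum of latencies."""
    start = time.perf_counter()
    for prompt in prompts:
        await fake_api_call(prompt)
    return time.perf_counter() - start


async def run_parallel(prompts: list[str]) -> float:
    """Start all calls at once; total time is roughly the slowest latency."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_api_call(p) for p in prompts))
    return time.perf_counter() - start


async def compare() -> tuple[float, float]:
    prompts = ["Explain Python lists", "Explain Python dicts", "Explain Python sets"]
    sequential = await run_sequential(prompts)
    parallel = await run_parallel(prompts)
    return sequential, parallel


if __name__ == "__main__":
    sequential, parallel = asyncio.run(compare())
    print(f"Sequential: {sequential:.2f}s, parallel: {parallel:.2f}s")
```

With three equal-latency calls, the parallel run should take about one latency period and the sequential run about three, which matches the 2-3x figure quoted above.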
Priority 2: Basic Response Caching (1-2 hours)
Impact: 30-50% cost reduction
Difficulty: Easy
Time: 1-2 hours
Step 1: Create Simple Cache
Create claude_force/simple_cache.py:
"""
Simple file-based response cache.
"""
import hashlib
import json
import os
import time
from pathlib import Path
from typing import Optional


class SimpleCache:
    """Basic TTL-based cache."""

    def __init__(self, cache_dir: str = ".cache", ttl_hours: int = 24):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
        self.ttl_seconds = ttl_hours * 3600

    def _key(self, prompt: str) -> str:
        """Generate cache key (MD5 is used for naming only, not for security)."""
        return hashlib.md5(prompt.encode()).hexdigest()

    def get(self, prompt: str) -> Optional[str]:
        """Get cached response, or None on a miss or an expired entry."""
        key = self._key(prompt)
        cache_file = self.cache_dir / f"{key}.json"
        if not cache_file.exists():
            return None
        # Check TTL against the file's modification time
        age = time.time() - cache_file.stat().st_mtime
        if age > self.ttl_seconds:
            cache_file.unlink()
            return None
        # Load from cache
        with open(cache_file) as f:
            data = json.load(f)
        return data['response']

    def set(self, prompt: str, response: str):
        """Cache response."""
        key = self._key(prompt)
        cache_file = self.cache_dir / f"{key}.json"
        with open(cache_file, 'w') as f:
            json.dump({
                'prompt': prompt,
                'response': response,
                'timestamp': time.time()
            }, f)


# Usage example
def main():
    from anthropic import Anthropic

    client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
    cache = SimpleCache()
    prompt = "What are Python decorators?"
    # Try cache first (compare with None so an empty response still counts as a hit)
    cached = cache.get(prompt)
    if cached is not None:
        print("✅ Cache hit!")
        print(cached)
        return
    # Cache miss - call API
    print("❌ Cache miss - calling API...")
    response = client.messages.create(
        model="claude-3-5-haiku-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )
    result = response.content[0].text
    # Cache the response
    cache.set(prompt, result)
    print("✅ Response cached")
    print(result)


if __name__ == "__main__":
    main()
Step 2: Test It!
# First run - cache miss
python claude_force/simple_cache.py
# Second run - cache hit (should be instant!)
python claude_force/simple_cache.py
Expected:
First run: 2-4 seconds (API call)
Second run: <100ms (cache hit) → ~40x faster!
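One caveat with the simple cache above: its key is derived from the prompt alone, so the same prompt sent to a different model or with a different token limit would return the same cached entry. A minimal sketch of a broader key follows. The function name `cache_key` and the choice of SHA-256 over a JSON payload are assumptions for illustration, not part of the guide's implementation.

```python
import hashlib
import json


def cache_key(prompt: str, model: str, max_tokens: int) -> str:
    """Derive a key from every input that can change the response.

    Hypothetical helper: serializes the inputs with sorted keys so the
    same inputs always produce the same digest.
    """
    payload = json.dumps(
        {"prompt": prompt, "model": model, "max_tokens": max_tokens},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    key = cache_key("What are Python decorators?", "claude-3-5-haiku-20241022", 1024)
    print(key[:16])
```

Swapping a function like this in for `_key` would keep entries for different models or limits separate, at the cost of a lower hit rate.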
Priority 3: Parallel Workflow Pattern (30 minutes)
Impact: 2-3x throughput for workflows
Difficulty: Easy
Time: 30 minutes
Simple Pattern
"""
Simple parallel workflow pattern.
"""
import asyncio
import os
import time

from anthropic import AsyncAnthropic


async def parallel_workflow_example():
    """Execute workflow steps in parallel where possible."""
    client = AsyncAnthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

    async def call_api(prompt: str):
        response = await client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=2048,
            messages=[{"role": "user", "content": prompt}]
        )
        return response.content[0].text

    start = time.time()
    # Step 1: Run independent checks in parallel
    print("Running parallel checks...")
    linter, type_check, security = await asyncio.gather(
        call_api("Check Python code style"),
        call_api("Check Python type hints"),
        call_api("Check for security issues")
    )
    print(f" Parallel checks completed in {time.time() - start:.2f}s")
    # Step 2: Run final review with results
    print("Running final review...")
    review_prompt = f"""
Based on these findings:
- Linter: {linter[:100]}...
- Type Check: {type_check[:100]}...
- Security: {security[:100]}...
Provide final code review.
"""
    final_review = await call_api(review_prompt)
    total_time = time.time() - start
    print(f"\n✅ Workflow completed in {total_time:.2f}s")
    # Rough estimate only; the real sequential time depends on each step's latency
    print(f" (Sequential would take ~{total_time * 1.5:.2f}s)")


if __name__ == "__main__":
    asyncio.run(parallel_workflow_example())
Test (after saving the code above as parallel_workflow_example.py):
python parallel_workflow_example.py
Expected: the 3 independent steps complete in ~4s (vs ~12s sequential) → 3x faster!
Implementation Checklist
Phase 1: Foundation (Week 1-4)
Week 1-2: Async Implementation
Install aiofiles>=23.0.0
Create claude_force/async_orchestrator.py
Add async methods to existing AgentOrchestrator
Write unit tests for async operations
Update CLI with --async flag
Test backward compatibility
Validation:
# Test async execution
python -m pytest tests/test_async_orchestrator.py -v
# Benchmark speedup
python benchmarks/benchmark_async_vs_sync.py
Week 3-4: Response Caching
Create claude_force/response_cache.py
Implement TTL-based expiration
Add LRU eviction logic
Integrate with orchestrator
Add cache CLI commands
Test cache correctness
Validation:
# Test cache functionality
python -m pytest tests/test_response_cache.py -v
# Check cache stats
claude-force cache stats
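The two cache behaviors in this checklist, TTL-based expiration and LRU eviction, can be prototyped together in memory before touching response_cache.py. The sketch below is one possible shape, not the project's implementation: the class name `LruTtlCache`, its parameters, and the injectable `clock` are all assumptions made for illustration and testing.

```python
import time
from collections import OrderedDict
from typing import Callable, Optional


class LruTtlCache:
    """Hypothetical in-memory cache with a size cap (LRU) and per-entry TTL."""

    def __init__(self, max_entries: int = 128, ttl_seconds: float = 3600.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_entries = max_entries
        self.ttl_seconds = ttl_seconds
        self._clock = clock  # injectable so tests can control time
        self._entries: OrderedDict = OrderedDict()  # key -> (stored_at, value)

    def get(self, key: str) -> Optional[str]:
        item = self._entries.get(key)
        if item is None:
            return None
        stored_at, value = item
        if self._clock() - stored_at > self.ttl_seconds:
            del self._entries[key]  # expired: drop and report a miss
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return value

    def set(self, key: str, value: str) -> None:
        self._entries[key] = (self._clock(), value)
        self._entries.move_to_end(key)
        while len(self._entries) > self.max_entries:
            self._entries.popitem(last=False)  # evict least recently used


if __name__ == "__main__":
    cache = LruTtlCache(max_entries=2, ttl_seconds=60)
    cache.set("a", "1")
    cache.set("b", "2")
    cache.get("a")       # touch "a" so "b" becomes the eviction candidate
    cache.set("c", "3")  # exceeds the cap and evicts "b"
    print(cache.get("b"), cache.get("a"))
```

Reads refresh an entry's recency but not its age, so a frequently read entry still expires once its TTL elapses.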
Phase 2: Advanced (Week 5-8)
Week 5-6: Parallel Workflows
Create claude_force/workflow_dag.py
Update workflow schema with dependencies
Implement DAG executor
Add cycle detection
Test parallel execution
Validation:
# Test DAG execution
python -m pytest tests/test_workflow_dag.py -v
# Benchmark workflow speedup
python benchmarks/benchmark_parallel_workflow.py
Week 7: Metrics & Caching
Implement metrics aggregation
Add query result caching (LRU)
Optimize analytics queries
Validation:
# Test aggregation
python -m pytest tests/test_metrics_aggregation.py -v
# Verify query performance
python benchmarks/benchmark_query_cache.py
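For the query result caching item above, the standard library's `functools.lru_cache` may be enough for a first pass. The sketch below assumes a hypothetical `summarize_agent` query whose arguments are hashable; the function body is a placeholder, since the real analytics queries are not shown in this guide.

```python
from functools import lru_cache

# Counts how often the underlying (hypothetical) query actually runs.
query_runs = {"count": 0}


@lru_cache(maxsize=64)
def summarize_agent(agent: str, days: int) -> str:
    """Placeholder analytics query; a real one would read stored metrics."""
    query_runs["count"] += 1
    return f"summary for {agent} over {days} days"
```

Calling `summarize_agent("python-expert", 7)` twice runs the body once; `summarize_agent.cache_info()` reports hits and misses, and `summarize_agent.cache_clear()` resets the cache when the underlying metrics change.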
Development Setup
1. Create Feature Branch
cd /home/user/claude-force
git checkout -b feature/performance-optimization-v2.3
2. Set Up Development Environment
# Install development dependencies
pip install -e ".[dev]"
# Quote the requirement so the shell does not treat ">" as a redirect
pip install "aiofiles>=23.0.0"
# Install testing tools
pip install pytest pytest-asyncio pytest-cov pytest-benchmark
# Install profiling tools
pip install py-spy memory_profiler line_profiler
3. Run Baseline Benchmarks
# Create baseline for comparison
python benchmarks/run_benchmarks.py --output baseline_v2.2.0.json
# Save baseline
cp baseline_v2.2.0.json benchmarks/results/
Testing Strategy
Unit Tests
# Test async operations
pytest tests/test_async_orchestrator.py -v
# Test caching
pytest tests/test_response_cache.py -v
# Test DAG execution
pytest tests/test_workflow_dag.py -v
# Run all tests with coverage
pytest --cov=claude_force --cov-report=html
Integration Tests
# Test full async workflow
pytest tests/integration/test_async_workflows.py -v
# Test cache integration
pytest tests/integration/test_cache_integration.py -v
# Test parallel execution
pytest tests/integration/test_parallel_execution.py -v
Performance Tests
# Benchmark async vs sync
python benchmarks/benchmark_async_vs_sync.py
# Benchmark cache effectiveness
python benchmarks/benchmark_cache.py
# Benchmark parallel workflows
python benchmarks/benchmark_parallel_workflow.py
# Full benchmark suite
python benchmarks/run_benchmarks.py --report
Performance Validation
Before Starting
Run baseline benchmarks:
# Sequential execution (3 agents)
time python -c "
from claude_force.orchestrator import AgentOrchestrator
orch = AgentOrchestrator()
for i in range(3):
    orch.execute_agent('python-expert', f'Task {i}')
"
Expected: 9-15 seconds
After Async Implementation
Run async benchmarks:
# Parallel execution (3 agents)
time python -c "
import asyncio
from claude_force.async_orchestrator import AsyncAgentOrchestrator
async def test():
    orch = AsyncAgentOrchestrator()
    await orch.execute_multiple([
        ('python-expert', 'Task 0'),
        ('python-expert', 'Task 1'),
        ('python-expert', 'Task 2')
    ])
asyncio.run(test())
"
Expected: 3-5 seconds → 2-3x faster!
After Caching Implementation
# First run (cache miss)
time claude-force execute python-expert "Explain decorators"
# Second run (cache hit)
time claude-force execute python-expert "Explain decorators"
Expected:
First run: 2-4 seconds
Second run: <100ms → ~40x faster!
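Common Issues & Solutions
Issue 1: "RuntimeError: This event loop is already running"
Cause: Calling loop.run_until_complete() or asyncio.run() while an event loop is already running, for example inside a coroutine or a notebook cell
Solution:
# Instead of starting a second loop from inside a running one:
asyncio.run(my_async_function())
# Await the coroutine directly if you are already in an async context:
await my_async_function()
# Only synchronous code with no running loop should start one:
import asyncio
asyncio.run(my_async_function())
# Note: loop.run_until_complete() on an already-running loop raises this same error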
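When the same helper may be called from both sync and async code, it can check for a running loop before deciding how to proceed. The sketch below is a minimal illustration; `in_running_loop` and `check_inside` are hypothetical names, and the only library call relied on is `asyncio.get_running_loop()`, which raises `RuntimeError` when no loop is running in the current thread.

```python
import asyncio


def in_running_loop() -> bool:
    """Return True if called from code running inside an event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False  # no loop in this thread: asyncio.run() is safe here
    return True


async def check_inside() -> bool:
    """Runs inside a loop, so the check should report True."""
    return in_running_loop()


if __name__ == "__main__":
    print(in_running_loop())            # called from plain sync code
    print(asyncio.run(check_inside()))  # called from inside a coroutine
```

If the check returns True, the caller should `await` the coroutine rather than start a new loop.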
Issue 2: Cache returning stale data
Cause: TTL too long or cache not invalidating
Solution:
# Reduce TTL (Python)
cache = ResponseCache(ttl_hours=1) # Instead of 24
# Or clear the cache (shell command)
claude-force cache clear
# Or exclude specific agents (Python config)
config['cache']['exclude_agents'] = ['creative-writer']
Issue 3: "Too many open files"
Cause: Running many concurrent async operations
Solution:
# Limit concurrency with semaphore
import asyncio

semaphore = asyncio.Semaphore(10) # Max 10 concurrent

async def execute_with_limit(task):
    async with semaphore:
        return await execute_task(task)
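To confirm that a semaphore really caps concurrency, the pattern above can be run against simulated work while tracking how many tasks are active at once. This is a self-contained sketch: `run_limited` is a hypothetical helper, and `asyncio.sleep` stands in for the real I/O.

```python
import asyncio


async def run_limited(task_count: int, limit: int) -> int:
    """Run task_count simulated tasks and return the peak number active at once."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def worker() -> None:
        nonlocal active, peak
        async with semaphore:  # at most `limit` workers get past this line
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for an API call or file read
            active -= 1

    await asyncio.gather(*(worker() for _ in range(task_count)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(run_limited(task_count=20, limit=5)))
```

The semaphore is created inside the coroutine so that it belongs to the loop that uses it, which avoids loop-binding problems on older Python versions.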
Issue 4: Tests hanging
Cause: Async operations not completing
Solution:
# Add timeout to tests (@pytest.mark.timeout requires the pytest-timeout plugin)
import asyncio
import pytest

@pytest.mark.asyncio
@pytest.mark.timeout(30)
async def test_async_operation():
    result = await orchestrator.execute_agent_async(...)
    assert result

# Or use asyncio.timeout (Python 3.11+) inside a coroutine
async def run_with_deadline():
    async with asyncio.timeout(10):
        return await long_operation()
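On Python versions before 3.11, where `asyncio.timeout` is not available, `asyncio.wait_for` gives the same protection. The sketch below uses hypothetical names (`slow_operation`, `call_with_timeout`) and a sleep as the stand-in for real work.

```python
import asyncio


async def slow_operation(delay: float) -> str:
    """Stand-in for an operation that may take too long."""
    await asyncio.sleep(delay)
    return "done"


async def call_with_timeout(delay: float, timeout: float) -> str:
    """Return the result, or a fallback string if the deadline passes first."""
    try:
        return await asyncio.wait_for(slow_operation(delay), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the inner task before raising
        return "timed out"


if __name__ == "__main__":
    print(asyncio.run(call_with_timeout(delay=0.01, timeout=1.0)))
    print(asyncio.run(call_with_timeout(delay=1.0, timeout=0.01)))
```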
Code Examples
Example 1: Basic Async Usage
import asyncio
from claude_force.async_orchestrator import AsyncAgentOrchestrator

async def main():
    orch = AsyncAgentOrchestrator()
    # Single async execution
    result = await orch.execute_agent("python-expert", "Explain async/await")
    print(result)
    # Multiple async executions
    results = await orch.execute_multiple([
        ("python-expert", "Explain lists"),
        ("python-expert", "Explain dicts"),
        ("code-reviewer", "Review my code")
    ])
    for i, result in enumerate(results):
        print(f"Task {i}: {result[:100]}...")

asyncio.run(main())
Example 2: Cache-Aware Execution
from claude_force.orchestrator import AgentOrchestrator
# Enable caching in config
orch = AgentOrchestrator()
# First call - cache miss
result1 = orch.execute_agent("python-expert", "Explain decorators")
print("First call:", result1[:100])
# Second call - cache hit (same prompt)
result2 = orch.execute_agent("python-expert", "Explain decorators")
print("Second call (cached):", result2[:100])
# Check cache stats
stats = orch.response_cache.get_stats()
print(f"Cache hit rate: {stats['hit_rate']}")
Example 3: Parallel Workflow
import asyncio
from claude_force.workflow_dag import WorkflowDAG
from claude_force.async_orchestrator import AsyncAgentOrchestrator

async def run_workflow():
    orch = AsyncAgentOrchestrator()
    dag = WorkflowDAG(orch)
    workflow = {
        'name': 'Code Quality Check',
        'steps': [
            {
                'id': 'linter',
                'agent': 'linter',
                'task': 'Check code style',
                'dependencies': []
            },
            {
                'id': 'type-check',
                'agent': 'type-checker',
                'task': 'Check types',
                'dependencies': []
            },
            {
                'id': 'review',
                'agent': 'code-reviewer',
                'task': 'Final review',
                'dependencies': ['linter', 'type-check']
            }
        ]
    }
    result = await dag.execute_workflow(workflow)
    print(f"Completed in {result['total_time_seconds']:.2f}s")
    print(f"Execution order: {result['execution_order']}")

asyncio.run(run_workflow())
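The internals of WorkflowDAG are not shown in this guide. As one possible approach, the standard library's `graphlib` (Python 3.9+) can turn a step list in the format above into batches that are safe to run in parallel, and it detects cycles along the way. The helper name `execution_levels` is an assumption for illustration.

```python
from graphlib import CycleError, TopologicalSorter


def execution_levels(steps: list[dict]) -> list[list[str]]:
    """Group step ids into batches; every step in a batch has its dependencies met."""
    graph = {step["id"]: set(step.get("dependencies", [])) for step in steps}
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises CycleError if the dependencies form a cycle
    levels = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # steps whose dependencies are done
        levels.append(ready)
        sorter.done(*ready)
    return levels


if __name__ == "__main__":
    steps = [
        {"id": "linter", "dependencies": []},
        {"id": "type-check", "dependencies": []},
        {"id": "review", "dependencies": ["linter", "type-check"]},
    ]
    print(execution_levels(steps))
    try:
        execution_levels([
            {"id": "a", "dependencies": ["b"]},
            {"id": "b", "dependencies": ["a"]},
        ])
    except CycleError:
        print("cycle detected")
```

Each batch could then be passed to `asyncio.gather`, with the next batch starting only after the previous one completes.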
Best Practices
1. Async Operations
DO:
✓ Use async/await for I/O-bound operations
✓ Limit concurrency with semaphores
✓ Add timeouts to prevent hanging
✓ Handle exceptions in async code
DON'T:
✗ Mix sync and async without proper handling
✗ Run CPU-intensive tasks in async code (offload them to multiprocessing or an executor instead)
✗ Forget to await async functions
✗ Create unbounded concurrent operations
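For the exception-handling point above, `asyncio.gather` accepts `return_exceptions=True`, which returns failures as values instead of raising the first one and abandoning the remaining results. The sketch below uses hypothetical task names and a simulated failure.

```python
import asyncio


async def flaky_task(name: str, fail: bool) -> str:
    """Simulated step that either succeeds or raises."""
    await asyncio.sleep(0.01)
    if fail:
        raise ValueError(f"{name} failed")
    return f"{name} ok"


async def run_all() -> tuple[list[str], list[str]]:
    """Run all steps; collect successes and error messages separately."""
    outcomes = await asyncio.gather(
        flaky_task("lint", fail=False),
        flaky_task("types", fail=True),
        flaky_task("security", fail=False),
        return_exceptions=True,  # exceptions come back as list items
    )
    successes = [o for o in outcomes if not isinstance(o, BaseException)]
    failures = [str(o) for o in outcomes if isinstance(o, BaseException)]
    return successes, failures


if __name__ == "__main__":
    successes, failures = asyncio.run(run_all())
    print(successes, failures)
```

Results come back in the order the awaitables were passed, so each outcome can be matched to its step by position.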
2. Caching
DO:
✓ Use conservative TTL values (24 hours)
✓ Exclude non-deterministic agents
✓ Monitor cache hit rates
✓ Implement cache size limits
DON'T:
✗ Cache forever (use TTL)
✗ Cache sensitive data without encryption
✗ Ignore cache invalidation
✗ Let cache grow unbounded
3. Parallel Workflows
DO:
✓ Identify truly independent steps
✓ Use dependency tracking
✓ Test for race conditions
✓ Monitor for deadlocks
DON'T:
✗ Assume all steps can run in parallel
✗ Forget dependency order
✗ Ignore shared state
✗ Skip error handling
Measuring Success
Key Metrics to Track
# Track before and after
metrics = {
    'workflow_time_before': 15.3,  # seconds
    'workflow_time_after': 5.2,    # seconds
    'speedup': 2.94,               # 2.94x faster
    'cost_before': 0.002,          # USD per execution
    'cost_after': 0.0008,          # USD per execution
    'cost_savings': 0.60,          # 60% savings
    'cache_hit_rate': 0.45,        # 45% cache hits
    'throughput_before': 60,       # tasks/hour
    'throughput_after': 180        # tasks/hour (3x)
}
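The derived values in this dictionary follow directly from the before/after measurements, so they can be computed rather than typed in by hand. A small sketch, with `summarize` as a hypothetical helper name:

```python
def summarize(time_before: float, time_after: float,
              cost_before: float, cost_after: float,
              throughput_before: float, throughput_after: float) -> dict:
    """Derive speedup, cost savings, and throughput gain from raw measurements."""
    return {
        "speedup": round(time_before / time_after, 2),
        "cost_savings": round(1 - cost_after / cost_before, 2),
        "throughput_gain": round(throughput_after / throughput_before, 2),
    }


if __name__ == "__main__":
    # Sample figures taken from the metrics dictionary above
    print(summarize(15.3, 5.2, 0.002, 0.0008, 60, 180))
```

With the sample figures, this reproduces the 2.94x speedup, 60% cost savings, and 3x throughput listed above.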
Performance Dashboard
# View performance summary
claude-force analytics summary --days 7
# Compare before/after
claude-force analytics compare \
--baseline baseline_v2.2.0.json \
--current current_v2.3.0.json
Quick Start Checklist
Hour 1: Async Basics
Install aiofiles
Create simple_async.py
Test parallel execution
Measure speedup
Hour 2-3: Basic Caching
Create simple_cache.py
Test cache hit/miss
Measure latency improvement
Measure cost savings
Hour 4-6: Integration
Integrate async into orchestrator
Integrate cache into orchestrator
Write tests
Update documentation
Week 2+: Advanced Features
Implement parallel workflows
Add metrics aggregation
Enhance monitoring
Production deployment
Success Criteria
After implementing these quick wins, you should see:
✓ 2-3x faster execution for concurrent tasks
✓ <100ms response time for cached queries
✓ 30-50% cost reduction (with caching)
✓ 2-5x throughput improvement
✓ All tests passing (backward compatibility)
Need Help?
Common Questions:
Q: "My async code is hanging" → Check for timeouts and proper await usage
Q: "Cache not working" → Verify TTL and check cache stats
Q: "No speedup observed" → Ensure tasks are actually running in parallel
Ready to optimize? Start with Priority 1 and work your way down!