Claude Force Performance Optimization Implementation Plan
Version: 1.0 Date: 2025-11-14 Status: Draft - Pending Approval Target Release: v2.3.0
Executive Summary
This document outlines a comprehensive plan to optimize Claude Force performance based on the findings in the Performance Analysis Report. The implementation is divided into three phases over 3 months, targeting:
Primary Goals
Metric |
Current |
Target |
Improvement |
|---|---|---|---|
Workflow Execution Time |
12-30s (3 agents) |
4-10s |
50-80% faster |
Cost Per Execution |
Baseline |
30-50% less |
Cost reduction |
Throughput |
1 task/time |
2-5 tasks/time |
2-5x increase |
Cache Hit Rate |
0% |
20-70% |
New capability |
Implementation Phases
Phase 1 (Month 1): Foundation ✅ High Impact
Async API client implementation
Response caching system
Backward compatibility layer
Phase 2 (Month 2): Advanced Optimization ⭐ Medium Impact
Parallel workflow execution (DAG-based)
Metrics aggregation
Query result caching
Phase 3 (Month 3): Polish & Enhancement 📊 Low Impact
Performance monitoring enhancements
Circuit breakers
Advanced caching strategies
Expected ROI
Investment: ~40-60 development hours
Benefits:
- 50-80% faster workflow execution → Better UX
- 30-50% cost reduction → $500-2000/month savings (depending on usage)
- 2-5x throughput → Support for concurrent users
- Production-ready scalability → Enterprise readiness
Table of Contents
Phase 1: Foundation Optimizations
Duration: 3-4 weeks Priority: 🔴 CRITICAL Expected Impact: 50-80% performance improvement
1.1 Async API Client Implementation
Objective
Replace synchronous Claude API calls with async implementation to enable non-blocking operations and concurrent execution.
Current State
# orchestrator.py (synchronous)
response = self.client.messages.create(
model=model,
max_tokens=max_tokens,
temperature=temperature,
messages=[{"role": "user", "content": prompt}]
)
Target State
# orchestrator.py (async)
async def execute_agent_async(self, agent_name: str, task: str):
response = await self.async_client.messages.create(
model=model,
max_tokens=max_tokens,
temperature=temperature,
messages=[{"role": "user", "content": prompt}]
)
return response
Implementation Steps
Step 1: Install async dependencies
# Update requirements.txt
anthropic>=0.40.0 # Already supports async
aiofiles>=23.0.0 # Async file I/O
Step 2: Create async orchestrator module
Create claude_force/async_orchestrator.py:
"""
Async version of AgentOrchestrator for non-blocking operations.
"""
import asyncio
import aiofiles
from pathlib import Path
from typing import Optional, Dict, Any
from anthropic import AsyncAnthropic
from .performance_tracker import PerformanceTracker
from .agent_memory import AgentMemory
class AsyncAgentOrchestrator:
"""Async orchestrator for non-blocking agent execution."""
def __init__(self, config_path: Optional[Path] = None, api_key: Optional[str] = None):
self.config_path = config_path or Path.home() / ".claude" / "claude.json"
self.api_key = api_key or os.getenv("ANTHROPIC_API_KEY")
# Lazy initialization
self._async_client: Optional[AsyncAnthropic] = None
self._config: Optional[Dict] = None
self._performance_tracker: Optional[PerformanceTracker] = None
self._agent_memory: Optional[AgentMemory] = None
@property
def async_client(self) -> AsyncAnthropic:
"""Lazy-load async client."""
if self._async_client is None:
self._async_client = AsyncAnthropic(api_key=self.api_key)
return self._async_client
async def load_config(self) -> Dict:
"""Load configuration asynchronously."""
if self._config is None:
async with aiofiles.open(self.config_path, 'r') as f:
content = await f.read()
self._config = json.loads(content)
return self._config
async def load_agent_definition(self, agent_name: str) -> str:
"""Load agent definition asynchronously."""
config = await self.load_config()
agent_config = config['agents'].get(agent_name)
if not agent_config:
raise ValueError(f"Agent '{agent_name}' not found")
agent_file = self.config_path.parent / agent_config['path']
async with aiofiles.open(agent_file, 'r') as f:
return await f.read()
async def execute_agent(
self,
agent_name: str,
task: str,
model: Optional[str] = None,
max_tokens: int = 4096,
temperature: float = 0.7
) -> str:
"""Execute agent asynchronously."""
import time
from datetime import datetime
start_time = time.time()
try:
# Load agent definition
agent_definition = await self.load_agent_definition(agent_name)
# Build prompt
prompt = f"{agent_definition}\n\n# Task\n{task}"
# Call API asynchronously
response = await self.async_client.messages.create(
model=model or "claude-3-5-haiku-20241022",
max_tokens=max_tokens,
temperature=temperature,
messages=[{"role": "user", "content": prompt}]
)
# Extract result
result = response.content[0].text
# Track performance
execution_time = (time.time() - start_time) * 1000
self._track_performance(
agent_name=agent_name,
task=task,
success=True,
execution_time_ms=execution_time,
model=response.model,
input_tokens=response.usage.input_tokens,
output_tokens=response.usage.output_tokens
)
return result
except Exception as e:
execution_time = (time.time() - start_time) * 1000
self._track_performance(
agent_name=agent_name,
task=task,
success=False,
execution_time_ms=execution_time,
error_type=type(e).__name__
)
raise
async def execute_multiple(
self,
tasks: list[tuple[str, str]] # List of (agent_name, task)
) -> list[str]:
"""Execute multiple agents concurrently."""
results = await asyncio.gather(*[
self.execute_agent(agent_name, task)
for agent_name, task in tasks
])
return results
def _track_performance(self, **kwargs):
"""Track performance metrics (sync operation)."""
if self._performance_tracker is None:
self._performance_tracker = PerformanceTracker()
# Sync operation - acceptable for metrics
self._performance_tracker.track_execution(**kwargs)
Step 3: Add backward compatibility wrapper
Update claude_force/orchestrator.py:
class AgentOrchestrator:
"""Synchronous orchestrator with async support."""
def __init__(self, ...):
# Existing sync initialization
self._async_orchestrator = None
def execute_agent(self, agent_name: str, task: str, **kwargs) -> str:
"""Synchronous execution (backward compatible)."""
# Existing synchronous implementation
pass
async def execute_agent_async(self, agent_name: str, task: str, **kwargs) -> str:
"""Asynchronous execution (new feature)."""
if self._async_orchestrator is None:
from .async_orchestrator import AsyncAgentOrchestrator
self._async_orchestrator = AsyncAgentOrchestrator(
config_path=self.config_path,
api_key=self.api_key
)
return await self._async_orchestrator.execute_agent(agent_name, task, **kwargs)
async def execute_multiple_async(self, tasks: list) -> list:
"""Execute multiple agents concurrently."""
if self._async_orchestrator is None:
from .async_orchestrator import AsyncAgentOrchestrator
self._async_orchestrator = AsyncAgentOrchestrator(
config_path=self.config_path,
api_key=self.api_key
)
return await self._async_orchestrator.execute_multiple(tasks)
Step 4: Update CLI for async support
Add async commands to claude_force/cli.py:
@click.command()
@click.argument('agent_name')
@click.argument('task')
@click.option('--async', 'use_async', is_flag=True, help='Use async execution')
def execute(agent_name: str, task: str, use_async: bool):
"""Execute an agent with a task."""
orchestrator = AgentOrchestrator()
if use_async:
# Run async version
result = asyncio.run(orchestrator.execute_agent_async(agent_name, task))
else:
# Run sync version (backward compatible)
result = orchestrator.execute_agent(agent_name, task)
click.echo(result)
Testing Requirements
Unit Tests:
# tests/test_async_orchestrator.py
import pytest
import asyncio
from claude_force.async_orchestrator import AsyncAgentOrchestrator
@pytest.mark.asyncio
async def test_async_execute_agent():
"""Test async agent execution."""
orchestrator = AsyncAgentOrchestrator()
result = await orchestrator.execute_agent("python-expert", "What are decorators?")
assert result
assert isinstance(result, str)
@pytest.mark.asyncio
async def test_concurrent_execution():
"""Test concurrent agent execution."""
orchestrator = AsyncAgentOrchestrator()
tasks = [
("python-expert", "Explain lists"),
("python-expert", "Explain dicts"),
("python-expert", "Explain sets")
]
import time
start = time.time()
results = await orchestrator.execute_multiple(tasks)
elapsed = time.time() - start
assert len(results) == 3
assert all(isinstance(r, str) for r in results)
# Should be faster than sequential execution
# (concurrent should be ~similar to single execution time)
# Not 3x the time of a single execution
print(f"Concurrent execution: {elapsed:.2f}s")
Integration Tests:
# tests/integration/test_async_workflows.py
@pytest.mark.asyncio
async def test_workflow_async_speedup():
"""Verify async workflow is faster than sync."""
from claude_force.orchestrator import AgentOrchestrator
import time
orchestrator = AgentOrchestrator()
tasks = [
("python-expert", "Task 1"),
("python-expert", "Task 2"),
("python-expert", "Task 3")
]
# Sync execution
sync_start = time.time()
for agent, task in tasks:
orchestrator.execute_agent(agent, task)
sync_time = time.time() - sync_start
# Async execution
async_start = time.time()
await orchestrator.execute_multiple_async(tasks)
async_time = time.time() - async_start
# Async should be significantly faster
speedup = sync_time / async_time
print(f"Speedup: {speedup:.2f}x")
assert speedup > 2.0 # At least 2x faster for 3 tasks
Success Criteria
✅ All existing tests pass (backward compatibility)
✅ Async execution works for single agents
✅ Concurrent execution works for multiple agents
✅ 2-3x speedup for 3 concurrent tasks
✅ Performance metrics still tracked correctly
✅ CLI supports both sync and async modes
Effort Estimate
Development: 8-12 hours
Testing: 4-6 hours
Documentation: 2-3 hours
Total: 14-21 hours
1.2 Response Caching System
Objective
Implement intelligent caching of Claude API responses to reduce costs and latency for repeated queries.
Current State
Every identical prompt triggers a new API call, even if asked recently.
Target State
Intelligent caching with configurable TTL and cache invalidation.
Implementation Steps
Step 1: Create cache module
Create claude_force/response_cache.py:
"""
Response caching system for Claude API calls.
"""
import hashlib
import json
import time
from pathlib import Path
from typing import Optional, Dict, Any
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
@dataclass
class CacheEntry:
"""Cached response entry."""
key: str
agent_name: str
task: str
model: str
response: str
input_tokens: int
output_tokens: int
estimated_cost: float
timestamp: float
hit_count: int = 0
class ResponseCache:
"""
Intelligent response cache for Claude API calls.
Features:
- TTL-based expiration
- LRU eviction
- Size limits
- Cache statistics
- Exclusion lists (non-deterministic agents)
"""
def __init__(
self,
cache_dir: Optional[Path] = None,
ttl_hours: int = 24,
max_size_mb: int = 100,
enabled: bool = True
):
self.cache_dir = cache_dir or Path.home() / ".claude" / "cache"
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.ttl_seconds = ttl_hours * 3600
self.max_size_bytes = max_size_mb * 1024 * 1024
self.enabled = enabled
# In-memory cache for fast access
self._memory_cache: Dict[str, CacheEntry] = {}
self._load_cache_index()
# Statistics
self.stats = {
'hits': 0,
'misses': 0,
'evictions': 0,
'size_bytes': 0
}
def _cache_key(self, agent_name: str, task: str, model: str) -> str:
"""Generate cache key."""
content = f"{agent_name}:{task}:{model}"
return hashlib.sha256(content.encode()).hexdigest()[:16]
def get(
self,
agent_name: str,
task: str,
model: str
) -> Optional[Dict[str, Any]]:
"""Retrieve cached response."""
if not self.enabled:
return None
key = self._cache_key(agent_name, task, model)
# Check memory cache
if key in self._memory_cache:
entry = self._memory_cache[key]
# Check TTL
age = time.time() - entry.timestamp
if age > self.ttl_seconds:
# Expired
self._evict(key)
self.stats['misses'] += 1
return None
# Cache hit
entry.hit_count += 1
self.stats['hits'] += 1
return {
'response': entry.response,
'input_tokens': entry.input_tokens,
'output_tokens': entry.output_tokens,
'estimated_cost': entry.estimated_cost,
'cached': True,
'cache_age_seconds': age
}
# Check disk cache
cache_file = self.cache_dir / f"{key}.json"
if cache_file.exists():
# Check TTL
age = time.time() - cache_file.stat().st_mtime
if age > self.ttl_seconds:
cache_file.unlink()
self.stats['misses'] += 1
return None
# Load from disk
with open(cache_file) as f:
entry_dict = json.load(f)
entry = CacheEntry(**entry_dict)
self._memory_cache[key] = entry
entry.hit_count += 1
self.stats['hits'] += 1
return {
'response': entry.response,
'input_tokens': entry.input_tokens,
'output_tokens': entry.output_tokens,
'estimated_cost': entry.estimated_cost,
'cached': True,
'cache_age_seconds': age
}
# Cache miss
self.stats['misses'] += 1
return None
def set(
self,
agent_name: str,
task: str,
model: str,
response: str,
input_tokens: int,
output_tokens: int,
estimated_cost: float
):
"""Cache a response."""
if not self.enabled:
return
key = self._cache_key(agent_name, task, model)
entry = CacheEntry(
key=key,
agent_name=agent_name,
task=task,
model=model,
response=response,
input_tokens=input_tokens,
output_tokens=output_tokens,
estimated_cost=estimated_cost,
timestamp=time.time()
)
# Store in memory
self._memory_cache[key] = entry
# Store on disk
cache_file = self.cache_dir / f"{key}.json"
with open(cache_file, 'w') as f:
json.dump(asdict(entry), f)
# Update size
self.stats['size_bytes'] += cache_file.stat().st_size
# Check size limit and evict if needed
if self.stats['size_bytes'] > self.max_size_bytes:
self._evict_lru()
def _evict(self, key: str):
"""Evict specific cache entry."""
if key in self._memory_cache:
del self._memory_cache[key]
cache_file = self.cache_dir / f"{key}.json"
if cache_file.exists():
size = cache_file.stat().st_size
cache_file.unlink()
self.stats['size_bytes'] -= size
self.stats['evictions'] += 1
def _evict_lru(self):
"""Evict least recently used entries."""
# Sort by timestamp and hit_count
entries = sorted(
self._memory_cache.items(),
key=lambda x: (x[1].hit_count, x[1].timestamp)
)
# Evict oldest 10%
num_to_evict = max(1, len(entries) // 10)
for key, _ in entries[:num_to_evict]:
self._evict(key)
def clear(self):
"""Clear entire cache."""
for cache_file in self.cache_dir.glob("*.json"):
cache_file.unlink()
self._memory_cache.clear()
self.stats['size_bytes'] = 0
def get_stats(self) -> Dict[str, Any]:
"""Get cache statistics."""
total_requests = self.stats['hits'] + self.stats['misses']
hit_rate = (self.stats['hits'] / total_requests * 100) if total_requests > 0 else 0
return {
'enabled': self.enabled,
'hits': self.stats['hits'],
'misses': self.stats['misses'],
'hit_rate': f"{hit_rate:.1f}%",
'evictions': self.stats['evictions'],
'size_mb': self.stats['size_bytes'] / (1024 * 1024),
'entries': len(self._memory_cache)
}
def _load_cache_index(self):
"""Load cache index from disk."""
for cache_file in self.cache_dir.glob("*.json"):
try:
with open(cache_file) as f:
entry_dict = json.load(f)
entry = CacheEntry(**entry_dict)
self._memory_cache[entry.key] = entry
self.stats['size_bytes'] += cache_file.stat().st_size
except Exception:
# Corrupted cache file, remove it
cache_file.unlink()
Step 2: Integrate cache with orchestrator
Update claude_force/orchestrator.py:
class AgentOrchestrator:
def __init__(self, ...):
# Existing initialization
self._response_cache = None
@property
def response_cache(self) -> ResponseCache:
"""Lazy-load response cache."""
if self._response_cache is None:
cache_config = self.config.get('cache', {})
self._response_cache = ResponseCache(
enabled=cache_config.get('enabled', True),
ttl_hours=cache_config.get('ttl_hours', 24),
max_size_mb=cache_config.get('max_size_mb', 100)
)
return self._response_cache
def execute_agent(self, agent_name: str, task: str, **kwargs) -> str:
"""Execute agent with caching support."""
model = kwargs.get('model') or self._select_model(agent_name, task)
# Check if agent should be excluded from caching
exclude_list = self.config.get('cache', {}).get('exclude_agents', [])
use_cache = agent_name not in exclude_list
# Try cache first
if use_cache:
cached = self.response_cache.get(agent_name, task, model)
if cached:
# Cache hit!
self._track_cached_execution(
agent_name=agent_name,
task=task,
cached_result=cached
)
return cached['response']
# Cache miss - execute normally
start_time = time.time()
try:
# Existing execution logic
response = self.client.messages.create(...)
result = response.content[0].text
# Cache the response
if use_cache:
self.response_cache.set(
agent_name=agent_name,
task=task,
model=response.model,
response=result,
input_tokens=response.usage.input_tokens,
output_tokens=response.usage.output_tokens,
estimated_cost=self._estimate_cost(...)
)
# Track performance
self._track_execution(...)
return result
except Exception as e:
# Existing error handling
raise
def _track_cached_execution(self, agent_name: str, task: str, cached_result: dict):
"""Track cache hit in metrics."""
self.performance_tracker.track_execution(
agent_name=agent_name,
task=task,
success=True,
execution_time_ms=1, # Minimal time for cache hit
model=cached_result.get('model', 'cached'),
input_tokens=cached_result['input_tokens'],
output_tokens=cached_result['output_tokens'],
estimated_cost=0.0, # No cost for cache hit
metadata={'cached': True, 'cache_age': cached_result['cache_age_seconds']}
)
Step 3: Add configuration
Update .claude/claude.json:
{
"cache": {
"enabled": true,
"ttl_hours": 24,
"max_size_mb": 100,
"exclude_agents": [
"random-generator",
"creative-writer"
]
}
}
Step 4: Add CLI commands for cache management
Update claude_force/cli.py:
@click.group()
def cache():
"""Manage response cache."""
pass
@cache.command()
def stats():
"""Show cache statistics."""
orchestrator = AgentOrchestrator()
stats = orchestrator.response_cache.get_stats()
click.echo("Cache Statistics")
click.echo("=" * 40)
click.echo(f"Enabled: {stats['enabled']}")
click.echo(f"Hits: {stats['hits']}")
click.echo(f"Misses: {stats['misses']}")
click.echo(f"Hit Rate: {stats['hit_rate']}")
click.echo(f"Evictions: {stats['evictions']}")
click.echo(f"Size: {stats['size_mb']:.2f} MB")
click.echo(f"Entries: {stats['entries']}")
@cache.command()
@click.confirmation_option(prompt='Are you sure you want to clear the cache?')
def clear():
"""Clear response cache."""
orchestrator = AgentOrchestrator()
orchestrator.response_cache.clear()
click.echo("✅ Cache cleared")
cli.add_command(cache)
Testing Requirements
Unit Tests:
# tests/test_response_cache.py
def test_cache_hit():
"""Test cache hit returns cached response."""
cache = ResponseCache()
# First call - cache miss
result1 = cache.get("python-expert", "Test task", "haiku")
assert result1 is None
# Set cache
cache.set(
"python-expert",
"Test task",
"haiku",
"Test response",
100,
50,
0.001
)
# Second call - cache hit
result2 = cache.get("python-expert", "Test task", "haiku")
assert result2 is not None
assert result2['response'] == "Test response"
assert result2['cached'] is True
def test_cache_ttl_expiration():
"""Test cache entries expire after TTL."""
cache = ResponseCache(ttl_hours=0.001) # Very short TTL
cache.set("agent", "task", "model", "response", 100, 50, 0.001)
# Immediate retrieval - should hit
result1 = cache.get("agent", "task", "model")
assert result1 is not None
# Wait for expiration
import time
time.sleep(5) # 5 seconds > 0.001 hours
# Should miss
result2 = cache.get("agent", "task", "model")
assert result2 is None
Integration Tests:
# tests/integration/test_cache_integration.py
def test_orchestrator_caching():
"""Test orchestrator uses cache correctly."""
orchestrator = AgentOrchestrator()
task = "Explain Python decorators"
# First execution - no cache
import time
start1 = time.time()
result1 = orchestrator.execute_agent("python-expert", task)
time1 = time.time() - start1
# Second execution - should use cache
start2 = time.time()
result2 = orchestrator.execute_agent("python-expert", task)
time2 = time.time() - start2
# Results should be identical
assert result1 == result2
# Second should be much faster (<100ms)
assert time2 < 0.1 # Less than 100ms
# Check cache stats
stats = orchestrator.response_cache.get_stats()
assert stats['hits'] >= 1
Success Criteria
✅ Cache correctly stores and retrieves responses
✅ TTL expiration works as expected
✅ Cache hit provides <100ms response time
✅ Excluded agents bypass cache
✅ Cache stats tracked accurately
✅ LRU eviction prevents unlimited growth
Effort Estimate
Development: 10-14 hours
Testing: 5-7 hours
Documentation: 2-3 hours
Total: 17-24 hours
1.3 Configuration & Documentation
Update Configuration Schema
Add to .claude/claude.json:
{
"performance": {
"async_enabled": true,
"max_concurrent_agents": 10
},
"cache": {
"enabled": true,
"ttl_hours": 24,
"max_size_mb": 100,
"exclude_agents": []
}
}
Update Documentation
Update
README.mdwith new async capabilitiesAdd
docs/async-usage-guide.mdAdd
docs/caching-guide.mdUpdate API documentation
Effort Estimate
Documentation: 4-6 hours
Phase 2: Advanced Optimizations
Duration: 3-4 weeks Priority: 🟡 MEDIUM-HIGH Expected Impact: 2-5x throughput improvement
2.1 Parallel Workflow Execution (DAG-based)
Objective
Enable parallel execution of independent workflow steps using Directed Acyclic Graph (DAG) scheduling.
Current State
# Sequential execution
for step in workflow_steps:
result = execute_agent(step)
context.append(result)
Target State
# Parallel execution with dependency management
async def execute_workflow_dag(workflow):
dag = build_dependency_graph(workflow)
while dag:
# Execute all ready steps in parallel
ready = get_steps_with_no_dependencies(dag)
results = await asyncio.gather(*[execute_step(s) for s in ready])
update_dag(dag, results)
Implementation Steps
Step 1: Add dependency tracking to workflows
Update workflow definitions in .claude/claude.json:
{
"workflows": {
"code-quality-check": {
"name": "Code Quality Check",
"description": "Comprehensive code quality analysis",
"steps": [
{
"id": "linter",
"agent": "linter",
"task": "Run Python linter",
"dependencies": []
},
{
"id": "type-checker",
"agent": "type-checker",
"task": "Run type checking",
"dependencies": []
},
{
"id": "security-scan",
"agent": "security-scanner",
"task": "Scan for security issues",
"dependencies": []
},
{
"id": "final-review",
"agent": "code-reviewer",
"task": "Final code review based on findings",
"dependencies": ["linter", "type-checker", "security-scan"]
}
]
}
}
}
Step 2: Create DAG executor
Create claude_force/workflow_dag.py:
"""
DAG-based workflow executor for parallel execution.
"""
import asyncio
from typing import Dict, List, Set, Any
from dataclasses import dataclass
from .async_orchestrator import AsyncAgentOrchestrator
@dataclass
class WorkflowStep:
"""Workflow step definition."""
id: str
agent: str
task: str
dependencies: List[str]
result: Any = None
completed: bool = False
class WorkflowDAG:
"""DAG-based workflow executor."""
def __init__(self, orchestrator: AsyncAgentOrchestrator):
self.orchestrator = orchestrator
def build_dag(self, workflow_config: dict) -> Dict[str, WorkflowStep]:
"""Build DAG from workflow configuration."""
steps = {}
for step_config in workflow_config['steps']:
step = WorkflowStep(
id=step_config['id'],
agent=step_config['agent'],
task=step_config['task'],
dependencies=step_config.get('dependencies', [])
)
steps[step.id] = step
# Validate no cycles
self._validate_acyclic(steps)
return steps
def _validate_acyclic(self, steps: Dict[str, WorkflowStep]):
"""Validate DAG has no cycles."""
def has_cycle(step_id: str, visited: Set[str], rec_stack: Set[str]) -> bool:
visited.add(step_id)
rec_stack.add(step_id)
step = steps.get(step_id)
if step:
for dep in step.dependencies:
if dep not in visited:
if has_cycle(dep, visited, rec_stack):
return True
elif dep in rec_stack:
return True
rec_stack.remove(step_id)
return False
visited = set()
rec_stack = set()
for step_id in steps:
if step_id not in visited:
if has_cycle(step_id, visited, rec_stack):
raise ValueError(f"Workflow contains cycle involving step: {step_id}")
def get_ready_steps(self, steps: Dict[str, WorkflowStep]) -> List[WorkflowStep]:
"""Get steps with all dependencies satisfied."""
ready = []
for step in steps.values():
if step.completed:
continue
# Check if all dependencies are completed
deps_completed = all(
steps[dep_id].completed
for dep_id in step.dependencies
)
if deps_completed:
ready.append(step)
return ready
async def execute_workflow(self, workflow_config: dict) -> Dict[str, Any]:
"""Execute workflow with parallel execution where possible."""
import time
start_time = time.time()
steps = self.build_dag(workflow_config)
results = {}
execution_order = []
print(f"Executing workflow: {workflow_config['name']}")
print(f"Total steps: {len(steps)}")
print()
while any(not step.completed for step in steps.values()):
# Get steps ready to execute
ready = self.get_ready_steps(steps)
if not ready:
# No steps ready but workflow not complete - error
incomplete = [s.id for s in steps.values() if not s.completed]
raise RuntimeError(f"Workflow deadlock. Incomplete steps: {incomplete}")
print(f"Executing {len(ready)} step(s) in parallel: {[s.id for s in ready]}")
# Execute ready steps in parallel
step_start = time.time()
step_results = await asyncio.gather(*[
self.orchestrator.execute_agent(step.agent, step.task)
for step in ready
])
step_time = time.time() - step_start
# Update results
for step, result in zip(ready, step_results):
step.result = result
step.completed = True
results[step.id] = result
execution_order.append(step.id)
print(f" Completed in {step_time:.2f}s")
print()
total_time = time.time() - start_time
return {
'results': results,
'execution_order': execution_order,
'total_time_seconds': total_time,
'workflow_name': workflow_config['name']
}
Step 3: Integrate with orchestrator
Update claude_force/orchestrator.py:
class AgentOrchestrator:
async def execute_workflow_async(self, workflow_name: str) -> Dict[str, Any]:
"""Execute workflow with parallel execution."""
workflow_config = self.config['workflows'].get(workflow_name)
if not workflow_config:
raise ValueError(f"Workflow '{workflow_name}' not found")
from .workflow_dag import WorkflowDAG
from .async_orchestrator import AsyncAgentOrchestrator
async_orchestrator = AsyncAgentOrchestrator(
config_path=self.config_path,
api_key=self.api_key
)
dag = WorkflowDAG(async_orchestrator)
return await dag.execute_workflow(workflow_config)
Step 4: Update CLI
Add workflow execution command:
@click.command()
@click.argument('workflow_name')
@click.option('--parallel/--sequential', default=True, help='Use parallel execution')
def run_workflow(workflow_name: str, parallel: bool):
"""Execute a workflow."""
orchestrator = AgentOrchestrator()
if parallel:
result = asyncio.run(orchestrator.execute_workflow_async(workflow_name))
click.echo(f"✅ Workflow completed in {result['total_time_seconds']:.2f}s")
click.echo(f"Execution order: {' → '.join(result['execution_order'])}")
else:
# Sequential execution (existing)
result = orchestrator.execute_workflow(workflow_name)
click.echo("✅ Workflow completed")
Testing Requirements
# tests/test_workflow_dag.py
@pytest.mark.asyncio
async def test_parallel_workflow_execution():
"""Test parallel workflow is faster than sequential."""
orchestrator = AgentOrchestrator()
# Workflow with 3 independent steps + 1 dependent
workflow = {
'name': 'Test Workflow',
'steps': [
{'id': 'step1', 'agent': 'python-expert', 'task': 'Task 1', 'dependencies': []},
{'id': 'step2', 'agent': 'python-expert', 'task': 'Task 2', 'dependencies': []},
{'id': 'step3', 'agent': 'python-expert', 'task': 'Task 3', 'dependencies': []},
{'id': 'step4', 'agent': 'code-reviewer', 'task': 'Review all', 'dependencies': ['step1', 'step2', 'step3']}
]
}
import time
start = time.time()
result = await orchestrator.execute_workflow_async(workflow)
parallel_time = time.time() - start
# Should complete in ~2x single step time (not 4x)
# Because first 3 steps run in parallel
assert parallel_time < 10 # Assuming single step is ~3s
assert len(result['results']) == 4
Success Criteria
✅ DAG correctly identifies dependencies
✅ Independent steps execute in parallel
✅ Dependent steps wait for prerequisites
✅ 2-3x speedup for workflows with parallelizable steps
✅ No deadlocks or race conditions
Effort Estimate
Development: 12-16 hours
Testing: 6-8 hours
Documentation: 3-4 hours
Total: 21-28 hours
2.2 Metrics Aggregation
Implementation: See Performance Analysis
Effort Estimate
Development: 6-8 hours
Testing: 3-4 hours
Total: 9-12 hours
2.3 Query Result Caching
Implementation: Add LRU cache to AgentMemory queries
Effort Estimate
Development: 4-6 hours
Testing: 2-3 hours
Total: 6-9 hours
Phase 3: Polish & Enhancement
Duration: 2-3 weeks Priority: 🟢 LOW-MEDIUM Expected Impact: Improved reliability and monitoring
3.1 Enhanced Performance Monitoring
Real-time performance dashboard
Advanced analytics
Anomaly detection
Effort Estimate
Development: 8-12 hours
3.2 Circuit Breakers
Fail-fast on API errors
Automatic retry with exponential backoff
Health checks
Effort Estimate
Development: 6-8 hours
3.3 Advanced Caching Strategies
Semantic caching (similar queries)
Partial response caching
Distributed cache support (Redis)
Effort Estimate
Development: 10-14 hours
Testing Strategy
Unit Tests
Coverage Target: >90%
Key Areas:
Async orchestrator
Response cache (hit/miss/eviction)
DAG builder (cycle detection)
Workflow executor
Integration Tests
Test Scenarios:
Full async workflow execution
Cache integration with real API
Parallel execution stress test
Performance regression tests
Performance Tests
Benchmarks to Run:
Async vs sync comparison
Cache hit rate measurement
Parallel speedup verification
Resource usage profiling
Regression Tests
Automated Checks:
Performance baselines
API compatibility
Backward compatibility
Migration & Backward Compatibility
Backward Compatibility Strategy
100% Backward Compatible
All new features are opt-in:
# Old code continues to work unchanged
orchestrator = AgentOrchestrator()
result = orchestrator.execute_agent("python-expert", "task")
# New async API is optional
result = await orchestrator.execute_agent_async("python-expert", "task")
Migration Path
Phase 1: Soft Launch
New features available but not default
Documentation and examples provided
Monitoring of adoption
Phase 2: Encourage Adoption
Performance benefits highlighted
CLI defaults to async (with –sync option)
Cache enabled by default
Phase 3: Full Adoption
Deprecation warnings for sync-only usage
Async becomes default in v3.0
Configuration Migration
Auto-migration on first run:
def migrate_config(config: dict) -> dict:
"""Auto-migrate config to new schema."""
if 'cache' not in config:
config['cache'] = {
'enabled': True,
'ttl_hours': 24,
'max_size_mb': 100
}
if 'performance' not in config:
config['performance'] = {
'async_enabled': True,
'max_concurrent_agents': 10
}
return config
Risk Assessment
High-Risk Areas
1. Async Implementation Complexity
Risk: Async/await introduces complexity, potential for deadlocks
Mitigation:
Comprehensive testing with asyncio test framework
Timeout mechanisms on all async operations
Backward compatible sync API maintained
Code review by async-experienced developers
Contingency: Rollback to sync if critical issues found
2. Cache Correctness
Risk: Stale cache responses, incorrect behavior
Mitigation:
Conservative TTL defaults (24 hours)
Exclude non-deterministic agents by default
Cache invalidation on config changes
Clear cache command readily available
Extensive testing of cache behavior
Contingency: Disable cache by default if issues arise
3. DAG Complexity
Risk: Workflow execution errors, deadlocks
Mitigation:
Cycle detection in DAG validation
Timeout on workflow execution
Detailed logging of execution order
Fallback to sequential execution on errors
Contingency: Sequential execution remains available
Medium-Risk Areas
4. Performance Regression
Risk: Optimization actually slows things down
Mitigation:
Comprehensive benchmarking before/after
Automated performance regression tests
A/B testing in staging environment
Contingency: Feature flags to disable optimizations
5. Increased Memory Usage
Risk: Caching and async may increase memory
Mitigation:
Cache size limits enforced
LRU eviction implemented
Memory profiling during development
Monitoring in production
Contingency: Configurable cache limits, can be disabled
Success Metrics
Performance Metrics
Metric |
Current |
Target |
Measurement |
|---|---|---|---|
Workflow Time (3 agents) |
12-30s |
4-10s |
Benchmarks |
Cache Hit Rate |
0% |
20-70% |
Cache stats |
Cost Per Execution |
Baseline |
-30-50% |
Analytics |
Throughput |
1x |
2-5x |
Load tests |
P95 Latency |
10s |
5s |
Metrics |
Quality Metrics
Metric |
Target |
Measurement |
|---|---|---|
Test Coverage |
>90% |
pytest-cov |
Success Rate |
>95% |
Analytics |
Backward Compatibility |
100% |
Integration tests |
Documentation Coverage |
100% |
Manual review |
Adoption Metrics
Metric |
Target (3 months) |
Measurement |
|---|---|---|
Async API Usage |
>50% |
Telemetry |
Cache Enabled |
>80% |
Config analysis |
Parallel Workflows |
>30% |
Usage stats |
Implementation Timeline
Month 1: Phase 1 - Foundation
Week 1-2: Async Implementation
Create AsyncAgentOrchestrator
Add async methods to AgentOrchestrator
Update CLI with async support
Write unit tests
Write integration tests
Week 3-4: Response Caching
Create ResponseCache module
Integrate with orchestrator
Add cache CLI commands
Test cache correctness
Performance benchmarks
Deliverables:
✅ Async API working
✅ Response caching functional
✅ 90%+ test coverage
✅ Documentation updated
Month 2: Phase 2 - Advanced Optimization
Week 1-2: Parallel Workflows
Create WorkflowDAG module
Update workflow schema
Implement DAG executor
Add workflow CLI commands
Test parallel execution
Week 3: Metrics & Caching
Implement metrics aggregation
Add query result caching
Performance testing
Week 4: Integration & Testing
End-to-end integration tests
Performance regression tests
Load testing
Bug fixes
Deliverables:
✅ Parallel workflows working
✅ 2-5x speedup demonstrated
✅ All tests passing
Month 3: Phase 3 - Polish & Release
Week 1-2: Enhancements
Enhanced monitoring
Circuit breakers
Advanced caching
Week 3: Documentation & Examples
Usage guides
API documentation
Example workflows
Migration guide
Week 4: Release Preparation
Final testing
Performance validation
Release notes
Version 2.3.0 release
Deliverables:
✅ Production-ready release
✅ Complete documentation
✅ Performance targets met
Next Steps
Immediate Actions
Review & Approve Plan - Stakeholder review (1 week)
Set Up Development Environment - Create feature branch
Begin Phase 1 - Start async implementation
Decision Points
Week 2: Review async implementation, decide to proceed with caching
Week 6: Review Phase 1 results, decide to proceed with Phase 2
Week 10: Review overall progress, decide on Phase 3 scope
Success Gates
Each phase requires:
✅ All tests passing
✅ Performance targets met
✅ Documentation complete
✅ Code review approved
Appendix
A. Code Review Checklist
Async operations have timeouts
Error handling for all async code
Cache eviction prevents memory leaks
DAG validation prevents cycles
Backward compatibility maintained
Tests cover edge cases
Documentation updated
Performance benchmarks run
B. Performance Testing Script
#!/bin/bash
# performance_validation.sh
echo "Running performance validation..."
# Baseline (sync)
python benchmark_sync.py > baseline.txt
# With async
python benchmark_async.py > async.txt
# With caching
python benchmark_with_cache.py > cache.txt
# With parallel workflows
python benchmark_parallel_workflows.py > parallel.txt
# Compare results
python compare_results.py baseline.txt async.txt cache.txt parallel.txt
C. Rollback Plan
If critical issues found:
Immediate: Disable feature via config flag
Short-term: Revert to previous version
Long-term: Fix issues and re-release
Feature flags for easy rollback:
{
"features": {
"async_enabled": false,
"cache_enabled": false,
"parallel_workflows_enabled": false
}
}
Document Version: 1.0 Last Updated: 2025-11-14 Next Review: After Phase 1 completion Owner: Performance Engineering Team