Claude Force Performance Optimization Implementation Plan

Version: 1.0 Date: 2025-11-14 Status: Draft - Pending Approval Target Release: v2.3.0


Executive Summary

This document outlines a comprehensive plan to optimize Claude Force performance based on the findings in the Performance Analysis Report. The implementation is divided into three phases over 3 months, targeting:

Primary Goals

Metric

Current

Target

Improvement

Workflow Execution Time

12-30s (3 agents)

4-10s

50-80% faster

Cost Per Execution

Baseline

30-50% less

Cost reduction

Throughput

1 task/time

2-5 tasks/time

2-5x increase

Cache Hit Rate

0%

20-70%

New capability

Implementation Phases

Phase 1 (Month 1): Foundation ✅ High Impact

  • Async API client implementation

  • Response caching system

  • Backward compatibility layer

Phase 2 (Month 2): Advanced Optimization ⭐ Medium Impact

  • Parallel workflow execution (DAG-based)

  • Metrics aggregation

  • Query result caching

Phase 3 (Month 3): Polish & Enhancement 📊 Low Impact

  • Performance monitoring enhancements

  • Circuit breakers

  • Advanced caching strategies

Expected ROI

Investment: ~40-60 development hours
Benefits:
  - 50-80% faster workflow execution → Better UX
  - 30-50% cost reduction → $500-2000/month savings (depending on usage)
  - 2-5x throughput → Support for concurrent users
  - Production-ready scalability → Enterprise readiness

Table of Contents

  1. Phase 1: Foundation Optimizations

  2. Phase 2: Advanced Optimizations

  3. Phase 3: Polish & Enhancement

  4. Testing Strategy

  5. Migration & Backward Compatibility

  6. Risk Assessment

  7. Success Metrics

  8. Implementation Timeline


Phase 1: Foundation Optimizations

Duration: 3-4 weeks Priority: 🔴 CRITICAL Expected Impact: 50-80% performance improvement

1.1 Async API Client Implementation

Objective

Replace synchronous Claude API calls with async implementation to enable non-blocking operations and concurrent execution.

Current State

# orchestrator.py (synchronous)
response = self.client.messages.create(
    model=model,
    max_tokens=max_tokens,
    temperature=temperature,
    messages=[{"role": "user", "content": prompt}]
)

Target State

# orchestrator.py (async)
async def execute_agent_async(self, agent_name: str, task: str):
    response = await self.async_client.messages.create(
        model=model,
        max_tokens=max_tokens,
        temperature=temperature,
        messages=[{"role": "user", "content": prompt}]
    )
    return response

Implementation Steps

Step 1: Install async dependencies

# Update requirements.txt
anthropic>=0.40.0  # Already supports async
aiofiles>=23.0.0   # Async file I/O

Step 2: Create async orchestrator module

Create claude_force/async_orchestrator.py:

"""
Async version of AgentOrchestrator for non-blocking operations.
"""
import asyncio
import aiofiles
from pathlib import Path
from typing import Optional, Dict, Any
from anthropic import AsyncAnthropic
from .performance_tracker import PerformanceTracker
from .agent_memory import AgentMemory

class AsyncAgentOrchestrator:
    """Async orchestrator for non-blocking agent execution."""

    def __init__(self, config_path: Optional[Path] = None, api_key: Optional[str] = None):
        self.config_path = config_path or Path.home() / ".claude" / "claude.json"
        self.api_key = api_key or os.getenv("ANTHROPIC_API_KEY")

        # Lazy initialization
        self._async_client: Optional[AsyncAnthropic] = None
        self._config: Optional[Dict] = None
        self._performance_tracker: Optional[PerformanceTracker] = None
        self._agent_memory: Optional[AgentMemory] = None

    @property
    def async_client(self) -> AsyncAnthropic:
        """Lazy-load async client."""
        if self._async_client is None:
            self._async_client = AsyncAnthropic(api_key=self.api_key)
        return self._async_client

    async def load_config(self) -> Dict:
        """Load configuration asynchronously."""
        if self._config is None:
            async with aiofiles.open(self.config_path, 'r') as f:
                content = await f.read()
                self._config = json.loads(content)
        return self._config

    async def load_agent_definition(self, agent_name: str) -> str:
        """Load agent definition asynchronously."""
        config = await self.load_config()
        agent_config = config['agents'].get(agent_name)

        if not agent_config:
            raise ValueError(f"Agent '{agent_name}' not found")

        agent_file = self.config_path.parent / agent_config['path']

        async with aiofiles.open(agent_file, 'r') as f:
            return await f.read()

    async def execute_agent(
        self,
        agent_name: str,
        task: str,
        model: Optional[str] = None,
        max_tokens: int = 4096,
        temperature: float = 0.7
    ) -> str:
        """Execute agent asynchronously."""
        import time
        from datetime import datetime

        start_time = time.time()

        try:
            # Load agent definition
            agent_definition = await self.load_agent_definition(agent_name)

            # Build prompt
            prompt = f"{agent_definition}\n\n# Task\n{task}"

            # Call API asynchronously
            response = await self.async_client.messages.create(
                model=model or "claude-3-5-haiku-20241022",
                max_tokens=max_tokens,
                temperature=temperature,
                messages=[{"role": "user", "content": prompt}]
            )

            # Extract result
            result = response.content[0].text

            # Track performance
            execution_time = (time.time() - start_time) * 1000
            self._track_performance(
                agent_name=agent_name,
                task=task,
                success=True,
                execution_time_ms=execution_time,
                model=response.model,
                input_tokens=response.usage.input_tokens,
                output_tokens=response.usage.output_tokens
            )

            return result

        except Exception as e:
            execution_time = (time.time() - start_time) * 1000
            self._track_performance(
                agent_name=agent_name,
                task=task,
                success=False,
                execution_time_ms=execution_time,
                error_type=type(e).__name__
            )
            raise

    async def execute_multiple(
        self,
        tasks: list[tuple[str, str]]  # List of (agent_name, task)
    ) -> list[str]:
        """Execute multiple agents concurrently."""
        results = await asyncio.gather(*[
            self.execute_agent(agent_name, task)
            for agent_name, task in tasks
        ])
        return results

    def _track_performance(self, **kwargs):
        """Track performance metrics (sync operation)."""
        if self._performance_tracker is None:
            self._performance_tracker = PerformanceTracker()
        # Sync operation - acceptable for metrics
        self._performance_tracker.track_execution(**kwargs)

Step 3: Add backward compatibility wrapper

Update claude_force/orchestrator.py:

class AgentOrchestrator:
    """Synchronous orchestrator with async support."""

    def __init__(self, ...):
        # Existing sync initialization
        self._async_orchestrator = None

    def execute_agent(self, agent_name: str, task: str, **kwargs) -> str:
        """Synchronous execution (backward compatible)."""
        # Existing synchronous implementation
        pass

    async def execute_agent_async(self, agent_name: str, task: str, **kwargs) -> str:
        """Asynchronous execution (new feature)."""
        if self._async_orchestrator is None:
            from .async_orchestrator import AsyncAgentOrchestrator
            self._async_orchestrator = AsyncAgentOrchestrator(
                config_path=self.config_path,
                api_key=self.api_key
            )
        return await self._async_orchestrator.execute_agent(agent_name, task, **kwargs)

    async def execute_multiple_async(self, tasks: list) -> list:
        """Execute multiple agents concurrently."""
        if self._async_orchestrator is None:
            from .async_orchestrator import AsyncAgentOrchestrator
            self._async_orchestrator = AsyncAgentOrchestrator(
                config_path=self.config_path,
                api_key=self.api_key
            )
        return await self._async_orchestrator.execute_multiple(tasks)

Step 4: Update CLI for async support

Add async commands to claude_force/cli.py:

@click.command()
@click.argument('agent_name')
@click.argument('task')
@click.option('--async', 'use_async', is_flag=True, help='Use async execution')
def execute(agent_name: str, task: str, use_async: bool):
    """Execute an agent with a task."""
    orchestrator = AgentOrchestrator()

    if use_async:
        # Run async version
        result = asyncio.run(orchestrator.execute_agent_async(agent_name, task))
    else:
        # Run sync version (backward compatible)
        result = orchestrator.execute_agent(agent_name, task)

    click.echo(result)

Testing Requirements

Unit Tests:

# tests/test_async_orchestrator.py
import pytest
import asyncio
from claude_force.async_orchestrator import AsyncAgentOrchestrator

@pytest.mark.asyncio
async def test_async_execute_agent():
    """Test async agent execution."""
    orchestrator = AsyncAgentOrchestrator()
    result = await orchestrator.execute_agent("python-expert", "What are decorators?")
    assert result
    assert isinstance(result, str)

@pytest.mark.asyncio
async def test_concurrent_execution():
    """Test concurrent agent execution."""
    orchestrator = AsyncAgentOrchestrator()

    tasks = [
        ("python-expert", "Explain lists"),
        ("python-expert", "Explain dicts"),
        ("python-expert", "Explain sets")
    ]

    import time
    start = time.time()
    results = await orchestrator.execute_multiple(tasks)
    elapsed = time.time() - start

    assert len(results) == 3
    assert all(isinstance(r, str) for r in results)

    # Should be faster than sequential execution
    # (concurrent should be ~similar to single execution time)
    # Not 3x the time of a single execution
    print(f"Concurrent execution: {elapsed:.2f}s")

Integration Tests:

# tests/integration/test_async_workflows.py
@pytest.mark.asyncio
async def test_workflow_async_speedup():
    """Verify async workflow is faster than sync."""
    from claude_force.orchestrator import AgentOrchestrator
    import time

    orchestrator = AgentOrchestrator()

    tasks = [
        ("python-expert", "Task 1"),
        ("python-expert", "Task 2"),
        ("python-expert", "Task 3")
    ]

    # Sync execution
    sync_start = time.time()
    for agent, task in tasks:
        orchestrator.execute_agent(agent, task)
    sync_time = time.time() - sync_start

    # Async execution
    async_start = time.time()
    await orchestrator.execute_multiple_async(tasks)
    async_time = time.time() - async_start

    # Async should be significantly faster
    speedup = sync_time / async_time
    print(f"Speedup: {speedup:.2f}x")
    assert speedup > 2.0  # At least 2x faster for 3 tasks

Success Criteria

  • ✅ All existing tests pass (backward compatibility)

  • ✅ Async execution works for single agents

  • ✅ Concurrent execution works for multiple agents

  • ✅ 2-3x speedup for 3 concurrent tasks

  • ✅ Performance metrics still tracked correctly

  • ✅ CLI supports both sync and async modes

Effort Estimate

  • Development: 8-12 hours

  • Testing: 4-6 hours

  • Documentation: 2-3 hours

  • Total: 14-21 hours


1.2 Response Caching System

Objective

Implement intelligent caching of Claude API responses to reduce costs and latency for repeated queries.

Current State

Every identical prompt triggers a new API call, even if asked recently.

Target State

Intelligent caching with configurable TTL and cache invalidation.

Implementation Steps

Step 1: Create cache module

Create claude_force/response_cache.py:

"""
Response caching system for Claude API calls.
"""
import hashlib
import json
import time
from pathlib import Path
from typing import Optional, Dict, Any
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta

@dataclass
class CacheEntry:
    """Cached response entry."""
    key: str
    agent_name: str
    task: str
    model: str
    response: str
    input_tokens: int
    output_tokens: int
    estimated_cost: float
    timestamp: float
    hit_count: int = 0

class ResponseCache:
    """
    Intelligent response cache for Claude API calls.

    Features:
    - TTL-based expiration
    - LRU eviction
    - Size limits
    - Cache statistics
    - Exclusion lists (non-deterministic agents)
    """

    def __init__(
        self,
        cache_dir: Optional[Path] = None,
        ttl_hours: int = 24,
        max_size_mb: int = 100,
        enabled: bool = True
    ):
        self.cache_dir = cache_dir or Path.home() / ".claude" / "cache"
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        self.ttl_seconds = ttl_hours * 3600
        self.max_size_bytes = max_size_mb * 1024 * 1024
        self.enabled = enabled

        # In-memory cache for fast access
        self._memory_cache: Dict[str, CacheEntry] = {}
        self._load_cache_index()

        # Statistics
        self.stats = {
            'hits': 0,
            'misses': 0,
            'evictions': 0,
            'size_bytes': 0
        }

    def _cache_key(self, agent_name: str, task: str, model: str) -> str:
        """Generate cache key."""
        content = f"{agent_name}:{task}:{model}"
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def get(
        self,
        agent_name: str,
        task: str,
        model: str
    ) -> Optional[Dict[str, Any]]:
        """Retrieve cached response."""
        if not self.enabled:
            return None

        key = self._cache_key(agent_name, task, model)

        # Check memory cache
        if key in self._memory_cache:
            entry = self._memory_cache[key]

            # Check TTL
            age = time.time() - entry.timestamp
            if age > self.ttl_seconds:
                # Expired
                self._evict(key)
                self.stats['misses'] += 1
                return None

            # Cache hit
            entry.hit_count += 1
            self.stats['hits'] += 1

            return {
                'response': entry.response,
                'input_tokens': entry.input_tokens,
                'output_tokens': entry.output_tokens,
                'estimated_cost': entry.estimated_cost,
                'cached': True,
                'cache_age_seconds': age
            }

        # Check disk cache
        cache_file = self.cache_dir / f"{key}.json"
        if cache_file.exists():
            # Check TTL
            age = time.time() - cache_file.stat().st_mtime
            if age > self.ttl_seconds:
                cache_file.unlink()
                self.stats['misses'] += 1
                return None

            # Load from disk
            with open(cache_file) as f:
                entry_dict = json.load(f)
                entry = CacheEntry(**entry_dict)
                self._memory_cache[key] = entry

            entry.hit_count += 1
            self.stats['hits'] += 1

            return {
                'response': entry.response,
                'input_tokens': entry.input_tokens,
                'output_tokens': entry.output_tokens,
                'estimated_cost': entry.estimated_cost,
                'cached': True,
                'cache_age_seconds': age
            }

        # Cache miss
        self.stats['misses'] += 1
        return None

    def set(
        self,
        agent_name: str,
        task: str,
        model: str,
        response: str,
        input_tokens: int,
        output_tokens: int,
        estimated_cost: float
    ):
        """Cache a response."""
        if not self.enabled:
            return

        key = self._cache_key(agent_name, task, model)

        entry = CacheEntry(
            key=key,
            agent_name=agent_name,
            task=task,
            model=model,
            response=response,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            estimated_cost=estimated_cost,
            timestamp=time.time()
        )

        # Store in memory
        self._memory_cache[key] = entry

        # Store on disk
        cache_file = self.cache_dir / f"{key}.json"
        with open(cache_file, 'w') as f:
            json.dump(asdict(entry), f)

        # Update size
        self.stats['size_bytes'] += cache_file.stat().st_size

        # Check size limit and evict if needed
        if self.stats['size_bytes'] > self.max_size_bytes:
            self._evict_lru()

    def _evict(self, key: str):
        """Evict specific cache entry."""
        if key in self._memory_cache:
            del self._memory_cache[key]

        cache_file = self.cache_dir / f"{key}.json"
        if cache_file.exists():
            size = cache_file.stat().st_size
            cache_file.unlink()
            self.stats['size_bytes'] -= size
            self.stats['evictions'] += 1

    def _evict_lru(self):
        """Evict least recently used entries."""
        # Sort by timestamp and hit_count
        entries = sorted(
            self._memory_cache.items(),
            key=lambda x: (x[1].hit_count, x[1].timestamp)
        )

        # Evict oldest 10%
        num_to_evict = max(1, len(entries) // 10)
        for key, _ in entries[:num_to_evict]:
            self._evict(key)

    def clear(self):
        """Clear entire cache."""
        for cache_file in self.cache_dir.glob("*.json"):
            cache_file.unlink()

        self._memory_cache.clear()
        self.stats['size_bytes'] = 0

    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics."""
        total_requests = self.stats['hits'] + self.stats['misses']
        hit_rate = (self.stats['hits'] / total_requests * 100) if total_requests > 0 else 0

        return {
            'enabled': self.enabled,
            'hits': self.stats['hits'],
            'misses': self.stats['misses'],
            'hit_rate': f"{hit_rate:.1f}%",
            'evictions': self.stats['evictions'],
            'size_mb': self.stats['size_bytes'] / (1024 * 1024),
            'entries': len(self._memory_cache)
        }

    def _load_cache_index(self):
        """Load cache index from disk."""
        for cache_file in self.cache_dir.glob("*.json"):
            try:
                with open(cache_file) as f:
                    entry_dict = json.load(f)
                    entry = CacheEntry(**entry_dict)
                    self._memory_cache[entry.key] = entry
                    self.stats['size_bytes'] += cache_file.stat().st_size
            except Exception:
                # Corrupted cache file, remove it
                cache_file.unlink()

Step 2: Integrate cache with orchestrator

Update claude_force/orchestrator.py:

class AgentOrchestrator:
    def __init__(self, ...):
        # Existing initialization
        self._response_cache = None

    @property
    def response_cache(self) -> ResponseCache:
        """Lazy-load response cache."""
        if self._response_cache is None:
            cache_config = self.config.get('cache', {})
            self._response_cache = ResponseCache(
                enabled=cache_config.get('enabled', True),
                ttl_hours=cache_config.get('ttl_hours', 24),
                max_size_mb=cache_config.get('max_size_mb', 100)
            )
        return self._response_cache

    def execute_agent(self, agent_name: str, task: str, **kwargs) -> str:
        """Execute agent with caching support."""
        model = kwargs.get('model') or self._select_model(agent_name, task)

        # Check if agent should be excluded from caching
        exclude_list = self.config.get('cache', {}).get('exclude_agents', [])
        use_cache = agent_name not in exclude_list

        # Try cache first
        if use_cache:
            cached = self.response_cache.get(agent_name, task, model)
            if cached:
                # Cache hit!
                self._track_cached_execution(
                    agent_name=agent_name,
                    task=task,
                    cached_result=cached
                )
                return cached['response']

        # Cache miss - execute normally
        start_time = time.time()
        try:
            # Existing execution logic
            response = self.client.messages.create(...)
            result = response.content[0].text

            # Cache the response
            if use_cache:
                self.response_cache.set(
                    agent_name=agent_name,
                    task=task,
                    model=response.model,
                    response=result,
                    input_tokens=response.usage.input_tokens,
                    output_tokens=response.usage.output_tokens,
                    estimated_cost=self._estimate_cost(...)
                )

            # Track performance
            self._track_execution(...)

            return result

        except Exception as e:
            # Existing error handling
            raise

    def _track_cached_execution(self, agent_name: str, task: str, cached_result: dict):
        """Track cache hit in metrics."""
        self.performance_tracker.track_execution(
            agent_name=agent_name,
            task=task,
            success=True,
            execution_time_ms=1,  # Minimal time for cache hit
            model=cached_result.get('model', 'cached'),
            input_tokens=cached_result['input_tokens'],
            output_tokens=cached_result['output_tokens'],
            estimated_cost=0.0,  # No cost for cache hit
            metadata={'cached': True, 'cache_age': cached_result['cache_age_seconds']}
        )

Step 3: Add configuration

Update .claude/claude.json:

{
  "cache": {
    "enabled": true,
    "ttl_hours": 24,
    "max_size_mb": 100,
    "exclude_agents": [
      "random-generator",
      "creative-writer"
    ]
  }
}

Step 4: Add CLI commands for cache management

Update claude_force/cli.py:

@click.group()
def cache():
    """Manage response cache."""
    pass

@cache.command()
def stats():
    """Show cache statistics."""
    orchestrator = AgentOrchestrator()
    stats = orchestrator.response_cache.get_stats()

    click.echo("Cache Statistics")
    click.echo("=" * 40)
    click.echo(f"Enabled: {stats['enabled']}")
    click.echo(f"Hits: {stats['hits']}")
    click.echo(f"Misses: {stats['misses']}")
    click.echo(f"Hit Rate: {stats['hit_rate']}")
    click.echo(f"Evictions: {stats['evictions']}")
    click.echo(f"Size: {stats['size_mb']:.2f} MB")
    click.echo(f"Entries: {stats['entries']}")

@cache.command()
@click.confirmation_option(prompt='Are you sure you want to clear the cache?')
def clear():
    """Clear response cache."""
    orchestrator = AgentOrchestrator()
    orchestrator.response_cache.clear()
    click.echo("✅ Cache cleared")

cli.add_command(cache)

Testing Requirements

Unit Tests:

# tests/test_response_cache.py
def test_cache_hit():
    """Test cache hit returns cached response."""
    cache = ResponseCache()

    # First call - cache miss
    result1 = cache.get("python-expert", "Test task", "haiku")
    assert result1 is None

    # Set cache
    cache.set(
        "python-expert",
        "Test task",
        "haiku",
        "Test response",
        100,
        50,
        0.001
    )

    # Second call - cache hit
    result2 = cache.get("python-expert", "Test task", "haiku")
    assert result2 is not None
    assert result2['response'] == "Test response"
    assert result2['cached'] is True

def test_cache_ttl_expiration():
    """Test cache entries expire after TTL."""
    cache = ResponseCache(ttl_hours=0.001)  # Very short TTL

    cache.set("agent", "task", "model", "response", 100, 50, 0.001)

    # Immediate retrieval - should hit
    result1 = cache.get("agent", "task", "model")
    assert result1 is not None

    # Wait for expiration
    import time
    time.sleep(5)  # 5 seconds > 0.001 hours

    # Should miss
    result2 = cache.get("agent", "task", "model")
    assert result2 is None

Integration Tests:

# tests/integration/test_cache_integration.py
def test_orchestrator_caching():
    """Test orchestrator uses cache correctly."""
    orchestrator = AgentOrchestrator()

    task = "Explain Python decorators"

    # First execution - no cache
    import time
    start1 = time.time()
    result1 = orchestrator.execute_agent("python-expert", task)
    time1 = time.time() - start1

    # Second execution - should use cache
    start2 = time.time()
    result2 = orchestrator.execute_agent("python-expert", task)
    time2 = time.time() - start2

    # Results should be identical
    assert result1 == result2

    # Second should be much faster (<100ms)
    assert time2 < 0.1  # Less than 100ms

    # Check cache stats
    stats = orchestrator.response_cache.get_stats()
    assert stats['hits'] >= 1

Success Criteria

  • ✅ Cache correctly stores and retrieves responses

  • ✅ TTL expiration works as expected

  • ✅ Cache hit provides <100ms response time

  • ✅ Excluded agents bypass cache

  • ✅ Cache stats tracked accurately

  • ✅ LRU eviction prevents unlimited growth

Effort Estimate

  • Development: 10-14 hours

  • Testing: 5-7 hours

  • Documentation: 2-3 hours

  • Total: 17-24 hours


1.3 Configuration & Documentation

Update Configuration Schema

Add to .claude/claude.json:

{
  "performance": {
    "async_enabled": true,
    "max_concurrent_agents": 10
  },
  "cache": {
    "enabled": true,
    "ttl_hours": 24,
    "max_size_mb": 100,
    "exclude_agents": []
  }
}

Update Documentation

  1. Update README.md with new async capabilities

  2. Add docs/async-usage-guide.md

  3. Add docs/caching-guide.md

  4. Update API documentation

Effort Estimate

  • Documentation: 4-6 hours


Phase 2: Advanced Optimizations

Duration: 3-4 weeks Priority: 🟡 MEDIUM-HIGH Expected Impact: 2-5x throughput improvement

2.1 Parallel Workflow Execution (DAG-based)

Objective

Enable parallel execution of independent workflow steps using Directed Acyclic Graph (DAG) scheduling.

Current State

# Sequential execution
for step in workflow_steps:
    result = execute_agent(step)
    context.append(result)

Target State

# Parallel execution with dependency management
async def execute_workflow_dag(workflow):
    dag = build_dependency_graph(workflow)
    while dag:
        # Execute all ready steps in parallel
        ready = get_steps_with_no_dependencies(dag)
        results = await asyncio.gather(*[execute_step(s) for s in ready])
        update_dag(dag, results)

Implementation Steps

Step 1: Add dependency tracking to workflows

Update workflow definitions in .claude/claude.json:

{
  "workflows": {
    "code-quality-check": {
      "name": "Code Quality Check",
      "description": "Comprehensive code quality analysis",
      "steps": [
        {
          "id": "linter",
          "agent": "linter",
          "task": "Run Python linter",
          "dependencies": []
        },
        {
          "id": "type-checker",
          "agent": "type-checker",
          "task": "Run type checking",
          "dependencies": []
        },
        {
          "id": "security-scan",
          "agent": "security-scanner",
          "task": "Scan for security issues",
          "dependencies": []
        },
        {
          "id": "final-review",
          "agent": "code-reviewer",
          "task": "Final code review based on findings",
          "dependencies": ["linter", "type-checker", "security-scan"]
        }
      ]
    }
  }
}

Step 2: Create DAG executor

Create claude_force/workflow_dag.py:

"""
DAG-based workflow executor for parallel execution.
"""
import asyncio
from typing import Dict, List, Set, Any
from dataclasses import dataclass
from .async_orchestrator import AsyncAgentOrchestrator

@dataclass
class WorkflowStep:
    """Workflow step definition."""
    id: str
    agent: str
    task: str
    dependencies: List[str]
    result: Any = None
    completed: bool = False

class WorkflowDAG:
    """DAG-based workflow executor."""

    def __init__(self, orchestrator: AsyncAgentOrchestrator):
        self.orchestrator = orchestrator

    def build_dag(self, workflow_config: dict) -> Dict[str, WorkflowStep]:
        """Build DAG from workflow configuration."""
        steps = {}

        for step_config in workflow_config['steps']:
            step = WorkflowStep(
                id=step_config['id'],
                agent=step_config['agent'],
                task=step_config['task'],
                dependencies=step_config.get('dependencies', [])
            )
            steps[step.id] = step

        # Validate no cycles
        self._validate_acyclic(steps)

        return steps

    def _validate_acyclic(self, steps: Dict[str, WorkflowStep]):
        """Validate DAG has no cycles."""
        def has_cycle(step_id: str, visited: Set[str], rec_stack: Set[str]) -> bool:
            visited.add(step_id)
            rec_stack.add(step_id)

            step = steps.get(step_id)
            if step:
                for dep in step.dependencies:
                    if dep not in visited:
                        if has_cycle(dep, visited, rec_stack):
                            return True
                    elif dep in rec_stack:
                        return True

            rec_stack.remove(step_id)
            return False

        visited = set()
        rec_stack = set()

        for step_id in steps:
            if step_id not in visited:
                if has_cycle(step_id, visited, rec_stack):
                    raise ValueError(f"Workflow contains cycle involving step: {step_id}")

    def get_ready_steps(self, steps: Dict[str, WorkflowStep]) -> List[WorkflowStep]:
        """Get steps with all dependencies satisfied."""
        ready = []

        for step in steps.values():
            if step.completed:
                continue

            # Check if all dependencies are completed
            deps_completed = all(
                steps[dep_id].completed
                for dep_id in step.dependencies
            )

            if deps_completed:
                ready.append(step)

        return ready

    async def execute_workflow(self, workflow_config: dict) -> Dict[str, Any]:
        """Execute workflow with parallel execution where possible."""
        import time

        start_time = time.time()
        steps = self.build_dag(workflow_config)

        results = {}
        execution_order = []

        print(f"Executing workflow: {workflow_config['name']}")
        print(f"Total steps: {len(steps)}")
        print()

        while any(not step.completed for step in steps.values()):
            # Get steps ready to execute
            ready = self.get_ready_steps(steps)

            if not ready:
                # No steps ready but workflow not complete - error
                incomplete = [s.id for s in steps.values() if not s.completed]
                raise RuntimeError(f"Workflow deadlock. Incomplete steps: {incomplete}")

            print(f"Executing {len(ready)} step(s) in parallel: {[s.id for s in ready]}")

            # Execute ready steps in parallel
            step_start = time.time()
            step_results = await asyncio.gather(*[
                self.orchestrator.execute_agent(step.agent, step.task)
                for step in ready
            ])
            step_time = time.time() - step_start

            # Update results
            for step, result in zip(ready, step_results):
                step.result = result
                step.completed = True
                results[step.id] = result
                execution_order.append(step.id)

            print(f"  Completed in {step_time:.2f}s")
            print()

        total_time = time.time() - start_time

        return {
            'results': results,
            'execution_order': execution_order,
            'total_time_seconds': total_time,
            'workflow_name': workflow_config['name']
        }

Step 3: Integrate with orchestrator

Update claude_force/orchestrator.py:

class AgentOrchestrator:
    async def execute_workflow_async(self, workflow_name: str) -> Dict[str, Any]:
        """Execute workflow with parallel execution."""
        workflow_config = self.config['workflows'].get(workflow_name)

        if not workflow_config:
            raise ValueError(f"Workflow '{workflow_name}' not found")

        from .workflow_dag import WorkflowDAG
        from .async_orchestrator import AsyncAgentOrchestrator

        async_orchestrator = AsyncAgentOrchestrator(
            config_path=self.config_path,
            api_key=self.api_key
        )

        dag = WorkflowDAG(async_orchestrator)
        return await dag.execute_workflow(workflow_config)

Step 4: Update CLI

Add workflow execution command:

@click.command()
@click.argument('workflow_name')
@click.option('--parallel/--sequential', default=True, help='Use parallel execution')
def run_workflow(workflow_name: str, parallel: bool):
    """Execute a workflow."""
    orchestrator = AgentOrchestrator()

    if parallel:
        result = asyncio.run(orchestrator.execute_workflow_async(workflow_name))
        click.echo(f"✅ Workflow completed in {result['total_time_seconds']:.2f}s")
        click.echo(f"Execution order: {' → '.join(result['execution_order'])}")
    else:
        # Sequential execution (existing)
        result = orchestrator.execute_workflow(workflow_name)
        click.echo("✅ Workflow completed")

Testing Requirements

# tests/test_workflow_dag.py
@pytest.mark.asyncio
async def test_parallel_workflow_execution():
    """Test parallel workflow is faster than sequential."""
    orchestrator = AgentOrchestrator()

    # Workflow with 3 independent steps + 1 dependent
    workflow = {
        'name': 'Test Workflow',
        'steps': [
            {'id': 'step1', 'agent': 'python-expert', 'task': 'Task 1', 'dependencies': []},
            {'id': 'step2', 'agent': 'python-expert', 'task': 'Task 2', 'dependencies': []},
            {'id': 'step3', 'agent': 'python-expert', 'task': 'Task 3', 'dependencies': []},
            {'id': 'step4', 'agent': 'code-reviewer', 'task': 'Review all', 'dependencies': ['step1', 'step2', 'step3']}
        ]
    }

    import time
    start = time.time()
    result = await orchestrator.execute_workflow_async(workflow)
    parallel_time = time.time() - start

    # Should complete in ~2x single step time (not 4x)
    # Because first 3 steps run in parallel
    assert parallel_time < 10  # Assuming single step is ~3s
    assert len(result['results']) == 4

Success Criteria

  • ✅ DAG correctly identifies dependencies

  • ✅ Independent steps execute in parallel

  • ✅ Dependent steps wait for prerequisites

  • ✅ 2-3x speedup for workflows with parallelizable steps

  • ✅ No deadlocks or race conditions

Effort Estimate

  • Development: 12-16 hours

  • Testing: 6-8 hours

  • Documentation: 3-4 hours

  • Total: 21-28 hours


2.2 Metrics Aggregation

Implementation: See Performance Analysis

Effort Estimate

  • Development: 6-8 hours

  • Testing: 3-4 hours

  • Total: 9-12 hours


2.3 Query Result Caching

Implementation: Add LRU cache to AgentMemory queries

Effort Estimate

  • Development: 4-6 hours

  • Testing: 2-3 hours

  • Total: 6-9 hours


Phase 3: Polish & Enhancement

Duration: 2-3 weeks Priority: 🟢 LOW-MEDIUM Expected Impact: Improved reliability and monitoring

3.1 Enhanced Performance Monitoring

  • Real-time performance dashboard

  • Advanced analytics

  • Anomaly detection

Effort Estimate

  • Development: 8-12 hours

3.2 Circuit Breakers

  • Fail-fast on API errors

  • Automatic retry with exponential backoff

  • Health checks

Effort Estimate

  • Development: 6-8 hours

3.3 Advanced Caching Strategies

  • Semantic caching (similar queries)

  • Partial response caching

  • Distributed cache support (Redis)

Effort Estimate

  • Development: 10-14 hours


Testing Strategy

Unit Tests

Coverage Target: >90%

Key Areas:

  • Async orchestrator

  • Response cache (hit/miss/eviction)

  • DAG builder (cycle detection)

  • Workflow executor

Integration Tests

Test Scenarios:

  1. Full async workflow execution

  2. Cache integration with real API

  3. Parallel execution stress test

  4. Performance regression tests

Performance Tests

Benchmarks to Run:

  1. Async vs sync comparison

  2. Cache hit rate measurement

  3. Parallel speedup verification

  4. Resource usage profiling

Regression Tests

Automated Checks:

  • Performance baselines

  • API compatibility

  • Backward compatibility


Migration & Backward Compatibility

Backward Compatibility Strategy

100% Backward Compatible

All new features are opt-in:

# Old code continues to work unchanged
orchestrator = AgentOrchestrator()
result = orchestrator.execute_agent("python-expert", "task")

# New async API is optional
result = await orchestrator.execute_agent_async("python-expert", "task")

Migration Path

Phase 1: Soft Launch

  • New features available but not default

  • Documentation and examples provided

  • Monitoring of adoption

Phase 2: Encourage Adoption

  • Performance benefits highlighted

  • CLI defaults to async (with –sync option)

  • Cache enabled by default

Phase 3: Full Adoption

  • Deprecation warnings for sync-only usage

  • Async becomes default in v3.0

Configuration Migration

Auto-migration on first run:

def migrate_config(config: dict) -> dict:
    """Auto-migrate config to new schema."""
    if 'cache' not in config:
        config['cache'] = {
            'enabled': True,
            'ttl_hours': 24,
            'max_size_mb': 100
        }

    if 'performance' not in config:
        config['performance'] = {
            'async_enabled': True,
            'max_concurrent_agents': 10
        }

    return config

Risk Assessment

High-Risk Areas

1. Async Implementation Complexity

Risk: Async/await introduces complexity, potential for deadlocks

Mitigation:

  • Comprehensive testing with asyncio test framework

  • Timeout mechanisms on all async operations

  • Backward compatible sync API maintained

  • Code review by async-experienced developers

Contingency: Rollback to sync if critical issues found

2. Cache Correctness

Risk: Stale cache responses, incorrect behavior

Mitigation:

  • Conservative TTL defaults (24 hours)

  • Exclude non-deterministic agents by default

  • Cache invalidation on config changes

  • Clear cache command readily available

  • Extensive testing of cache behavior

Contingency: Disable cache by default if issues arise

3. DAG Complexity

Risk: Workflow execution errors, deadlocks

Mitigation:

  • Cycle detection in DAG validation

  • Timeout on workflow execution

  • Detailed logging of execution order

  • Fallback to sequential execution on errors

Contingency: Sequential execution remains available

Medium-Risk Areas

4. Performance Regression

Risk: Optimization actually slows things down

Mitigation:

  • Comprehensive benchmarking before/after

  • Automated performance regression tests

  • A/B testing in staging environment

Contingency: Feature flags to disable optimizations

5. Increased Memory Usage

Risk: Caching and async may increase memory

Mitigation:

  • Cache size limits enforced

  • LRU eviction implemented

  • Memory profiling during development

  • Monitoring in production

Contingency: Configurable cache limits, can be disabled


Success Metrics

Performance Metrics

Metric

Current

Target

Measurement

Workflow Time (3 agents)

12-30s

4-10s

Benchmarks

Cache Hit Rate

0%

20-70%

Cache stats

Cost Per Execution

Baseline

-30-50%

Analytics

Throughput

1x

2-5x

Load tests

P95 Latency

10s

5s

Metrics

Quality Metrics

Metric

Target

Measurement

Test Coverage

>90%

pytest-cov

Success Rate

>95%

Analytics

Backward Compatibility

100%

Integration tests

Documentation Coverage

100%

Manual review

Adoption Metrics

Metric

Target (3 months)

Measurement

Async API Usage

>50%

Telemetry

Cache Enabled

>80%

Config analysis

Parallel Workflows

>30%

Usage stats


Implementation Timeline

Month 1: Phase 1 - Foundation

Week 1-2: Async Implementation

  • Create AsyncAgentOrchestrator

  • Add async methods to AgentOrchestrator

  • Update CLI with async support

  • Write unit tests

  • Write integration tests

Week 3-4: Response Caching

  • Create ResponseCache module

  • Integrate with orchestrator

  • Add cache CLI commands

  • Test cache correctness

  • Performance benchmarks

Deliverables:

  • ✅ Async API working

  • ✅ Response caching functional

  • ✅ 90%+ test coverage

  • ✅ Documentation updated

Month 2: Phase 2 - Advanced Optimization

Week 1-2: Parallel Workflows

  • Create WorkflowDAG module

  • Update workflow schema

  • Implement DAG executor

  • Add workflow CLI commands

  • Test parallel execution

Week 3: Metrics & Caching

  • Implement metrics aggregation

  • Add query result caching

  • Performance testing

Week 4: Integration & Testing

  • End-to-end integration tests

  • Performance regression tests

  • Load testing

  • Bug fixes

Deliverables:

  • ✅ Parallel workflows working

  • ✅ 2-5x speedup demonstrated

  • ✅ All tests passing

Month 3: Phase 3 - Polish & Release

Week 1-2: Enhancements

  • Enhanced monitoring

  • Circuit breakers

  • Advanced caching

Week 3: Documentation & Examples

  • Usage guides

  • API documentation

  • Example workflows

  • Migration guide

Week 4: Release Preparation

  • Final testing

  • Performance validation

  • Release notes

  • Version 2.3.0 release

Deliverables:

  • ✅ Production-ready release

  • ✅ Complete documentation

  • ✅ Performance targets met


Next Steps

Immediate Actions

  1. Review & Approve Plan - Stakeholder review (1 week)

  2. Set Up Development Environment - Create feature branch

  3. Begin Phase 1 - Start async implementation

Decision Points

Week 2: Review async implementation, decide to proceed with caching

Week 6: Review Phase 1 results, decide to proceed with Phase 2

Week 10: Review overall progress, decide on Phase 3 scope

Success Gates

Each phase requires:

  • ✅ All tests passing

  • ✅ Performance targets met

  • ✅ Documentation complete

  • ✅ Code review approved


Appendix

A. Code Review Checklist

  • Async operations have timeouts

  • Error handling for all async code

  • Cache eviction prevents memory leaks

  • DAG validation prevents cycles

  • Backward compatibility maintained

  • Tests cover edge cases

  • Documentation updated

  • Performance benchmarks run

B. Performance Testing Script

#!/bin/bash
# performance_validation.sh

echo "Running performance validation..."

# Baseline (sync)
python benchmark_sync.py > baseline.txt

# With async
python benchmark_async.py > async.txt

# With caching
python benchmark_with_cache.py > cache.txt

# With parallel workflows
python benchmark_parallel_workflows.py > parallel.txt

# Compare results
python compare_results.py baseline.txt async.txt cache.txt parallel.txt

C. Rollback Plan

If critical issues found:

  1. Immediate: Disable feature via config flag

  2. Short-term: Revert to previous version

  3. Long-term: Fix issues and re-release

Feature flags for easy rollback:

{
  "features": {
    "async_enabled": false,
    "cache_enabled": false,
    "parallel_workflows_enabled": false
  }
}

Document Version: 1.0 Last Updated: 2025-11-14 Next Review: After Phase 1 completion Owner: Performance Engineering Team