Skip to content

Orchestrator

orchestrator

Parallel delegation — Coordinator fans out to DomainAgents concurrently.

Uses ADK ParallelAgent for the fan-out topology while keeping a synchronous convenience wrapper for non-async callers.

Orchestrator

Orchestrator(coordinator: CoordinatorAgent)

Orchestrates parallel fan-out from a CoordinatorAgent to multiple DomainAgents.

Provides build_parallel_agent() to construct an ADK ParallelAgent for use within the ADK runtime, and fan_out() / fan_out_sync() for direct programmatic parallel execution.

Source code in libs/ninja-agents/src/ninja_agents/orchestrator.py
def __init__(self, coordinator: CoordinatorAgent) -> None:
    """Store the coordinator this orchestrator fans out from.

    Args:
        coordinator: The coordinator kept on ``self.coordinator``.
    """
    self.coordinator = coordinator

build_parallel_agent

build_parallel_agent(
    target_domains: list[str] | None = None,
) -> ParallelAgent

Build an ADK ParallelAgent for fan-out execution.

Creates fresh LlmAgent instances (ADK enforces single-parent, so the coordinator's own sub_agents cannot be reused here).

Source code in libs/ninja-agents/src/ninja_agents/orchestrator.py
def build_parallel_agent(self, target_domains: list[str] | None = None) -> ParallelAgent:
    """Assemble an ADK ``ParallelAgent`` that fans out over domain agents.

    Each selected domain agent is mirrored by a brand-new ``LlmAgent``
    (ADK enforces single-parent, so the coordinator's own sub_agents
    cannot be attached to a second parent).  The mirror copies the model,
    description and instruction, and is created with empty ``tools`` and
    ``sub_agents`` lists.

    Args:
        target_domains: Domain names to include.  When ``None`` or empty,
            every name in ``self.coordinator.domain_names`` is used.

    Returns:
        A ``ParallelAgent`` named ``"parallel_fan_out"``.  Domains for
        which the coordinator returns no agent are skipped.
    """
    selected = target_domains or self.coordinator.domain_names
    # Lazy lookup keeps the original per-domain order: resolve one domain
    # agent, build its mirror, then move on to the next domain.
    resolved = (self.coordinator.get_domain_agent(domain) for domain in selected)
    mirrors: list[LlmAgent] = [
        LlmAgent(
            name=f"{domain_agent.name}_parallel",
            model=domain_agent.agent.model,
            description=domain_agent.agent.description,
            instruction=domain_agent.agent.instruction,
            tools=[],
            sub_agents=[],
        )
        for domain_agent in resolved
        if domain_agent is not None
    ]
    return ParallelAgent(name="parallel_fan_out", sub_agents=mirrors)

fan_out async

fan_out(
    request: str,
    target_domains: list[str] | None = None,
    trace: TraceContext | None = None,
) -> dict[str, Any]

Fan out request to multiple domains concurrently, collect results.

PARAMETER DESCRIPTION
request

The user request to process.

TYPE: str

target_domains

Domains to route to. Defaults to all domains.

TYPE: list[str] | None DEFAULT: None

trace

Optional trace context for observability.

TYPE: TraceContext | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, Any]

Dict mapping domain name to its result. If a domain task raises, the error text is stored under the "_error" key.

Source code in libs/ninja-agents/src/ninja_agents/orchestrator.py
async def fan_out(
    self,
    request: str,
    target_domains: list[str] | None = None,
    trace: TraceContext | None = None,
) -> dict[str, Any]:
    """Fan out request to multiple domains concurrently, collect results.

    Args:
        request: The user request to process.
        target_domains: Domains to route to.  Defaults to all domains
            (also used when an empty list is passed).
        trace: Optional trace context for observability.  When given, a
            span named after the coordinator wraps the whole fan-out and
            is always finished, even if collection fails.

    Returns:
        Dict mapping domain name to its result.  If one or more domain
        tasks fail, the ``"_error"`` key holds their messages joined with
        ``"; "`` (a single failure yields just that failure's message).
    """
    domains = target_domains or self.coordinator.domain_names
    if trace:
        trace.start_span(self.coordinator.name)
    try:
        tasks = [_execute_domain(self.coordinator, d, request, trace) for d in domains]
        # return_exceptions=True: one failing domain must not abort the rest.
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        results: dict[str, Any] = {}
        failures: list[str] = []
        for outcome in outcomes:
            # gather can hand back BaseException instances too (e.g. a
            # cancelled child yields asyncio.CancelledError, which is not
            # an Exception subclass); unpacking those would raise TypeError.
            if isinstance(outcome, BaseException):
                # Fall back to repr() so an exception with an empty
                # message still produces a non-empty, truthy error entry.
                failures.append(str(outcome) or repr(outcome))
            else:
                domain_name, result = outcome
                results[domain_name] = result
        if failures:
            # Keep every failure instead of letting later ones overwrite
            # earlier ones under the single "_error" key.
            results["_error"] = "; ".join(failures)
        return results
    finally:
        if trace:
            trace.finish_span(self.coordinator.name)

fan_out_sync

fan_out_sync(
    request: str,
    target_domains: list[str] | None = None,
    trace: TraceContext | None = None,
) -> dict[str, Any]

Synchronous wrapper around fan_out for non-async contexts.

Source code in libs/ninja-agents/src/ninja_agents/orchestrator.py
def fan_out_sync(
    self,
    request: str,
    target_domains: list[str] | None = None,
    trace: TraceContext | None = None,
) -> dict[str, Any]:
    """Blocking counterpart of ``fan_out`` for callers without an event loop.

    Args:
        request: The user request to process.
        target_domains: Domains to route to, forwarded unchanged.
        trace: Optional trace context, forwarded unchanged.

    Returns:
        Whatever ``fan_out`` returns for the same arguments.
    """
    # Build the coroutine first, then drive it to completion with asyncio.run.
    pending = self.fan_out(request, target_domains, trace)
    return asyncio.run(pending)