Skip to content

TreeExecutor API Reference

Complete API documentation for the TreeExecutor class.

Class Definition

class TreeExecutor(Generic[N]):
    """
    Orchestrates execution of async node trees.

    Manages worker pool, dependency resolution, and result streaming.

    Type Parameters:
        N: The type of nodes in the tree
    """

Constructor

TreeExecutor(
    *,
    uuid: str,
    description: Optional[str] = None,
    roots: Optional[list[Node]] = None
)

Parameters

Parameter Type Default Description
uuid str Required Unique identifier for the executor
description Optional[str] None Optional description of the tree
roots Optional[list[Node]] None List of root nodes to start execution from

Example

from grafo import TreeExecutor, Node

root_a = Node(coroutine=task_a, uuid="root_a")
root_b = Node(coroutine=task_b, uuid="root_b")

executor = TreeExecutor(
    uuid="My Tree",
    description="Multi-root data processing tree",
    roots=[root_a, root_b]
)

Properties

name

@property
def name(self) -> str:
    """The UUID of the executor."""

Example:

print(f"Executing tree: {executor.name}")

description

@property
def description(self) -> Optional[str]:
    """Optional description of the executor."""

roots

@property
def roots(self) -> list[Node]:
    """List of root nodes."""

Example:

print(f"Tree has {len(executor.roots)} root nodes")
for root in executor.roots:
    print(f"  - {root.uuid}")

errors

@property
def errors(self) -> list[Exception]:
    """List of errors encountered during execution."""

Example:

await executor.run()

if executor.errors:
    print("Errors occurred:")
    for error in executor.errors:
        print(f"  - {error}")

Methods

run

async def run(self) -> tuple[list[Node], list[Chunk]]:
    """
    Execute the entire tree and return all results.

    Waits for all nodes to complete before returning.

    Returns:
        Tuple of (completed_nodes, all_chunks)
        - completed_nodes: List of all executed nodes
        - all_chunks: List of all chunks from yielding nodes

    Raises:
        Exceptions from node execution propagate normally
    """

Example:

executor = TreeExecutor(uuid="Tree", roots=[root])

# Execute and get results
nodes, chunks = await executor.run()

print(f"Executed {len(nodes)} nodes")
print(f"Collected {len(chunks)} chunks")

# Access individual node outputs
for node in nodes:
    print(f"{node.uuid}: {node.output}")

yielding

async def yielding(
    self,
    latency: float = 0.2
) -> AsyncGenerator[Union[Node, Chunk], None]:
    """
    Execute the tree and stream results as they complete.

    Args:
        latency: Check interval in seconds (default: 0.2)

    Yields:
        Node objects when nodes complete, or Chunk objects for intermediate results

    Raises:
        Exceptions from node execution propagate normally
    """

Example:

executor = TreeExecutor(uuid="Streaming Tree", roots=[root])

async for item in executor.yielding(latency=0.1):
    if isinstance(item, Chunk):
        print(f"[{item.uuid}] Progress: {item.output}")
    elif isinstance(item, Node):
        print(f"Node {item.uuid} completed: {item.output}")

Latency Parameter:

  • Lower latency = more responsive but higher CPU usage
  • Higher latency = less CPU usage but slower response
  • Default (0.2s) is a good balance for most use cases

get_leaves

def get_leaves(self) -> list[Node]:
    """
    Get all leaf nodes (nodes with no children).

    Returns:
        List of leaf nodes in the tree

    Raises:
        ValueError: If roots list is empty
    """

Example:

executor = TreeExecutor(uuid="Tree", roots=[root])

# Get leaf nodes
leaves = executor.get_leaves()

print(f"Found {len(leaves)} leaf nodes:")
for leaf in leaves:
    print(f"  - {leaf.uuid}")

stop_tree

async def stop_tree(self) -> None:
    """
    Gracefully stop tree execution.

    Signals workers to stop processing new nodes.
    """

Example:

import asyncio

async def run_with_timeout():
    executor = TreeExecutor(roots=[root])

    # Run for maximum 10 seconds
    try:
        await asyncio.wait_for(executor.run(), timeout=10)
    except asyncio.TimeoutError:
        print("Execution timed out, stopping...")
        await executor.stop_tree()

Execution Behavior

Worker Management

The executor automatically manages a pool of async workers:

  • Workers scale dynamically based on queue size
  • No manual configuration needed
  • Efficient resource utilization

Dependency Resolution

Nodes execute according to these rules:

  1. Root nodes start immediately
  2. A node is only run once all its parents have completed (the executor enqueues a node only when all its parents have completed)
  3. Independent branches execute concurrently
  4. Execution continues until there are no children left to be enqueued

Example Execution Flow

# Tree structure:
#     A
#    / \
#   B   C
#    \ /
#     D

# Execution order:
# 1. A starts (root)
# 2. A completes
# 3. B and C start concurrently
# 4. B and C complete
# 5. D starts once both B and C have completed
# 6. D completes

Return Values

run() Return Value

results = await executor.run()

# results: List[Node]
# - All executed nodes from the tree
# - Access node.output for results
# - Access node.metadata for execution info

yielding() Yield Values

async for item in executor.yielding():
    if isinstance(item, Chunk):
        # Intermediate result from yielding node
        source_uuid = item.uuid
        value = item.output

    elif isinstance(item, Node):
        # Node completed
        result = item.output
        runtime = item.metadata.runtime

See Also

  • Node - Node API documentation
  • Chunk - Chunk data structure