Skip to content

Node API Reference

Complete API documentation for the Node class.

Class Definition

class Node(Generic[N]):
    """
    A node in an async execution tree.

    Type Parameters:
        N: The expected return type of the coroutine
    """

Constructor

Node(
    coroutine: AwaitableCallback,
    *,
    uuid: str,
    kwargs: Optional[dict] = None,
    timeout: Optional[float] = 60.0,
    on_connect: Optional[Callable] = None,
    on_disconnect: Optional[Callable] = None,
    on_before_run: Optional[Callable] = None,
    on_after_run: Optional[Callable] = None
)

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `coroutine` | `AwaitableCallback` | Required | The async function to execute |
| `uuid` | `str` | Required | Unique identifier for the node |
| `kwargs` | `Optional[dict]` | `None` | Arguments to pass to the coroutine. Values can be static or lambda functions for dynamic evaluation |
| `timeout` | `Optional[float]` | `60.0` | Maximum execution time in seconds for the node's own run |
| `on_connect` | `Optional[Callable]` | `None` | Async callback when connecting to a child |
| `on_disconnect` | `Optional[Callable]` | `None` | Async callback when disconnecting from a child |
| `on_before_run` | `Optional[Callable]` | `None` | Async callback before execution |
| `on_after_run` | `Optional[Callable]` | `None` | Async callback after execution |

Example

async def my_task(value: int):
    return value * 2

node = Node(
    coroutine=my_task,
    uuid="doubler",
    kwargs=dict(value=5),
    timeout=30,
)

Properties

output

@property
def output(self) -> Optional[N]:
    """The return value of the coroutine after execution."""

Returns None before execution completes.

Example:

await node.run()
result = node.output  # Access the result

aggregated_output

@property
def aggregated_output(self) -> list:
    """List of all outputs yielded by a yielding coroutine."""

Example:

async def counting_task():
    for i in range(5):
        yield f"Step {i}"

node = Node(coroutine=counting_task, uuid="counter")
executor = TreeExecutor(roots=[node])

await executor.run()

# Get all yielded outputs
all_outputs = node.aggregated_output  # ["Step 0", "Step 1", "Step 2", "Step 3", "Step 4"]

metadata

@property
def metadata(self) -> Metadata:
    """Execution metadata (runtime, tree level)."""

Returns a `Metadata` dataclass with:

- `runtime`: Execution time in seconds (float)
- `level`: Depth in the tree, starting from 0 for roots (int)

Example:

await node.run()
print(f"Executed in {node.metadata.runtime}s")
print(f"Tree level: {node.metadata.level}")

parents

@property
def parents(self) -> set[Node]:
    """Set of parent nodes."""

children

@property
def children(self) -> set[Node]:
    """Set of child nodes."""

uuid

@property
def uuid(self) -> str:
    """Unique identifier for the node."""

Methods

connect

async def connect(
    self,
    target: Node,
    *,
    forward: Optional[Union[str, Node.AUTO]] = None,
    on_before_forward: Optional[
        Union[
            OnForwardCallable,
            tuple[OnForwardCallable, Optional[dict[str, Any]]],
        ]
    ] = None
) -> None:
    """
    Connect this node to a child node.

    Args:
        target: The child node to connect to
        forward: Optional forwarding target. Use a string or `Node.AUTO`.
        on_before_forward: Optional callback (or `(callback, fixed_kwargs)`) to transform the forwarded value.

    Raises:
        ForwardingOverrideError: If forwarding conflicts with existing kwarg
        SafeExecutionError: If node is currently running
    """

Example:

await parent.connect(child)  # Basic connection

await parent.connect(child, forward="data")  # With forwarding

await parent.connect(child, forward=Node.AUTO)  # AUTO (single-input children)

async def transform(data: str):
    return data.upper()

await parent.connect(
    child,
    forward="data",
    on_before_forward=transform
)

disconnect

async def disconnect(self, target: Node) -> None:
    """
    Disconnect this node from a child node.

    Args:
        target: The child node to disconnect from

    Raises:
        SafeExecutionError: If node is currently running
    """

Example:

await parent.disconnect(child)

redirect

async def redirect(self, targets: list[Node]) -> None:
    """
    Disconnect all current children and connect to new targets.

    Args:
        targets: List of new child nodes

    Raises:
        SafeExecutionError: If node is currently running
    """

Example:

# Current children: [old_a, old_b]
await parent.redirect([new_a, new_b, new_c])
# Current children: [new_a, new_b, new_c]

run

async def run(self) -> N:
    """
    Execute the node's coroutine.

    Returns:
        The coroutine's return value

    Raises:
        asyncio.TimeoutError: If execution exceeds timeout
        NotAsyncCallableError: If coroutine is an async generator
        MismatchChunkType: If return type doesn't match generic parameter
    """

Example:

node = Node(coroutine=my_task, uuid="task")
result = await node.run()

run_yielding

async def run_yielding(self) -> AsyncGenerator[Chunk[N], None]:
    """
    Execute the node's async generator coroutine, yielding chunks.

    Yields:
        Chunk objects wrapping intermediate results

    Raises:
        asyncio.TimeoutError: If execution exceeds timeout
        NotAsyncCallableError: If coroutine is not an async generator
        MismatchChunkType: If yielded type doesn't match generic parameter
    """

Example:

async def yielding_task():
    for i in range(5):
        yield i

node = Node(coroutine=yielding_task, uuid="yielder")

async for chunk in node.run_yielding():
    print(f"Got: {chunk.output}")

Type Parameters

Generic Type Validation

Specify the expected return type:

# Node that returns string
string_node = Node[str](coroutine=return_string, uuid="str")

# Node that returns int
int_node = Node[int](coroutine=return_int, uuid="int")

# Node that returns custom type
user_node = Node[User](coroutine=get_user, uuid="user")

Without a type parameter, no validation occurs:

# No type validation
any_node = Node(coroutine=return_anything, uuid="any")

Event Callbacks

on_connect

async def on_connect_callback(parent: Node, child: Node) -> None:
    """Called when parent connects to child."""

on_disconnect

async def on_disconnect_callback(parent: Node, child: Node) -> None:
    """Called when parent disconnects from child."""

on_before_run

async def on_before_run_callback(node: Node) -> None:
    """Called before node execution."""

on_after_run

async def on_after_run_callback(node: Node) -> None:
    """Called after node execution. Node.output and Node.metadata are available."""

on_before_forward

async def on_before_forward_callback(
    value: Any,
    **kwargs: Any
) -> Any:
    """
    Called before forwarding value to child.
    Receives the forwarded value (positional) plus any `fixed_kwargs` if provided.
    Alternatively, you can declare the parameter as keyword-only `forward_data`.
    """

See Also