Node API Reference¶
Complete API documentation for the Node class.
Class Definition¶
class Node(Generic[N]):
"""
A node in an async execution tree.
Type Parameters:
N: The expected return type of the coroutine
"""
Constructor¶
Node(
coroutine: AwaitableCallback,
*,
uuid: str,
kwargs: Optional[dict] = None,
timeout: Optional[float] = 60.0,
on_connect: Optional[Callable] = None,
on_disconnect: Optional[Callable] = None,
on_before_run: Optional[Callable] = None,
on_after_run: Optional[Callable] = None
)
Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
coroutine |
AwaitableCallback |
Required | The async function to execute |
uuid |
str |
Required | Unique identifier for the node |
kwargs |
Optional[dict] |
None |
Arguments to pass to the coroutine. Values can be static or lambda functions for dynamic evaluation |
timeout |
Optional[float] |
60.0 |
Maximum execution time in seconds for the node's own run |
on_connect |
Optional[Callable] |
None |
Async callback when connecting to a child |
on_disconnect |
Optional[Callable] |
None |
Async callback when disconnecting from a child |
on_before_run |
Optional[Callable] |
None |
Async callback before execution |
on_after_run |
Optional[Callable] |
None |
Async callback after execution |
Example¶
async def my_task(value: int):
return value * 2
node = Node(
coroutine=my_task,
uuid="doubler",
kwargs=dict(value=5),
timeout=30,
)
Properties¶
output¶
Returns None before execution completes.
Example:
aggregated_output¶
@property
def aggregated_output(self) -> list:
"""List of all outputs yielded by a yielding coroutine."""
Example:
async def counting_task():
for i in range(5):
yield f"Step {i}"
node = Node(coroutine=counting_task, uuid="counter")
executor = TreeExecutor(roots=[node])
await executor.run()
# Get all yielded outputs
all_outputs = node.aggregated_output # ["Step 0", "Step 1", "Step 2", "Step 3", "Step 4"]
metadata¶
Returns a Metadata dataclass with:
- runtime: Execution time in seconds (float)
- level: Depth in tree, starting from 0 for roots (int)
Example:
await node.run()
print(f"Executed in {node.metadata.runtime}s")
print(f"Tree level: {node.metadata.level}")
parents¶
children¶
uuid¶
Methods¶
connect¶
async def connect(
self,
target: Node,
*,
forward: Optional[Union[str, Node.AUTO]] = None,
on_before_forward: Optional[
Union[
OnForwardCallable,
tuple[OnForwardCallable, Optional[dict[str, Any]]],
]
] = None
) -> None:
"""
Connect this node to a child node.
Args:
target: The child node to connect to
forward: Optional forwarding target. Use a string or `Node.AUTO`.
on_before_forward: Optional callback (or `(callback, fixed_kwargs)`) to transform the forwarded value.
Raises:
ForwardingOverrideError: If forwarding conflicts with existing kwarg
SafeExecutionError: If node is currently running
"""
Example:
await parent.connect(child) # Basic connection
await parent.connect(child, forward="data") # With forwarding
await parent.connect(child, forward=Node.AUTO) # AUTO (single-input children)
async def transform(data: str):
return value.upper()
await parent.connect(
child,
forward="data",
on_before_forward=transform
)
disconnect¶
async def disconnect(self, target: Node) -> None:
"""
Disconnect this node from a child node.
Args:
target: The child node to disconnect from
Raises:
SafeExecutionError: If node is currently running
"""
Example:
redirect¶
async def redirect(self, targets: list[Node]) -> None:
"""
Disconnect all current children and connect to new targets.
Args:
targets: List of new child nodes
Raises:
SafeExecutionError: If node is currently running
"""
Example:
# Current children: [old_a, old_b]
await parent.redirect([new_a, new_b, new_c])
# Current children: [new_a, new_b, new_c]
run¶
async def run(self) -> N:
"""
Execute the node's coroutine.
Returns:
The coroutine's return value
Raises:
asyncio.TimeoutError: If execution exceeds timeout
NotAsyncCallableError: If coroutine is an async generator
MismatchChunkType: If return type doesn't match generic parameter
"""
Example:
run_yielding¶
async def run_yielding(self) -> AsyncGenerator[Chunk[N], None]:
"""
Execute the node's async generator coroutine, yielding chunks.
Yields:
Chunk objects wrapping intermediate results
Raises:
asyncio.TimeoutError: If execution exceeds timeout
NotAsyncCallableError: If coroutine is not an async generator
MismatchChunkType: If yielded type doesn't match generic parameter
"""
Example:
async def yielding_task():
for i in range(5):
yield i
node = Node(coroutine=yielding_task, uuid="yielder")
async for chunk in node.run_yielding():
print(f"Got: {chunk.output}")
Type Parameters¶
Generic Type Validation¶
Specify expected return type:
# Node that returns string
string_node = Node[str](coroutine=return_string, uuid="str")
# Node that returns int
int_node = Node[int](coroutine=return_int, uuid="int")
# Node that returns custom type
user_node = Node[User](coroutine=get_user, uuid="user")
Without type parameter, no validation occurs:
Event Callbacks¶
on_connect¶
async def on_connect_callback(parent: Node, child: Node) -> None:
"""Called when parent connects to child."""
on_disconnect¶
async def on_disconnect_callback(parent: Node, child: Node) -> None:
"""Called when parent disconnects from child."""
on_before_run¶
on_after_run¶
async def on_after_run_callback(node: Node) -> None:
"""Called after node execution. Node.output and Node.metadata are available."""
on_before_forward¶
async def on_before_forward_callback(
value: Any,
**kwargs: Any
) -> Any:
"""
Called before forwarding value to child.
Receives the forwarded value (positional) plus any `fixed_kwargs` if provided.
Alternatively, you can declare the parameter as keyword-only `forward_data`.
"""
See Also¶
- TreeExecutor - Execute node trees
- Chunk - Wrapper for yielded values