Quick Start¶
This guide will walk you through creating your first async tree with Grafo.
Your First Tree¶
Create a simple tree with two nodes using automatic forwarding:
import asyncio
from grafo import Node, TreeExecutor
async def greet():
return "Hello"
async def add_name(greeting: str):
return f"{greeting}, World!"
async def main():
# Create nodes
node_a = Node(coroutine=greet, uuid="greet")
node_b = Node(coroutine=add_name, uuid="add_name")
# Connect with automatic forwarding
await node_a.connect(node_b, forward="greeting")
# Execute tree
executor = TreeExecutor(uuid="Greeting Tree", roots=[node_a])
await executor.run()
print(node_b.output) # "Hello, World!"
asyncio.run(main())
Key concepts:
Nodewraps an async function with a unique UUIDconnect()creates parent-child relationshipsforward="greeting"automatically passes node_a's output to node_b'sgreetingparameterTreeExecutororchestrates execution, ensuring parents run before children
Parallel Execution¶
Multiple branches execute in parallel:
async def root_task():
return "data"
async def process(data: str, label: str):
return f"{label}: {data}"
root = Node(coroutine=root_task, uuid="root")
branch_a = Node(coroutine=process, uuid="a", kwargs=dict(label="A"))
branch_b = Node(coroutine=process, uuid="b", kwargs=dict(label="B"))
await root.connect(branch_a, forward="data")
await root.connect(branch_b, forward="data")
executor = TreeExecutor(roots=[root])
await executor.run()
# Both branches run simultaneously after root completes
Streaming Results¶
Stream intermediate results from async generators:
from grafo import Chunk
async def counting_task():
for i in range(3):
yield f"Step {i}"
node = Node(coroutine=counting_task, uuid="counter")
executor = TreeExecutor(roots=[node])
async for item in executor.yielding():
if isinstance(item, Chunk):
print(f"Progress: {item.output}")
elif isinstance(item, Node):
print(f"Completed: {item.uuid}")
Manual Forwarding¶
Use lambdas for dynamic evaluation or transformations:
async def producer():
return "data"
async def consumer(value: str):
return value.upper()
node_a = Node(coroutine=producer, uuid="producer")
node_b = Node(
coroutine=consumer,
uuid="consumer",
kwargs=dict(value=lambda: node_a.output.upper()) # Transform on access
)
await node_a.connect(node_b)
Type Validation¶
Validate return types at runtime:
async def return_string():
return "text"
# Specify expected type - validates at runtime
node = Node[str](coroutine=return_string, uuid="typed")
# Raises MismatchChunkType if return type is wrong
Next Steps¶
Now that you understand the basics, explore:
- Core Concepts - Deep dive into nodes, trees, and execution
- Building Trees - Advanced tree structures
- Forwarding Data - Master state passing
- API Reference - Complete API documentation