Skip to content

Workflows

Workflows enable complex, multi-step LLM orchestration using the BaseWorkflow class built on grafo.

What are Workflows?

Workflows represent task execution as directed acyclic graphs (DAGs), where:

  • Nodes contain async coroutines (LLM calls, validations, data processing)
  • Edges define execution dependencies
  • Executors manage parallel and sequential execution

This abstraction fits naturally with AI-driven tasks, where outputs from one LLM call often feed into another.

BaseWorkflow

The BaseWorkflow class provides conveniences for building LLM-powered workflows:

from py_ai_toolkit import PyAIToolkit
from py_ai_toolkit.core.base import BaseWorkflow
from py_ai_toolkit.core.domain.errors import WorkflowError

ait = PyAIToolkit(main_model_config=LLMConfig())

workflow = BaseWorkflow(
    ai_toolkit=ait,
    error_class=WorkflowError,
    echo=False  # Set True for debug logging
)

Simple Task Execution

Use the task() method for individual LLM calls within workflows:

# Text response
result = await workflow.task(
    template="Summarize: {{ text }}",
    text=long_article
)

# Structured response
result = await workflow.task(
    template="Extract entities from: {{ text }}",
    response_model=Entities,
    text=document
)

Building Task Trees

Create complex workflows with validation using create_task_tree():

from py_ai_toolkit.core.domain.interfaces import SingleShotValidationConfig

executor = await workflow.create_task_tree(
    template="Extract purchase intent: {{ message }}",
    response_model=Purchase,
    kwargs=dict(message="I want 5 apples"),
    config=SingleShotValidationConfig(
        issues=["The purchase matches the user's request"]
    )
)

results = await executor.run()
purchase = results[0].output

This creates a task node with automatic validation and retry logic.

Manual Node Creation

For fine-grained control, create nodes manually:

from grafo import Node, TreeExecutor

# Create task node
task_node = Node[Purchase](
    uuid="purchase_extraction",
    coroutine=workflow.task,
    kwargs=dict(
        template="Extract purchase: {{ message }}",
        response_model=Purchase,
        message="I want 3 bananas"
    )
)

# Create dependent node
summary_node = Node[str](
    uuid="purchase_summary",
    coroutine=workflow.task,
    kwargs=dict(
        template="Summarize this purchase: {{ purchase }}",
        purchase=task_node.output  # Uses output from task_node
    )
)

# Connect nodes
await task_node.connect(summary_node)

# Execute workflow
executor = TreeExecutor(uuid="purchase_workflow", roots=[task_node])
results = await executor.run()

Parallel Execution

Execute independent tasks concurrently:

# Create multiple independent nodes
node1 = Node(uuid="task1", coroutine=workflow.task, kwargs={...})
node2 = Node(uuid="task2", coroutine=workflow.task, kwargs={...})
node3 = Node(uuid="task3", coroutine=workflow.task, kwargs={...})

# All nodes run in parallel
executor = TreeExecutor(uuid="parallel_tasks", roots=[node1, node2, node3])
results = await executor.run()

Sequential with Branching

Combine sequential and parallel patterns:

# Initial task
extract_node = Node(uuid="extract", coroutine=workflow.task, kwargs={...})

# Parallel analysis tasks
sentiment_node = Node(uuid="sentiment", coroutine=workflow.task, kwargs={...})
entities_node = Node(uuid="entities", coroutine=workflow.task, kwargs={...})

# Final synthesis
synthesis_node = Node(uuid="synthesis", coroutine=workflow.task, kwargs={...})

# Build graph
await extract_node.connect(sentiment_node)
await extract_node.connect(entities_node)
await sentiment_node.connect(synthesis_node)
await entities_node.connect(synthesis_node)

executor = TreeExecutor(uuid="analysis_workflow", roots=[extract_node])
await executor.run()

Validation Integration

Workflows automatically integrate validation (see Running Tasks):

executor = await workflow.create_task_tree(
    template="...",
    response_model=MyModel,
    kwargs={...},
    config=ThresholdVotingValidationConfig(
        issues=["Output is accurate", "Format is correct"],
        max_retries=3
    )
)

The workflow handles:

  • Running validation nodes after task nodes
  • Collecting failure reasons
  • Retrying with feedback
  • Redirecting flow on validation failure

Custom Workflows

Extend BaseWorkflow for domain-specific needs:

class DataProcessingWorkflow(BaseWorkflow):
    async def run(self, data: str) -> ProcessedData:
        # Custom orchestration logic
        extract_node = self._create_task_node(
            template="Extract data: {{ input }}",
            response_model=RawData,
            input=data
        )

        # ... build workflow graph

        executor = TreeExecutor(uuid="data_processor", roots=[extract_node])
        results = await executor.run()
        return results[0].output

Learn More