BaseWorkflow API Reference¶
Base class for building LLM-powered workflows using grafo DAGs.
Constructor¶
Parameters:
- ai_toolkit (PyAIToolkit): Instance of PyAIToolkit for LLM operations
- error_class (Type[Exception]): Exception class to raise on workflow errors
- echo (bool): Enable debug logging
Example:
from py_ai_toolkit import PyAIToolkit
from py_ai_toolkit.core.base import BaseWorkflow
from py_ai_toolkit.core.domain.errors import WorkflowError
ait = PyAIToolkit(main_model_config=LLMConfig())  # LLMConfig: your model configuration class
workflow = BaseWorkflow(
ai_toolkit=ait,
error_class=WorkflowError,
echo=True
)
Methods¶
task()¶
Execute a single LLM task (text or structured).
async def task(
template: str | None = None,
response_model: Type[S] | None = None,
echo: bool = False,
**kwargs: Any
) -> Union[str, S]
Parameters:
- template (str | None): Prompt template (file path or inline string)
- response_model (Type[S] | None): Optional Pydantic model for structured output
- echo (bool): Log output
- **kwargs: Template variables
Returns: Text string or model instance
Example:
# Text response
text = await workflow.task(
template="Summarize: {{ article }}",
article=long_text
)
# Structured response
result = await workflow.task(
template="Extract entities: {{ text }}",
response_model=Entities,
text=document
)
create_task_tree()¶
Create an executable task tree with validation.
async def create_task_tree(
template: str,
response_model: Type[S],
kwargs: dict[str, Any],
config: ValidationConfig = SingleShotValidationConfig(),
echo: bool = False
) -> TreeExecutor[S | V]
Parameters:
- template (str): Prompt template
- response_model (Type[S]): Pydantic model for output
- kwargs (dict[str, Any]): Template variables
- config (ValidationConfig): Validation configuration
- echo (bool): Enable logging
Returns: TreeExecutor ready to run
Example:
from py_ai_toolkit.core.domain.interfaces import ThresholdVotingValidationConfig
executor = await workflow.create_task_tree(
template="Parse this: {{ data }}",
response_model=ParsedData,
kwargs=dict(data=raw_input),
config=ThresholdVotingValidationConfig(
issues=["Output is accurate"]
)
)
results = await executor.run()
parsed = results[0].output
build_task_node()¶
Create a standalone node containing a task tree subtree.
async def build_task_node(
uuid: str,
template: str,
response_model: Type[S],
kwargs: dict[str, Any],
config: ValidationConfig = SingleShotValidationConfig()
) -> Node[S]
Parameters:
- uuid (str): Unique identifier for the node
- template (str): Prompt template
- response_model (Type[S]): Output model
- kwargs (dict[str, Any]): Template variables
- config (ValidationConfig): Validation config
Returns: Node[S] that can be connected to other nodes
Example:
node = await workflow.build_task_node(
uuid="extraction",
template="Extract: {{ text }}",
response_model=ExtractedData,
kwargs=dict(text=input_text),
config=SingleShotValidationConfig(issues=["Complete extraction"])
)
# Connect to other nodes in larger workflow
await node.connect(another_node)
run()¶
Execute the workflow.
Returns: Output from the workflow execution
Raises: WorkflowError if the executor has not been initialized
Example:
executor = await workflow.create_task_tree(...)
workflow.executor = executor
result = await workflow.run()
Internal Methods¶
These methods are public but intended primarily for advanced use cases:
_create_task_node()¶
Create a basic task node.
def _create_task_node(
template: str,
uuid: str | None = None,
response_model: Type[S] | None = None,
echo: bool = False,
**kwargs: Any
) -> Node[Any]
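A usage sketch in the style of the public examples above (note that _create_task_node is synchronous; Summary and article_text are assumed to be defined elsewhere):

```python
# Sketch: build a bare task node; template variables pass through **kwargs.
node = workflow._create_task_node(
    template="Summarize: {{ article }}",
    uuid="summary_node",
    response_model=Summary,   # assumed Pydantic model
    article=article_text,     # rendered into the {{ article }} placeholder
)
```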
_create_issue_node()¶
Create a validation issue node.
State Management¶
The workflow maintains internal state:
- current_retries (int): Current retry count
- executor (TreeExecutor | None): Active executor
- failure_reasonings (dict[str, list[str]]): Collected validation failures
These are managed automatically but can be accessed for debugging or custom extensions.
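Since these attributes are plain instance state, a small debugging helper can format them after a run; a minimal sketch (summarize_state is hypothetical, not part of the API, but the attributes it reads are the ones listed above):

```python
def summarize_state(workflow) -> str:
    """Format a workflow's internal state for debugging (hypothetical helper)."""
    lines = [f"retries: {workflow.current_retries}"]
    for node_uuid, reasons in workflow.failure_reasonings.items():
        lines.append(f"{node_uuid}: {len(reasons)} validation failure(s)")
        lines.extend(f"  - {reason}" for reason in reasons)
    return "\n".join(lines)
```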
Extending BaseWorkflow¶
Create custom workflows by subclassing:
class MyWorkflow(BaseWorkflow):
async def run(self, input_data: str) -> Output:
# Custom orchestration logic
node1 = self._create_task_node(...)
node2 = self._create_task_node(...)
await node1.connect(node2)
executor = TreeExecutor(uuid="my_workflow", roots=[node1])
results = await executor.run()
return results[-1].output
Integration with Grafo¶
BaseWorkflow wraps grafo functionality:
- Node: Represents a single operation (LLM call, validation, etc.)
- TreeExecutor: Orchestrates node execution respecting dependencies
- DAG structure: Nodes connected via .connect() form the execution graph
You can use grafo's full capabilities alongside BaseWorkflow methods for maximum flexibility.
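As a sketch of that flexibility, two nodes built with build_task_node can be wired together and executed with grafo's TreeExecutor directly, as in the subclassing example above (ExtractedData, Summary, and input_text are assumed from the earlier examples):

```python
# Build two validated task nodes, wire them into a DAG, and run with grafo.
extract = await workflow.build_task_node(
    uuid="extract",
    template="Extract: {{ text }}",
    response_model=ExtractedData,
    kwargs=dict(text=input_text),
)
summarize = await workflow.build_task_node(
    uuid="summarize",
    template="Summarize: {{ text }}",
    response_model=Summary,
    kwargs=dict(text=input_text),
)
await extract.connect(summarize)  # extract runs before summarize

executor = TreeExecutor(uuid="pipeline", roots=[extract])
results = await executor.run()
final = results[-1].output  # output of the last node to complete
```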