Workflow Patterns
This guide documents common workflow patterns and best practices for building Fabricatio applications.
Event Flow Overview
Fabricatio’s event-driven architecture follows a consistent flow pattern:
flowchart TD
TC["Task Created"] --> TD2["Task.delegate"]
TD2 --> EE["Event Emitter"]
EE --> WR["WorkFlow Registry\n(Role.skills)"]
WR --> ME["Match Event"]
ME --> WS["WorkFlow.serve\n(executes)"]
WS --> A1["Action 1\n._execute()"]
WS --> A2["Action 2\n._execute()"]
WS --> AN["Action N\n._execute()"]
A1 --> TO["Task.output\n(result)"]
A2 --> TO
AN --> TO
Simple Action Workflow
The most basic pattern - a single action responding to an event.
flowchart TD
EV["Event: greet"] --> WF["WorkFlow('greet')"]
WF --> HA["HelloAction\n._execute() \u2192 'Hello, World!'"]
HA --> R["Result"]
from fabricatio import Action, Event, Role, Task, WorkFlow
class HelloAction(Action):
async def _execute(self, **kwargs) -> str:
return "Hello, World!"
role = Role.with_bio(name="greeter") \
.subscribe(Event.quick_instantiate("greet"), WorkFlow(
name="greet",
steps=(HelloAction,)
))
task = Task(name="say_hello", briefing="Greet the user")
result = await task.delegate_blocking("greet")
Multi-Step Pipeline
Chain multiple actions where each step’s output feeds into the next.
flowchart TD
WR["workflow_results = {}"] --> FA["Fetch Action\n._execute()\n\u2192 'Fetched: X'"]
FA --> PA["Process Action\n._execute()\n\u2192 'Processed: Fetched: X'"]
PA --> FOA["Format Action\n._execute()\n\u2192 'Formatted: Processed: Fetched: X'"]
FOA --> R["Result"]
class Fetch(Action):
async def _execute(self, task_input: Task[str], **_) -> str:
return f"Fetched: {task_input.briefing}"
class Process(Action):
async def _execute(self, task_input: Task[str], **kwargs) -> str:
# Access previous step result
prev = kwargs.get("workflow_results", {})
fetched = prev.get("Fetch", "")
return f"Processed: {fetched}"
class Format(Action):
async def _execute(self, task_input: Task[str], **kwargs) -> str:
prev = kwargs.get("workflow_results", {})
processed = prev.get("Process", "")
return f"Formatted: {processed}"
role = Role.with_bio(name="pipeline") \
.subscribe(Event.quick_instantiate("run"), WorkFlow(
name="run",
steps=(Fetch, Process, Format)
))
Parallel Actions
Execute multiple independent actions concurrently.
flowchart TD
WR["workflow_results = {}"] --> FU["FetchUser Action\n(async)"]
WR --> FH["FetchHistory Action\n(async)"]
WR --> OT["Other Action\n(async)"]
FU --> R1["workflow_results\n'FetchUser'"]
FH --> R2["workflow_results\n'FetchHistory'"]
OT --> R3["workflow_results\n'Other'"]
R1 --> AG["Aggregate Action\nCombines all"]
R2 --> AG
R3 --> AG
class FetchUser(Action):
async def _execute(self, task_input: Task, **_) -> dict:
# Fetch user data
return {"user": await self.aask("Get user info")}
class FetchHistory(Action):
async def _execute(self, task_input: Task, **_) -> dict:
# Fetch history
return {"history": await self.aask("Get history")}
class Aggregate(Action):
async def _execute(self, task_input: Task, **kwargs) -> dict:
prev = kwargs.get("workflow_results", {})
# Aggregate independent results
return {**prev.get("FetchUser", {}), **prev.get("FetchHistory", {})}
# Note: Parallel execution within a workflow requires asyncio.gather
# or a custom executor - see advanced patterns
import asyncio
async def execute_parallel(actions, task):
# Manual parallel execution
results = await asyncio.gather(*[
action()._execute(task_input=task) for action in actions
])
return results
Conditional Branching
Use different workflows based on task characteristics.
flowchart TD
T["Task"] --> LR["LLM Router\n(aask_struct)"]
LR -->|"simple"| SA["SimpleAction\n._execute()"]
LR -->|"complex"| CA["ComplexAction\n._execute()"]
SA --> R["Result"]
CA --> R
class SimpleAction(Action):
async def _execute(self, task_input: Task[str], **_) -> str:
return f"Simple: {task_input.briefing}"
class ComplexAction(Action):
async def _execute(self, task_input: Task[str], **_) -> str:
return f"Complex: {await self.aask(task_input.briefing)}"
async def route_task(task: Task) -> str:
# LLM-based routing decision
decision = await role.aask(
f"Should we use simple or complex processing for: {task.briefing}?",
)
if "complex" in decision.lower():
return "complex"
return "simple"
# Usage
route = await route_task(task)
result = await task.delegate_blocking(route)
Task Proposal Pattern
Let the LLM decompose a goal into multiple tasks.
flowchart TD
PR["Planner Role\npropose_task('Build REST API')\n\u2192 Task1, Task2, ..."] --> PT["Proposed Task List\nTask(endpoint 1)\nTask(endpoint 2)\nTask(tests)"]
PT --> E1["Executor\nTask 1"]
PT --> E2["Executor\nTask 2"]
PT --> EN["Executor\nTask N"]
E1 --> AR["All Results"]
E2 --> AR
EN --> AR
from fabricatio.capabilities import ProposeTask
class Planner(Role, ProposeTask):
pass
class Executor(Role):
async def execute_step(self, step_briefing: str):
task = Task(name="step", briefing=step_briefing)
return await task.delegate_blocking("execute")
planner = Planner(name="planner")
executor = Executor(name="executor")
# Get proposed tasks from LLM
proposed_tasks = await planner.propose_task(
"Build a REST API for a todo app",
mode="agentic", # Detailed sub-tasks
)
# Execute each proposed task
for task in proposed_tasks:
result = await executor.execute_step(task.briefing)
Error Handling Pattern
Graceful error handling with fallback actions.
flowchart TD
PA["PrimaryAction\n._execute()"] -->|Success| RR["Return Result\nto WorkFlow"]
PA -->|Exception| LW["Log Warning\nlogger.warning\nRe-raise"]
LW --> FA["FallbackAction\n._execute()"]
FA --> FR["Return Fallback\nResult"]
class PrimaryAction(Action):
async def _execute(self, task_input: Task, **_) -> str:
try:
return await self.aask(task_input.briefing)
except Exception as e:
logger.warning(f"Primary failed: {e}")
raise # Re-raise to trigger fallback
class FallbackAction(Action):
async def _execute(self, task_input: Task, **_) -> str:
return "Default response due to failure"
class RecoveryAction(Action):
async def _execute(self, task_input: Task, **_) -> str:
# Attempt recovery or use cached data
return "Recovered response"
role = Role.with_bio(name="resilient") \
.subscribe(Event.quick_instantiate("process"), WorkFlow(
name="process",
steps=(PrimaryAction,),
fallback=FallbackAction,
))
RAG Workflow Pattern
Retrieval-augmented generation with document ingestion and query.
flowchart TD
subgraph Init["INIT KNOWLEDGE BASE"]
IK["InitKBAction\n1. init_client()\n2. view()\n3. consume_string()"] --> MV["Milvus\nVector DB"]
end
subgraph Query["QUERY KNOWLEDGE BASE"]
QK["QueryKBAction\n1. query (similarity search)\n2. aask_retrieved (RAG)"] --> LLM["LLM Response\n(grounded)"]
end
from fabricatio import Action, Event, Role, Task, WorkFlow
from fabricatio_milvus.capabilities.milvus import MilvusRAG
class InitKnowledgeBase(Action, MilvusRAG):
async def _execute(self, documents: list[str], **_) -> None:
self.init_client()
self.view("knowledge", create=True)
await self.consume_string(documents)
class QueryKnowledge(Action, MilvusRAG):
async def _execute(self, task_input: Task[str], **_) -> str:
return await self.aask_retrieved(
task_input.briefing,
task_input.briefing,
extra_system_message="Answer based on the knowledge base.",
)
role = Role.with_bio(name="knowledge_assistant") \
.subscribe(Event.quick_instantiate("init_kb"), WorkFlow(
name="init_kb",
steps=(InitKnowledgeBase,)
)) \
.subscribe(Event.quick_instantiate("query"), WorkFlow(
name="query",
steps=(QueryKnowledge,)
))
# Initialize knowledge base
init_task = Task(name="init", briefing=documents)
await init_task.delegate_blocking("init_kb")
# Query
query_task = Task(name="query", briefing="What is X?")
result = await query_task.delegate_blocking("query")
Structured Output Pattern
Use Pydantic models for reliable structured responses.
flowchart TD
IN["Prompt: Analyze this code"] --> LLM["LLM Response\nlanguage, complexity,\nissues, suggestions"]
LLM --> PV["Pydantic Model Validation\nCodeAnalysis instance"]
PV --> TR["Typed Result\n.language, .issues,\n.suggestions"]
from pydantic import BaseModel, Field
from typing import Optional
class CodeAnalysis(BaseModel):
language: str = Field(description="Programming language detected")
complexity: str = Field(description="Complexity rating: low/medium/high")
issues: list[str] = Field(description="List of code issues found")
suggestions: list[str] = Field(description="Improvement suggestions")
class Analyzer(Role):
async def analyze_code(self, code: str) -> CodeAnalysis:
return await self.aask_structured(
f"Analyze this code:\n{code}",
response_format=CodeAnalysis,
)
analyzer = Analyzer(name="analyzer")
result = await analyzer.analyze_code("def foo(): pass")
# result is a fully typed CodeAnalysis instance
print(f"Language: {result.language}")
print(f"Issues: {result.issues}")
Review-Improvement Loop
Iterative refinement through review cycles.
flowchart TD
IC["Initial Content\nimprove_string(briefing)"] --> REV{"Review\nscore >= 8?"}
REV -->|No| IMP["Improve with feedback"]
IMP --> REV
REV -->|Yes| FC["Final Content"]
from fabricatio.capabilities import Review, Improve
class Writer(Role, Improve):
pass
class Reviewer(Role, Review):
pass
writer = Writer(name="writer")
reviewer = Reviewer(name="reviewer")
async def write_with_review(briefing: str, max_iterations: int = 3) -> str:
content = await writer.improve_string(briefing, style="clear")
for i in range(max_iterations):
review_result = await reviewer.review_string(
content,
criteria="quality and clarity"
)
if review_result.score >= 8:
break
# Incorporate feedback
feedback = "\n".join(review_result.suggestions)
content = await writer.improve_string(
f"Original:\n{content}\n\nFeedback:\n{feedback}",
style="clear"
)
return content
Checkpoint Pattern
Save workflow state for recovery.
flowchart TD
WS["Workflow Start"] --> LC{"Load Checkpoint\n(if exists)"}
LC -->|"has state\nstep_2_complete"| SKIP["Skip to Step 2\nReturn cached result"]
LC -->|"no state"| S1["Execute Step 1\nSave checkpoint\nstep_1_complete"]
S1 --> S2["Execute Step 2\nSave checkpoint\nstep_2_complete\nSave result"]
S2 --> RET["Return result"]
from fabricatio.checkpoint import Checkpoint
class LongRunningAction(Action):
async def _execute(self, task_input: Task, **kwargs) -> str:
checkpoint = Checkpoint.load("workflow_id")
if checkpoint.has_state("step_2_complete"):
# Resume from step 2
return checkpoint.get("result")
# Do step 1
result1 = await self.do_step1()
checkpoint.save("step_1_complete", True)
# Do step 2
result2 = await self.do_step2(result1)
checkpoint.save("step_2_complete", True)
checkpoint.save("result", result2)
return result2
Team Collaboration Pattern
Multiple agents working together.
flowchart TD
subgraph Team
R["Researcher"]
W["Writer"]
E["Editor"]
RV["Reviewer"]
end
R --> RP["Research Phase"]
RP --> WP["Write Phase"]
W --> WP
WP --> EP["Edit Phase"]
E --> EP
from fabricatio_team import Team
class Researcher(Role):
pass
class Writer(Role):
pass
class Editor(Role):
pass
async def collaborative_writing(topic: str) -> str:
team = Team(name="writing_team")
researcher = Researcher(name="researcher")
writer = Writer(name="writer")
editor = Editor(name="editor")
# Add agents to team
team.add_agent(researcher, role="research")
team.add_agent(writer, role="writing")
team.add_agent(editor, role="editing")
# Research phase
research = await team.delegate("research", f"Research: {topic}")
# Writing phase (depends on research)
draft = await team.delegate("writing", f"Write article about {topic}")
# Editing phase
final = await team.delegate("editing", f"Edit: {draft}")
return final
EventEmitter Wildcard Pattern
Using wildcards for flexible event routing.
flowchart TD
subgraph EE["EventEmitter (sep='::')"]
H1["user::* \u2192 user_activity_handler"]
H2["user::login \u2192 login_handler"]
H3["user::logout \u2192 logout_handler"]
H4["system::* \u2192 system_handler"]
end
subgraph E1["Emit: user::login"]
direction LR
LH["login_handler"]
UAH["user_activity_handler"]
end
subgraph E2["Emit: system::alert"]
SH["system_handler"]
end
from fabricatio import EventEmitter
# Create an event emitter with custom separator
emitter = EventEmitter(sep="::")
# Register handlers for exact events
def login_handler(data):
print(f"User logged in: {data}")
def logout_handler(data):
print(f"User logged out: {data}")
def user_activity_handler(data):
print(f"User activity: {data}")
def system_handler(data):
print(f"System event: {data}")
# Exact event matching
emitter.on("user::login", login_handler)
emitter.on("user::logout", logout_handler)
# Wildcard matching - catches all user::* events
emitter.on("user::*", user_activity_handler)
# Wildcard matching for system events
emitter.on("system::*", system_handler)
# Emit events
emitter.emit("user::login", {"user_id": 123})
# Both login_handler and user_activity_handler are called
emitter.emit("user::update", {"user_id": 123})
# Only user_activity_handler is called
emitter.emit("system::restart", {"reason": "maintenance"})
# system_handler is called
Task Lifecycle Pattern
Task states and transitions.
flowchart TD
TC["Task Created\n(pending)"] -->|"task.start()"| TR["Task Running\n(active)"]
TR -->|"task.fail(e)"| TF["Task Failed\n(error)"]
TR -->|"task.finish()"| TFI["Task Finished\n(success)"]
TR -->|"task.cancel()"| TCA["Task Cancelled\n(aborted)"]
subgraph Events["Event Emission"]
direction LR
E1["publish('work') \u2192 Pending"]
E2["start() \u2192 Running"]
E3["finish(r) \u2192 Finished"]
E4["fail(e) \u2192 Failed"]
E5["cancel() \u2192 Cancelled"]
end
from fabricatio import Task
# Task state transitions emit events
task = Task(name="process_data")
# Emits "work::process_data::Pending"
task.publish("work")
# Emits "work::process_data::Running"
await task.start()
# Emits "work::process_data::Finished"
await task.finish(result)
# Or handle failure
await task.fail(error)
Role and Skill Registration Pattern
How roles register and handle skills.
flowchart TD
subgraph Role["Role (name='assistant')"]
direction LR
S1["greet \u2192 WorkFlow(Hello)"]
S2["analyze \u2192 WorkFlow(Parse, Analyze, Report)"]
S3["help \u2192 WorkFlow(Help)"]
end
Role -->|"role.dispatch()"| ER["EventEmitter\nRegistration"]
ER --> R1["greet registered"]
ER --> R2["analyze registered"]
ER --> R3["help registered"]
from fabricatio import Role, Event, WorkFlow
role = Role.with_bio() \
.subscribe(Event.quick_instantiate("greet"), WorkFlow(
name="greet",
steps=(HelloAction,)
)) \
.subscribe(Event.quick_instantiate("analyze"), WorkFlow(
name="analyze",
steps=(ParseAction, AnalyzeAction, ReportAction)
)) \
.subscribe(Event.quick_instantiate("help"), WorkFlow(
name="help",
steps=(HelpAction,)
))
# Register all skills with the event emitter
role.dispatch()
# Now tasks can delegate to registered events
task = Task(name="demo")
result = await task.delegate_blocking("greet")
Capability Mixin Pattern
Combining capabilities with actions.
flowchart TD
A["Action\n(base class)"] --> ULLM["UseLLM"]
A --> RV["Review"]
A --> EX["Extract"]
A --> PT["ProposeTask"]
A --> IMP["Improve"]
ULLM --> CA["Combined Action\nclass MyAction(\n Action, UseLLM, Review\n)"]
RV --> CA
from fabricatio import Action, Task
from fabricatio.capabilities import UseLLM, Review
class AnalyzeAndReview(Action, UseLLM, Review):
"""Action that analyzes content and can review it."""
output_key: str = "analysis"
async def _execute(self, task_input: Task[str], **_) -> str:
# Use LLM capability to analyze
analysis = await self.aask(f"Analyze: {task_input.briefing}")
# Use Review capability to score
review_result = await self.review_string(
analysis,
criteria="accuracy and clarity"
)
return f"{analysis}\n\nReview score: {review_result.score}"
Role Inheritance Pattern
Creating specialized roles through inheritance.
flowchart TD
R1["Role\n(base class)"] --> SR["SpecializedRole\nCustom name, skills,\ncapabilities"]
subgraph Chain["Example Inheritance Chain"]
R2["Role"] --> ULLM2["UseLLM (mixin)"]
ULLM2 --> CR1["Role (combined)"]
CR1 --> PT2["ProposeTask (mixin)"]
PT2 --> CR2["Role (combined)"]
CR2 --> PL["Planner Role (concrete)\ncan propose_task, can aask"]
end
from fabricatio import Role, Event, WorkFlow
from fabricatio.capabilities import UseLLM, ProposeTask
# Simple inherited role with LLM capability
class LLMAssistant(Role, UseLLM):
def __init__(self):
super().__init__(
name="llm_assistant",
description="Assistant with LLM capabilities"
)
# Complex inherited role with multiple capabilities
class Planner(Role, UseLLM, ProposeTask):
def __init__(self):
super().__init__(
name="planner",
description="Task planning assistant"
)
# Add specialized planning workflow
self.subscribe(
Event.quick_instantiate("plan"),
WorkFlow(name="plan", steps=(PlanningAction,))
)
# Usage
planner = Planner()
tasks = await planner.propose_task("Build a web app")
Best Practices Summary
Keep Actions Single-Purpose Small, focused actions are easier to test and reuse.
Use Structured Output
aask_structured()provides reliable, typed responses.Handle Errors Explicitly Use try/except and fallback workflows.
Log Appropriately Use
loggerfor debugging and monitoring.Profile in Development Use
viztracerto identify bottlenecks.Configure Timeouts Set reasonable timeouts for LLM calls.
Test Workflows Independently Unit test actions, integration test workflows.
Use EventEmitter Wildcards Wisely Wildcard patterns like
user::*enable flexible routing but avoid overly broad matches that could cause unexpected behavior.Leverage Capability Mixins Combine capabilities to create powerful actions without code duplication.
Design for Concurrency Use
asyncio.gather()for parallel action execution when actions are independent.