Core Concepts
This guide explains the fundamental concepts that power Fabricatio.
Event-Driven Architecture
Fabricatio uses an event-driven architecture where agents respond to events rather than direct method calls. This pattern enables loose coupling and flexible composition.
Event System
Events are the triggers that initiate workflow execution:
from fabricatio import Event
# Create an event by name
event = Event.quick_instantiate("talk")
# Collapse to string for registration
event_key = event.collapse() # "talk"
Events support:
Quick instantiation:
Event.quick_instantiate(name)String conversion:
.collapse()returns the event nameCustom payload: Events can carry data (see advanced usage)
EventEmitter Pattern
The EventEmitter provides publish-subscribe functionality:
from fabricatio import EventEmitter
emitter = EventEmitter()
def handler(data):
print(f"Received: {data}")
# Subscribe
emitter.on("message", handler)
# Publish
emitter.emit("message", "Hello!")
Roles
A Role is the primary agent entity that orchestrates skills and LLM interactions.
from fabricatio import Role, Event, WorkFlow
role = Role.with_bio(name="assistant", description="A helpful assistant") \
.subscribe(Event.quick_instantiate("help"), WorkFlow(
name="help",
steps=(HelpAction,)
))
Role Lifecycle
flowchart TD
A["1. Create Role with skills registered"]
B["2. Propose task or create directly"]
C["3. Delegate task to event name"]
D["4. Event triggers WorkFlow"]
E["5. Actions execute in sequence"]
F["6. Result returned to task"]
A --> B --> C --> D --> E --> F
Skills
A Skill maps an Event to a WorkFlow:
from fabricatio import Skill
skill = Skill(
event=Event.quick_instantiate("analyze"),
workflow=WorkFlow(name="analyze", steps=(AnalyzeAction,))
)
Skills are stored in the Role’s skill registry and retrieved by event name during delegation.
WorkFlows
A WorkFlow defines a sequence of Actions:
from fabricatio import WorkFlow, Action
class Step1(Action):
async def _execute(self, **kwargs):
# First step
return "step1_result"
class Step2(Action):
async def _execute(self, **kwargs):
# Second step
return "step2_result"
workflow = WorkFlow(
name="multi_step",
steps=(Step1, Step2) # Executes in order
)
WorkFlow Execution Flow
flowchart TD
A["WorkFlow.start()"]
B["Execute Step 1"]
C["Execute Step 2"]
D["Execute Step N"]
E["WorkFlow.complete()"]
A --> B
B -->|"_execute() returns result"| C
C -->|"_execute() returns result"| D
D -->|"_execute() returns result"| E
Tasks
Task represents a unit of work:
from fabricatio import Task
# Create a task directly
task = Task(name="analyze", briefing="Analyze the code")
# Or propose from role (LLM-generated)
task = await role.propose_task("Write a report on X")
Task Properties
name: Identifier for the taskbriefing: Instructions for executioninput: Input data for actionsoutput: Result after executionstatus: Current state (pending, running, completed, failed)
Delegation Modes
# Blocking (waits for completion)
result = await task.delegate_blocking("event_name")
# Non-blocking (returns immediately)
# Returns coroutine that can be awaited later
coro = await task.delegate("event_name")
result = await coro
# Fire-and-forget
task.delegate("event_name") # No awaiting
Actions
Action is the atomic execution unit:
from fabricatio import Action
from typing import Any
class MyAction(Action):
output_key: str = "my_result"
async def _execute(self, **kwargs) -> Any:
# Access task input
task_input = kwargs.get("task_input")
# Access previous step results
prev_results = kwargs.get("workflow_results", {})
# Return result
return "my_output"
Action Lifecycle
flowchart TD
A["Action._execute() called"]
B["Pre-execution\n(setup, validation)"]
C["Execute logic\n(_execute override point)"]
D["Post-execution\n(result storage, cleanup)"]
E["Return result"]
A --> B --> C --> D --> E
Capability Mixins
Capabilities are mixins that provide reusable functionality:
LLM Capability
UseLLM provides LLM interaction:
from fabricatio.capabilities import UseLLM
class MyAction(Action, UseLLM):
async def _execute(self, **kwargs):
# Simple text request
response = await self.aask("What is Python?")
# Structured output
structured = await self.aask_structured(
"Extract info",
response_format=MyModel
)
# With custom parameters
custom = await self.aask(
"Explain",
temperature=0.5,
max_tokens=100
)
Other Capabilities
Note
The sphinxcontrib-mermaid package that renders these diagrams is seeking new maintainers. Consider contributing if you’re interested.
%%{init: {'themeVariables': {'fontFamily': 'monospace'}}}%%
erDiagram
Capability {
string UseLLM "LLM interaction methods"
string Review "Code/content review methods"
string Extract "Structured extraction from text"
string ProposeTask "Task proposal generation"
string Improve "Content improvement"
string Rule "Rule-based processing"
string MilvusRAG "Vector store RAG"
}
Logger Integration
Fabricatio provides structured logging:
from fabricatio import logger
logger.debug("Debug message")
logger.info("Info message")
logger.warning("Warning message")
logger.error("Error message")
Logger output includes:
Timestamp
Log level
Module name
Message
Logger Configuration
[debug]
log_level = "DEBUG" # or INFO, WARNING, ERROR
Async Execution
All Fabricatio operations are async-first:
import asyncio
async def main():
# All operations are async
result = await role.aask("Hello")
result = await task.delegate("event")
# Run multiple tasks concurrently
results = await asyncio.gather(
task1.delegate("event"),
task2.delegate("event"),
task3.delegate("event"),
)
asyncio.run(main())
Best Practices
Always use async/await Never mix sync and async code without proper handling.
Handle exceptions Use try/except for graceful error handling:
try: result = await task.delegate("event") except Exception as e: logger.error(f"Task failed: {e}")
Set appropriate timeouts Configure timeouts for long-running operations:
[llm] timeout = 120 # seconds
Use structured output for reliability Prefer
aask_structured()overaask()when possible.