"""Module that contains the Role class for managing workflows and their event registrations."""
from typing import Any, Callable, Dict, List, Optional, Self, Set, TypedDict, Union, Unpack, overload
from pydantic import ConfigDict, Field, PrivateAttr
from fabricatio_core.emitter import EMITTER
from fabricatio_core.journal import logger
from fabricatio_core.models.action import Action, WorkFlow
from fabricatio_core.models.generic import ScopedConfig, WithBriefing
from fabricatio_core.rust import Event
from fabricatio_core.utils import first_available
type RoleName = str
type EventPattern = str
ROLE_REGISTRY: Dict[RoleName, "Role"] = {}
[docs]
class Role(WithBriefing):
"""Class that represents a role with a registry of events and workflows.
A Role serves as a container for workflows, managing their registration to events
and providing them with shared configuration like tools and personality.
"""
model_config = ConfigDict(use_attribute_docstrings=True)
name: RoleName = Field(default="")
"""The name of the role."""
description: str = ""
"""A brief description of the role's responsibilities and capabilities."""
subscriptions: Dict[EventPattern, WorkFlow] = Field(default_factory=dict, frozen=True)
"""A dictionary of event-workflow pairs."""
_dispatched: bool = PrivateAttr(default=False)
"""A flag indicating whether the role has been dispatched."""
[docs]
@classmethod
def new(
cls,
subscriptions: Dict[EventPattern, WorkFlow],
/,
name: Optional[RoleName] = None,
description: str = "",
dispatch_on_init: bool = False,
**kwargs: Unpack[TypedDict],
) -> Self:
"""Create a new Role."""
real_name = first_available((name, cls.__name__))
self = cls(name=real_name, description=description, subscriptions=subscriptions, **kwargs)
return self.dispatch() if dispatch_on_init else self
[docs]
@classmethod
def with_bio(
cls,
name: Optional[RoleName] = None,
description: str = "",
) -> Self:
"""Create a new Role with a bio."""
return cls.new({}, name=name, description=description)
[docs]
@classmethod
def with_subscriptions(cls, subscriptions: Dict[EventPattern, WorkFlow]) -> Self:
"""Create a new Role with subscription specified only."""
return cls.new(subscriptions)
@property
def briefing(self) -> str:
"""Get the briefing of the role.
Returns:
str: The briefing of the role.
"""
base = super().briefing
abilities = "\n".join(f" - `{k}` ==> {w.briefing}" for (k, w) in self.subscriptions.items())
return f"{base}\nEvent Mapping:\n{abilities}"
@property
def accept_events(self) -> List[str]:
"""Get the set of events that the role accepts.
Returns:
Set[Event]: The set of events that the role accepts.
"""
return list(self.subscriptions.keys())
[docs]
def model_post_init(self, __context: Any) -> None:
"""Register the role."""
register_role(self)
@overload
def configure(self, /, **kwargs) -> Self: ...
@overload
def configure(self, fn: Callable[[Self], None]) -> Self: ...
[docs]
def subscribe(self, event: Event | EventPattern, workflow: WorkFlow) -> Self:
"""Register a workflow to the role's registry."""
event_string = event.collapse() if isinstance(event, Event) else event
if event_string in self.subscriptions:
logger.warn(
f"Event `{event_string}` is already registered with workflow "
f"`{self.subscriptions[event_string].name}`. It will be overwritten by `{workflow.name}`."
)
self.subscriptions[event_string] = workflow
return self
[docs]
def unsubscribe(self, event: Event | EventPattern) -> Self:
"""Unregister a workflow from the role's registry for the given event."""
event_string = event.collapse() if isinstance(event, Event) else event
if event_string in self.subscriptions:
logger.debug(f"Unregistering workflow `{self.subscriptions[event_string].name}` for event `{event_string}`")
del self.subscriptions[event_string]
else:
logger.warn(f"No workflow registered for event `{event_string}` to unregister.")
return self
[docs]
def dispatch(self, resolve_config: bool = True) -> Self:
"""Register each workflow in the registry to its corresponding event in the event bus.
Returns:
Self: The role instance for method chaining
"""
if self._dispatched:
logger.warn("Role already dispatched. Skipping dispatch.")
return self
if resolve_config:
self.resolve_configuration()
for event, workflow in self.subscriptions.items():
logger.debug(f"Registering workflow: `{workflow.name}` for event: `{event}`")
EMITTER.on(event, workflow.serve)
self._dispatched = True
return self
[docs]
def undo_dispatch(self) -> Self:
"""Unregister each workflow in the registry from its corresponding event in the event bus.
Returns:
Self: The role instance for method chaining
"""
if not self._dispatched:
logger.warn("Not dispatched, nothing to undo.")
return self
for event, workflow in self.subscriptions.items():
logger.debug(f"Unregistering workflow: `{workflow.name}` for event: `{event}`")
EMITTER.off(event)
self._dispatched = False
return self
[docs]
def resolve_configuration(self) -> Self:
"""Resolve and bind shared configuration to workflows and their components.
This method ensures that any shared configuration from the role or workflows
is properly propagated to the workflow steps and nested components. If the role
is a ScopedConfig, it holds configuration for all workflows. Similarly, if a
workflow itself is a ScopedConfig, it holds configuration for its own steps.
Returns:
Self: The role instance with resolved configurations.
"""
if issubclass(self.__class__, ScopedConfig):
logger.debug(f"Role `{self.name}` is a ScopedConfig. Applying configuration to all workflows.")
self.hold_to(self.subscriptions.values(), EXCLUDED_FIELDS) # pyright: ignore [reportAttributeAccessIssue]
for workflow in self.subscriptions.values():
if issubclass(workflow.__class__, ScopedConfig):
logger.debug(f"Workflow `{workflow.name}` is a ScopedConfig. Applying configuration to its steps.")
workflow.hold_to(workflow.steps, EXCLUDED_FIELDS) # pyright: ignore [reportAttributeAccessIssue]
elif issubclass(self.__class__, ScopedConfig):
logger.debug(
f"Workflow `{workflow.name}` is not a ScopedConfig, but role `{self.name}` is. "
"Applying role configuration to workflow steps."
)
self.hold_to(workflow.steps, EXCLUDED_FIELDS) # pyright: ignore [reportAttributeAccessIssue]
else:
logger.debug(
f"Neither role nor workflow `{workflow.name}` is a ScopedConfig. "
"Skipping configuration resolution for this workflow."
)
continue
return self
[docs]
def register_role(role: "Role", override: bool = True) -> None:
"""Register the role into the global registry."""
if not override and role.name in ROLE_REGISTRY:
raise ValueError(f"Role with name `{role.name}` already exists.")
logger.debug(f"Registering role: `{role.name}`")
ROLE_REGISTRY[role.name] = role
[docs]
def unregister_role(role: Union["Role", RoleName]) -> None:
"""Unregister the role from the global registry."""
name = role.name if isinstance(role, Role) else role
if name not in ROLE_REGISTRY:
raise ValueError(f"Role with name `{name}` does not exist.")
del ROLE_REGISTRY[name]
[docs]
def clear_registry() -> None:
"""Clear the global registry of all registered roles."""
ROLE_REGISTRY.clear()
@overload
def get_registered_role(role_name: RoleName) -> Role: ...
@overload
def get_registered_role(role_name: Set[RoleName]) -> List[Role]: ...
[docs]
def get_registered_role(role_name: RoleName | Set[RoleName]) -> Role | List[Role]:
"""Get a registered role by name."""
return ROLE_REGISTRY[role_name] if isinstance(role_name, str) else [ROLE_REGISTRY[r] for r in role_name]
EXCLUDED_FIELDS = set(
list(Role.model_fields.keys()) + list(WorkFlow.model_fields.keys()) + list(Action.model_fields.keys())
)
"""The set of fields that should not be resolved during configuration resolution."""