Source code for fabricatio_core.models.task

"""This module defines the `Task` class, which represents a task with a status and output.

It includes methods to manage the task's lifecycle, such as starting, finishing, cancelling, and failing the task.
"""

from asyncio import Queue, run
from functools import cached_property
from typing import Any, Dict, List, Optional, Self, Union

from pydantic import Field, PrivateAttr

from fabricatio_core.emitter import EMITTER
from fabricatio_core.journal import logger
from fabricatio_core.models.generic import ProposedAble, WithBriefing, WithDependency
from fabricatio_core.rust import CONFIG, TEMPLATE_MANAGER, Event, TaskStatus

type NameSpace = Union[str, List[str]]


[docs] class Task[T](WithBriefing, ProposedAble, WithDependency): """A class representing a task with status management and output handling.""" goals: List[str] = Field(default_factory=list) """Objectives the task aims to achieve.""" dependencies: List[str] = Field(default_factory=list) """File paths necessarily needed to read or write to complete the task. Do add path(s) needed!""" description: str = Field(default="") """Detailed explanation of the task with 5W1H rule.""" name: str = Field(...) """Concise and descriptive name of the task.""" send_to: List[str] = Field(default_factory=list) """List of namespace path components used to construct the target task queue. The full queue path is formed as: `<component1>::<component2>::...::*::Pending`. For example: - with ['work'] will be received by 'work::*::Pending' - with ['write', 'book'] will be received by 'write::book::*::Pending' ⚠️ The caller must ensure that the resulting namespace (e.g., 'write::book') exists. Sending to a non-existent namespace may result in task loss or an error. """ _output: Queue[T | None] = PrivateAttr(default_factory=Queue) """The output queue of the task.""" _status: TaskStatus = PrivateAttr(default=TaskStatus.Pending) """The status of the task.""" _extra_init_context: Dict = PrivateAttr(default_factory=dict) """Extra initialization context for the task, which is designed to override the one of the Workflow.""" @property def extra_init_context(self) -> Dict: """Extra initialization context for the task, which is designed to override the one of the Workflow.""" return self._extra_init_context
[docs] def update_init_context(self, /, **kwargs) -> Self: """Update the extra initialization context for the task.""" self.extra_init_context.update(kwargs) return self
@property def assembled_prompt(self) -> str: """Assemble both dependencies and briefing prompt for the task.""" return f"{self.dependencies_prompt}\n\n{self.briefing}"
[docs] def move_to(self, new_namespace: NameSpace) -> Self: """Move the task to a new namespace. Args: new_namespace (str|List[str]): The new namespace to move the task to. Returns: Task: The moved instance of the `Task` class Example: .. code-block:: python task = Task(name="example_task", namespace=["example"]).move_to("work") assert task.namespace == ["work"] """ logger.debug(f"Moving task `{self.name}` to `{new_namespace}`") self.send_to = new_namespace if isinstance(new_namespace, list) else [new_namespace] return self
[docs] def append_extra_description(self, description: str) -> Self: r"""Append a description to the task. Args: description (str): The description to append. Returns: Task: The updated instance of the `Task` class. Example: .. code-block:: python task = Task(name="example_task", description="This is an example task.") task.append_extra_description("This task is important.") assert task.description == "This is an example task.\nThis task is important." """ self.description += f"\n{description}" return self
[docs] def update_task(self, *, goal: Optional[List[str] | str] = None, description: Optional[str] = None) -> Self: """Update the goal and description of the task. Args: goal (str|List[str], optional): The new goal of the task. description (str, optional): The new description of the task. Returns: Task: The updated instance of the `Task` class. Example: .. code-block:: python # Update both goal and description task = Task(name="example_task", goals=["old_goal"], description="old description") task.update_task(goal="new_goal", description="new description") assert task.goals == ["new_goal"] assert task.description == "new description" # Update only the goal with a single string task = Task(name="example_task", goals=["old_goal"]) task.update_task(goal="new_goal") assert task.goals == ["new_goal"] # Update goal with a list of strings task = Task(name="example_task", goals=["old_goal"]) task.update_task(goal=["new_goal1", "new_goal2"]) assert task.goals == ["new_goal1", "new_goal2"] # Update only the description task = Task(name="example_task", description="old description") task.update_task(description="new description") assert task.description == "new description" """ if goal: self.goals = goal if isinstance(goal, list) else [goal] if description: self.description = description return self
[docs] async def get_output(self) -> T | None: """Get the output of the task. Returns: T: The output of the task. Example: .. code-block:: python # Test basic output retrieval task = Task(name="output_task") await task.finish("success") assert await task.get_output() == "success" # Test output retrieval with multiple get calls task2 = Task(name="multi_get_task") await task2.finish(42) assert await task2.get_output() == 42 # Second get should return same value assert await task2.get_output() == 42 # Test output retrieval for cancelled task task3 = Task(name="cancelled_task") await task3.cancel() assert await task3.get_output() is None """ logger.debug(f"Getting output for task {self.name}") return await self._output.get()
[docs] def status_label(self, status: TaskStatus) -> str: """Return a formatted status label for the task. Args: status (fabricatio.constants.TaskStatus): The status of the task. Returns: str: The formatted status label. """ return Event.instantiate_from(self.send_to).push(self.name).push(status).collapse()
[docs] @cached_property def pending_label(self) -> str: """Return the pending status label for the task. Returns: str: The pending status label. """ return self.status_label(TaskStatus.Pending)
[docs] @cached_property def running_label(self) -> str: """Return the running status label for the task. Returns: str: The running status label. """ return self.status_label(TaskStatus.Running)
[docs] @cached_property def finished_label(self) -> str: """Return the finished status label for the task. Returns: str: The finished status label. """ return self.status_label(TaskStatus.Finished)
[docs] @cached_property def failed_label(self) -> str: """Return the failed status label for the task. Returns: str: The failed status label. """ return self.status_label(TaskStatus.Failed)
[docs] @cached_property def cancelled_label(self) -> str: """Return the cancelled status label for the task. Returns: str: The cancelled status label. """ return self.status_label(TaskStatus.Cancelled)
[docs] async def finish(self, output: T) -> Self: """Mark the task as finished and set the output. Args: output (T): The output of the task. Returns: Task: The finished instance of the `Task` class. """ logger.info(f"Finishing task {self.name}") self._status = TaskStatus.Finished logger.debug(f"Emitting finished event for task {self.name}") await EMITTER.emit(self.finished_label, self) logger.debug(f"Setting output for task {self.name}") await self._output.put(output) return self
[docs] async def start(self) -> Self: """Mark the task as running. Returns: Task: The running instance of the `Task` class. """ logger.info(f"Starting task `{self.name}`") self._status = TaskStatus.Running await EMITTER.emit(self.running_label, self) return self
[docs] async def cancel(self) -> Self: """Mark the task as cancelled. Returns: Task: The cancelled instance of the `Task` class. """ logger.info(f"Cancelling task `{self.name}`") self._status = TaskStatus.Cancelled await self._output.put(None) await EMITTER.emit(self.cancelled_label, self) return self
[docs] async def fail(self) -> Self: """Mark the task as failed. Returns: Task: The failed instance of the `Task` class. """ logger.info(f"Failing task `{self.name}`") self._status = TaskStatus.Failed await self._output.put(None) await EMITTER.emit(self.failed_label, self) return self
[docs] def publish(self, new_namespace: Optional[NameSpace] = None, *, event: Optional[NameSpace] = None) -> Self: """Publish the task to the event bus. Args: new_namespace(EventLike, optional): The new namespace to move the task to. event(EventLike, optional): The event to publish. Returns: Task: The published instance of the `Task` class. """ if event is not None: logger.debug(f"Publishing task `{self.name}` to `{event}`.") EMITTER.emit_future(Event.instantiate_from(event).collapse(), self) return self if new_namespace is not None: self.move_to(new_namespace) logger.info(f"Publishing task `{self.name}` to `{(label := self.pending_label)}`.") EMITTER.emit_future(label, self) return self
[docs] async def delegate( self, new_namespace: Optional[NameSpace] = None, *, event: Optional[NameSpace] = None ) -> T | None: """Delegate the task to the event. Args: new_namespace (EventLike, optional): The new namespace to move the task to. event (EventLike, optional): The event to publish, overrides the event in this instance and the `new_namespace`. Returns: T|None: The output of the task. """ if event is not None: logger.debug(f"Publishing task `{self.name}` to `{event}`.") EMITTER.emit_future(Event.instantiate_from(event).collapse(), self) return await self.get_output() if new_namespace is not None: self.move_to(new_namespace) logger.info(f"Delegating task `{(label := self.pending_label)}`") EMITTER.emit_future(label, self) return await self.get_output()
[docs] def delegate_blocking( self, new_namespace: Optional[NameSpace] = None, *, event: Optional[NameSpace] = None ) -> T | None: """Delegate the task to the event in a blocking manner. Args: new_namespace (EventLike, optional): The new namespace to move the task to. event (EventLike, optional): The event to publish. Returns: T|None: The output of the task. """ return run(self.delegate(new_namespace, event=event))
@property def briefing(self) -> str: """Return a briefing of the task including its goal. Returns: str: The briefing of the task. """ data: Dict[str, Any] = self.model_dump() return TEMPLATE_MANAGER.render_template( CONFIG.templates.task_briefing_template, data, )
[docs] def is_running(self) -> bool: """Check if the task is running. Returns: bool: True if the task is running, False otherwise. """ return self._status == TaskStatus.Running
[docs] def is_finished(self) -> bool: """Check if the task is finished. Returns: bool: True if the task is finished, False otherwise. """ return self._status == TaskStatus.Finished
[docs] def is_failed(self) -> bool: """Check if the task is failed. Returns: bool: True if the task is failed, False otherwise. """ return self._status == TaskStatus.Failed
[docs] def is_cancelled(self) -> bool: """Check if the task is cancelled. Returns: bool: True if the task is cancelled, False otherwise. """ return self._status == TaskStatus.Cancelled
[docs] def is_pending(self) -> bool: """Check if the task is pending. Returns: bool: True if the task is pending, False otherwise. """ return self._status == TaskStatus.Pending