Metadata-Version: 2.4
Name: at_common_workflow
Version: 1.5.8
Summary: A robust Python package for building and executing asynchronous workflow tasks in a directed acyclic graph (DAG) pattern. This package provides a type-safe, thread-safe framework for defining, organizing, and running dependent tasks with comprehensive validation and error handling.
Author-email: Rui Chen <apextrader.ai@gmail.com>
Project-URL: Homepage, https://github.com/apex-trader/at-common-workflow
Project-URL: Bug Tracker, https://github.com/apex-trader/at-common-workflow/issues
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.11
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: pydantic>=2.10.6
Requires-Dist: typing_extensions>=4.12.2
Provides-Extra: dev
Requires-Dist: pytest>=8.3.4; extra == "dev"
Requires-Dist: pytest-asyncio>=0.25.3; extra == "dev"
Dynamic: license-file

# At Common Workflow

## Description
At Common Workflow is a workflow management system that lets users define and execute tasks in a directed acyclic graph (DAG) structure. It supports parallel execution and provides a shared workflow context (`workflow.context`) for managing task inputs and outputs.
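As a rough mental model only (this is not At Common Workflow's implementation), a DAG runner can repeatedly pick every task whose dependencies have finished, run that batch concurrently with `asyncio.gather`, and write each result into a shared dict that plays the role of the workflow context. The sketch below uses hypothetical names (`run_dag`, `fetch_a`, `fetch_b`, `add`) and relies on the standard-library `graphlib.TopologicalSorter` for ordering and cycle detection.

```python
import asyncio
from graphlib import TopologicalSorter
from typing import Any, Awaitable, Callable

# Hypothetical task shape: read the shared context, return a value.
TaskFn = Callable[[dict[str, Any]], Awaitable[Any]]


async def run_dag(tasks: dict[str, TaskFn], deps: dict[str, set[str]]) -> dict[str, Any]:
    """Run tasks in dependency order; each batch of ready tasks runs concurrently."""
    context: dict[str, Any] = {}
    # Every task becomes a node, even if it has no dependencies.
    sorter = TopologicalSorter({name: deps.get(name, set()) for name in tasks})
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    while sorter.is_active():
        ready = sorter.get_ready()  # tasks whose dependencies have all finished
        results = await asyncio.gather(*(tasks[name](context) for name in ready))
        for name, result in zip(ready, results):
            context[name] = result  # each output is stored under its task's name
            sorter.done(name)
    return context


async def fetch_a(ctx: dict[str, Any]) -> int:
    return 5


async def fetch_b(ctx: dict[str, Any]) -> int:
    return 3


async def add(ctx: dict[str, Any]) -> int:
    return ctx["a"] + ctx["b"]


# "a" and "b" have no dependencies and run together; "sum" waits for both.
context = asyncio.run(run_dag({"a": fetch_a, "b": fetch_b, "sum": add}, {"sum": {"a", "b"}}))
print(context["sum"])  # 8
```

Batching by readiness is the simplest way to get parallelism out of a DAG; a more elaborate scheduler could instead start each task the moment its own dependencies finish.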

## Installation
To set up the project, follow these steps:

1. Clone the repository:
   ```bash
   git clone https://github.com/apex-trader/at-common-workflow
   cd at-common-workflow
   ```
2. Install the dependencies:
   ```bash
   pip install -r requirements.txt
   ```

## Usage
There are two ways to define tasks: using class inheritance or using the builder pattern.

### Class-based Approach
```python
import asyncio

from pydantic import BaseModel

from at_common_workflow.core.task.processing_task import ProcessingTask
from at_common_workflow.core.workflow.base import Workflow

class AddInputModel(BaseModel):
    a: int
    b: int

class AddOutputModel(BaseModel):
    result: int

class AddTask(ProcessingTask[AddInputModel, AddOutputModel]):
    def __init__(self, name: str):
        super().__init__(
            name=name,
            input_model=AddInputModel,
            output_model=AddOutputModel,
            processor_function=self._execute
        )

    async def _execute(self, input: AddInputModel) -> AddOutputModel:
        return AddOutputModel(result=input.a + input.b)

# Run workflow (execute() is consumed with `async for`, so it must run inside a coroutine)
async def main():
    workflow = Workflow()
    task = AddTask("add_numbers")
    # Pass constant arguments and store the task's output under the "result" key
    workflow.add_task(task, argument_mappings={"a": 5, "b": 3}, result_mapping="result")
    async for event in workflow.execute():
        pass
    print(workflow.context.get("result").result)  # Output: 8

if __name__ == "__main__":
    asyncio.run(main())
```
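To show what the class-based approach relies on, here is a standard-library-only sketch of the same pattern. `MiniProcessingTask` is a hypothetical stand-in, not the library's `ProcessingTask`, and dataclasses replace the Pydantic models, so field types are not validated at runtime. The base class is generic over its input and output types, builds the input model from a dict of arguments, and checks that the processor returned an instance of the declared output model.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Generic, TypeVar

InT = TypeVar("InT")
OutT = TypeVar("OutT")


class MiniProcessingTask(Generic[InT, OutT]):
    """Hypothetical generic task: typed input in, typed output out."""

    def __init__(
        self,
        name: str,
        input_model: type[InT],
        output_model: type[OutT],
        processor_function: Callable[[InT], Awaitable[OutT]],
    ) -> None:
        self.name = name
        self.input_model = input_model
        self.output_model = output_model
        self.processor_function = processor_function

    async def run(self, arguments: dict[str, Any]) -> OutT:
        # Build the typed input; missing or unknown keys raise TypeError.
        model_input = self.input_model(**arguments)
        result = await self.processor_function(model_input)
        if not isinstance(result, self.output_model):
            raise TypeError(
                f"{self.name} returned {type(result).__name__}, "
                f"expected {self.output_model.__name__}"
            )
        return result


@dataclass
class AddInput:
    a: int
    b: int


@dataclass
class AddOutput:
    result: int


class AddTask(MiniProcessingTask[AddInput, AddOutput]):
    def __init__(self, name: str) -> None:
        super().__init__(name, AddInput, AddOutput, self._execute)

    async def _execute(self, input: AddInput) -> AddOutput:
        return AddOutput(result=input.a + input.b)


print(asyncio.run(AddTask("add_numbers").run({"a": 5, "b": 3})).result)  # 8
```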

### Builder Pattern Approach (Recommended)
```python
import asyncio

from pydantic import BaseModel

from at_common_workflow.core.workflow.builder import WorkflowBuilder

class AddInputModel(BaseModel):
    a: int
    b: int

class AddOutputModel(BaseModel):
    result: int

async def execute_add(input: AddInputModel) -> AddOutputModel:
    return AddOutputModel(result=input.a + input.b)

async def main():
    # Create and execute workflow
    workflow = (WorkflowBuilder()
        .task("add_numbers")
            .input_model(AddInputModel)
            .output_model(AddOutputModel)
            .processor(execute_add)
            .arg("a", 5)
            .arg("b", 3)
            .output("result")
        .build())

    async for event in workflow.execute():
        pass
    print(workflow.context.get("result").result)  # Output: 8

if __name__ == "__main__":
    asyncio.run(main())
```
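Both examples drain `workflow.execute()` with `async for`, so it is an asynchronous iterable of events. The README does not document the event types, so the sketch below uses a hypothetical `Event` dataclass and a hypothetical `TinyWorkflow` class to illustrate the general pattern. In this sketch, as with any async generator, no task runs until the generator is iterated, which would explain why the examples loop over it even when the events are ignored.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Awaitable, Callable


@dataclass
class Event:
    kind: str  # hypothetical event kinds: "started" or "completed"
    task: str


class TinyWorkflow:
    """Hypothetical stand-in that runs tasks sequentially and streams events."""

    def __init__(self) -> None:
        self.context: dict[str, Any] = {}
        self._tasks: list[tuple[str, Callable[[], Awaitable[Any]], str]] = []

    def add(self, name: str, fn: Callable[[], Awaitable[Any]], output_key: str) -> None:
        self._tasks.append((name, fn, output_key))

    async def execute(self) -> AsyncIterator[Event]:
        # Nothing below runs until a caller starts iterating this async generator.
        for name, fn, key in self._tasks:
            yield Event("started", name)
            self.context[key] = await fn()
            yield Event("completed", name)


async def main() -> list[Event]:
    async def add_numbers() -> int:
        return 5 + 3

    wf = TinyWorkflow()
    wf.add("add_numbers", add_numbers, "result")
    events = [event async for event in wf.execute()]
    print(wf.context["result"])  # 8
    return events


events = asyncio.run(main())
```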

### Tasks with No Output
For tasks that perform actions but don't need to store their result in the workflow context (like sending notifications or logging), you can call `output()` with no arguments:

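```python
from pydantic import BaseModel
from at_common_workflow.core.workflow.builder import WorkflowBuilder

class NotificationInput(BaseModel):
    message: str
    recipient: str

# send_notification is the async processor shown in the next example
workflow = (WorkflowBuilder()
    .task("send_notification")
        .input_model(NotificationInput)
        # output_model is optional when not storing results
        .processor(send_notification)
        .arg("message", "Hello!")
        .arg("recipient", "user@example.com")
        .output()  # Don't store the output
    .build())
```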

This will execute the task but won't store its result in the workflow context. This is useful for tasks that have side effects (like sending notifications) but don't produce meaningful output that other tasks need.
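One plausible way to implement `output()` with no key is to record the output key as `None` and skip the context write when the task finishes. The `run_task` helper below is a hypothetical sketch, not the library's code; it uses a plain dict as the context, and for brevity its processors take keyword arguments instead of an input model.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def run_task(
    processor: Callable[..., Awaitable[Any]],
    args: dict[str, Any],
    context: dict[str, Any],
    output_key: Optional[str],
) -> None:
    """Run one task; store its result only if an output key was given."""
    result = await processor(**args)
    # output() with no key -> output_key is None, so the result is discarded.
    if output_key is not None:
        context[output_key] = result


sent: list[str] = []


async def send_notification(message: str, recipient: str) -> None:
    sent.append(f"{recipient}: {message}")  # side effect only


context: dict[str, Any] = {}
asyncio.run(
    run_task(send_notification, {"message": "Hello!", "recipient": "user@example.com"}, context, None)
)
print(sent)     # ['user@example.com: Hello!']
print(context)  # {}
```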

If a task doesn't produce meaningful output, you can also omit the output model entirely (or explicitly set it to `None`) and have the processor return `None`:

```python
# Task function that doesn't produce meaningful output
async def send_notification(input: NotificationInput) -> None:
    print(f"Sending notification to {input.recipient}: {input.message}")
    # Some side effect...
    return None  # or just omit the return statement
```

Note that output models are still required when you need to store results in the workflow context.

### Tasks with No Input

The workflow system allows you to define tasks that don't require any input. This can be useful for tasks that:

- Generate data without external input (like current time, random values, etc.)
- Perform standalone operations not dependent on previous task outputs
- Initialize resources or set up environments

#### Creating Tasks with No Input

When defining a task with no input, you have two options for your processor function:

##### Option 1: Processor with Empty Input Parameter

You can define a processor function that has an `input` parameter, which will receive an empty dictionary (`{}`):

```python
async def processor_with_empty_input(input: dict) -> YourOutputModel:
    # The input parameter is an empty dict
    # Process without input
    return YourOutputModel(...)
```

##### Option 2: Processor with No Parameters

Alternatively, you can define a processor function with no parameters at all:

```python
async def processor_with_no_parameters() -> YourOutputModel:
    # No input parameter
    # Process without input
    return YourOutputModel(...)
```

Both approaches are valid and supported by the framework.
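Supporting both signatures typically means inspecting the processor before calling it. The hypothetical `call_processor` helper below sketches that idea with the standard-library `inspect.signature`; it is not the framework's actual dispatch code. A processor with no parameters is called bare, and one with a parameter receives the input or, when there is none, an empty dict.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable


async def call_processor(processor: Callable[..., Awaitable[Any]], input: Any = None) -> Any:
    """Call a processor that takes either one input parameter or none."""
    params = inspect.signature(processor).parameters
    if not params:
        return await processor()  # Option 2: no parameters at all
    # Option 1: pass the input, defaulting to an empty dict when there is none
    return await processor(input if input is not None else {})


async def processor_with_empty_input(input: dict) -> str:
    return f"got {input!r}"


async def processor_with_no_parameters() -> str:
    return "no input needed"


print(asyncio.run(call_processor(processor_with_empty_input)))    # got {}
print(asyncio.run(call_processor(processor_with_no_parameters)))  # no input needed
```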

#### Building a Task with No Input

When using the workflow builder, simply omit the `input_model` method call:

```python
workflow = (WorkflowBuilder()
    .task("no_input_task")
        # No input_model call here
        .output_model(YourOutputModel)
        .processor(processor_with_no_parameters)  # or processor_with_empty_input
        .output("result")
    .build()
)
```

#### Example

Here is a complete example of a task with no input that returns the current time:

```python
from pydantic import BaseModel
from at_common_workflow.core.workflow.builder import WorkflowBuilder
import asyncio
from datetime import datetime

# Define the output model
class TimeInfo(BaseModel):
    current_time: str

# Define a processor function with no parameters
async def get_current_time() -> TimeInfo:
    return TimeInfo(
        current_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    )

# Build and run the workflow
async def main():
    workflow = (WorkflowBuilder()
        .task("get_time")
            .output_model(TimeInfo)
            .processor(get_current_time)
            .output("time_info")
        .build()
    )
    
    # Execute workflow
    async for event in workflow.execute():
        pass
    
    # Access the result
    time_info = workflow.context.get("time_info")
    print(f"Current time: {time_info.current_time}")

if __name__ == "__main__":
    asyncio.run(main())
```

#### Notes

- Task functions without input parameters must still be async functions.
- All other task configuration (output model, progress model, etc.) remains the same.
- The framework continues to support legacy code that uses the empty dict approach.
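The first note could be enforced when a task is built, for example by rejecting plain functions with `inspect.iscoroutinefunction`. The `validate_processor` helper below is a hypothetical sketch; the README does not say whether or how the framework performs this check.

```python
import inspect
from typing import Any, Callable


def validate_processor(processor: Callable[..., Any]) -> None:
    """Raise TypeError unless processor is an `async def` function."""
    if not inspect.iscoroutinefunction(processor):
        name = getattr(processor, "__name__", repr(processor))
        raise TypeError(f"processor {name!r} must be declared with 'async def'")


async def get_current_time() -> str:
    return "12:00"


def get_current_time_sync() -> str:
    return "12:00"


validate_processor(get_current_time)  # passes silently
try:
    validate_processor(get_current_time_sync)
except TypeError as exc:
    print(exc)  # processor 'get_current_time_sync' must be declared with 'async def'
```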

## Examples
Check out the examples directory for more in-depth examples:

- `example1_basic_workflow.py` - Basic workflow setup and execution
- `example2_progress_updates.py` - Using progress updates in tasks
- `example3_parallel_tasks.py` - Running tasks in parallel
- `example4_nested_data.py` - Working with nested data structures
- `example5_error_handling.py` - Handling errors in workflows
- `example6_arithmetic_operations.py` - Performing arithmetic operations
- `example7_no_output.py` - Tasks that don't store their output
- `example8_no_input_parameter.py` - Tasks with no input parameters
