Metadata-Version: 2.4
Name: zuma-workflow
Version: 0.0.1b1
Summary: A powerful, lightweight and flexible workflow management framework for Python
Project-URL: Documentation, https://gitlab.com/codejunction/zuma#readme
Project-URL: Homepage, https://gitlab.com/codejunction/zuma
Project-URL: Issues, https://gitlab.com/codejunction/zuma/issues
Project-URL: Repository, https://gitlab.com/codejunction/zuma.git
Author-email: phoenixd <elsecube@codejunction.dev>
License-Expression: GPL-3.0
License-File: LICENSE
Keywords: async,parallel,pipeline,task,workflow
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.11
Requires-Dist: loguru>=0.7.3
Requires-Dist: pydantic>=2.0.0
Requires-Dist: typing-extensions>=4.8.0
Description-Content-Type: text/markdown

# Zuma - Workflow Manager and Executor

Zuma is a powerful, asynchronous workflow management and execution system built in Python. It provides a flexible framework for defining, managing, and executing complex workflows with parallel processing capabilities.

## Objective

The main objective of Zuma is to provide a robust framework for:

- Defining and executing complex workflows with multiple steps
- Managing parallel task execution with concurrency control
- Handling dependencies between workflow components
- Providing detailed execution tracking and error handling
- Supporting conditional workflow paths

## Features

### 1. Core Components

- **ZumaWorkflow**: Main container for organizing workflow steps
  - Supports sequential and parallel execution
  - Handles step dependencies and execution order
  - Configurable failure behavior with `continue_on_failure`
- **ZumaActionStep**: Basic unit of work execution
  - Configurable retries and timeouts
  - Custom execution logic through overridable methods
  - Automatic context and dependency injection
- **ZumaParallelAction**: Handles concurrent execution of multiple steps
  - Configurable concurrency limits with `max_concurrency`
  - Fail-fast option for error handling
  - Automatic resource management
- **ZumaConditionalStep**: Supports branching logic in workflows
  - Dynamic path selection based on context
  - Optional else-branch handling
  - Context-aware condition evaluation
- **ZumaRunner**: Orchestrates workflow execution
  - Manages workflow lifecycle
  - Handles execution context
  - Provides execution summaries and results
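The README does not show how step dependencies are declared or resolved internally. As an illustration only (not Zuma's code), the standard library's `graphlib.TopologicalSorter` can turn a hypothetical `step -> prerequisites` mapping into stages. Every step in a stage depends only on earlier stages, so the steps within a stage could run in parallel:

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

def execution_stages(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group steps into stages; each stage depends only on earlier stages."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    stages: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted for deterministic output
        stages.append(ready)
        sorter.done(*ready)  # unlock steps that were waiting on these
    return stages

# Hypothetical dependency map: step name -> set of prerequisite step names
deps = {
    "Process CSV": {"Fetch Data"},
    "Process JSON": {"Fetch Data"},
    "Save Results": {"Process CSV", "Process JSON"},
}
print(execution_stages(deps))
# [['Fetch Data'], ['Process CSV', 'Process JSON'], ['Save Results']]
```

Steps that share a stage are natural candidates for a `ZumaParallelAction`; a circular dependency surfaces as `graphlib.CycleError` when `prepare()` is called.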

### 2. Execution Management

- Asynchronous execution using Python's asyncio
  - Non-blocking operation execution
  - Efficient resource utilization
  - Concurrent task handling
- Configurable concurrency limits for parallel actions
- Execution context management and logging
- Progress tracking and status reporting
- Detailed execution results and metrics
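`max_concurrency` caps how many child steps of a `ZumaParallelAction` run at once. The README does not say how Zuma enforces that limit; a common asyncio pattern is a shared `asyncio.Semaphore`. The sketch below uses only the standard library, and `run_with_limit` is a hypothetical helper, not part of Zuma:

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

TaskFactory = Callable[[], Awaitable[Dict[str, Any]]]

async def run_with_limit(factories: List[TaskFactory], max_concurrency: int) -> List[Dict[str, Any]]:
    """Run all tasks, but never more than `max_concurrency` at the same time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(factory: TaskFactory) -> Dict[str, Any]:
        # Wait for a free slot; the slot is released on exit, even on error
        async with semaphore:
            return await factory()

    # gather returns results in the same order as the input list
    return await asyncio.gather(*(guarded(f) for f in factories))

# Demo instrumentation: count how many tasks are running simultaneously
stats = {"running": 0, "peak": 0}

def make_task(i: int) -> TaskFactory:
    async def task() -> Dict[str, Any]:
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        await asyncio.sleep(0.05)  # Simulate I/O-bound work
        stats["running"] -= 1
        return {"task": i}
    return task

results = asyncio.run(run_with_limit([make_task(i) for i in range(5)], max_concurrency=2))
print(results)        # [{'task': 0}, {'task': 1}, {'task': 2}, {'task': 3}, {'task': 4}]
print(stats["peak"])  # 2
```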

### 3. Error Handling

- Comprehensive error tracking and reporting
  - Detailed error messages with stack traces
  - Component-specific error context
  - Error categorization
- Configurable retry mechanisms
  - Per-step retry configuration
  - Exponential backoff support
  - Maximum retry limits
- Fail-fast and continue-on-failure options
- Validation of workflow configurations
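The list above mentions per-step retries, timeouts, and exponential backoff without spelling out the exact semantics. The following standard-library sketch assumes that `retries` counts *additional* attempts after the first one and that each attempt gets its own timeout; `run_with_retries` and its parameters are hypothetical, not Zuma's API:

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

async def run_with_retries(
    action: Callable[[], Awaitable[Dict[str, Any]]],
    retries: int = 3,
    base_delay: float = 0.01,
    timeout: float = 1.0,
) -> Dict[str, Any]:
    """Try once, then retry up to `retries` more times with exponential backoff."""
    last_error: Optional[BaseException] = None
    for attempt in range(retries + 1):
        try:
            # Each attempt is bounded by its own timeout; a timeout counts as a failure
            return await asyncio.wait_for(action(), timeout=timeout)
        except Exception as exc:
            last_error = exc
            if attempt < retries:
                # Backoff doubles after every failure: base, 2*base, 4*base, ...
                await asyncio.sleep(base_delay * 2 ** attempt)
    raise RuntimeError(f"gave up after {retries + 1} attempts") from last_error

# Demo: an action that fails twice before succeeding
attempts = {"count": 0}

async def flaky() -> Dict[str, Any]:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("transient failure")
    return {"status": "ok"}

result = asyncio.run(run_with_retries(flaky, retries=3))
print(result, attempts["count"])  # {'status': 'ok'} 3
```

The final `RuntimeError` chains the last underlying error via `from`, so the original cause stays visible in the traceback.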

### 4. Monitoring and Reporting

- Detailed execution summaries
- Duration tracking for steps
- Status tracking (PENDING, RUNNING, SUCCESS, FAILED, etc.)
- JSON-formatted execution results
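A minimal sketch of status and duration tracking with JSON output, using only the standard library. The `Status` values mirror the states listed above, but the `StepResult` shape and the `track` helper are illustrative assumptions, not Zuma's result model:

```python
import asyncio
import json
import time
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Awaitable, Callable, List, Optional

class Status(str, Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"

@dataclass
class StepResult:
    name: str
    status: Status = Status.PENDING
    duration: Optional[float] = None  # seconds
    error: Optional[str] = None

async def track(name: str, action: Callable[[], Awaitable[Any]]) -> StepResult:
    """Run one action, recording its final status, duration, and any error."""
    result = StepResult(name, status=Status.RUNNING)
    start = time.perf_counter()
    try:
        await action()
        result.status = Status.SUCCESS
    except Exception as exc:
        result.status = Status.FAILED
        result.error = str(exc)
    finally:
        # Duration is recorded for failed steps too
        result.duration = round(time.perf_counter() - start, 3)
    return result

async def fetch() -> None:
    await asyncio.sleep(0.01)  # Simulated work

async def validate() -> None:
    raise ValueError("bad input")

async def main() -> List[StepResult]:
    return [await track("Fetch", fetch), await track("Validate", validate)]

results = asyncio.run(main())
# Status subclasses str, so json.dumps writes plain strings such as "SUCCESS"
report = json.dumps([asdict(r) for r in results], indent=2)
print(report)
```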

### 5. Flexibility

- Plugin system for extending functionality
- Custom action step implementation
- Configurable execution parameters
- Support for complex workflow patterns

### 6. Workflow Visualization

- Mermaid diagram generation for workflow visualization
  - Clear visualization of workflow steps and their relationships
  - Support for retry mechanisms visualization
  - Visual representation of parallel processing
  - Opt-in diagram generation during workflow execution (`generate_diagram=True`)
  - Dark theme support for better readability
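To show what Mermaid output for a workflow can look like, here is a small hypothetical generator (`to_mermaid`, not part of Zuma) that draws sequential steps as a chain and fans out and back in around a parallel stage. The `%%{init: ...}%%` line is Mermaid's standard directive for selecting its built-in dark theme:

```python
from typing import List, Sequence, Union

# A stage is a single step name (sequential) or a list of names (run in parallel)
Stage = Union[str, List[str]]

def to_mermaid(workflow_name: str, stages: Sequence[Stage], dark: bool = True) -> str:
    lines = ["%%{init: {'theme': 'dark'}}%%"] if dark else []
    lines += ["flowchart TD", f'    wf(["{workflow_name}"])']
    previous = ["wf"]  # node ids of the most recently drawn stage
    counter = 0
    for stage in stages:
        names = stage if isinstance(stage, list) else [stage]
        current = []
        for name in names:
            counter += 1
            node = f"s{counter}"
            lines.append(f'    {node}["{name}"]')
            # Link every node of the previous stage to this one (fan-out / fan-in)
            lines.extend(f"    {prev} --> {node}" for prev in previous)
            current.append(node)
        previous = current
    return "\n".join(lines)

print(to_mermaid("Demo", ["Fetch Data", ["Process CSV", "Process JSON"], "Save Results"]))
```

Paste the printed text into any Mermaid renderer to view it; Zuma's own diagrams may use different node shapes and labels.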

## Usage Documentation

### Basic Usage

#### 1. Simple Sequential Workflow

```python
from zuma import ZumaWorkflow, ZumaActionStep, ZumaRunner
from typing import Dict, Any
import asyncio

class DataFetchStep(ZumaActionStep):
    """Step that simulates fetching data from a source"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        print(f"[{self.name}] Fetching data...")
        await asyncio.sleep(1)  # Simulate network delay
        return {"data": "fetched_data_123"}

class ProcessingStep(ZumaActionStep):
    """Step that processes the fetched data"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        data = context.get("data")
        print(f"[{self.name}] Processing data: {data}")
        await asyncio.sleep(0.5)  # Simulate processing
        return {"processed_data": f"processed_{data}"}

async def run_simple_workflow():
    workflow = ZumaWorkflow(
        "Simple Sequential Workflow",
        steps=[
            DataFetchStep("Fetch Data"),
            ProcessingStep("Process Data")
        ]
    )

    runner = ZumaRunner()
    result = await runner.run_workflow(workflow)
    runner.print_execution_summary(result)
    return result

if __name__ == "__main__":
    asyncio.run(run_simple_workflow())
```

#### 2. Parallel Processing

```python
from zuma import ZumaWorkflow, ZumaActionStep, ZumaParallelAction, ZumaRunner
from typing import Dict, Any
import asyncio

class DataProcessStep(ZumaActionStep):
    """Processes a single file type"""
    def __init__(self, name: str, file_type: str):
        super().__init__(name)
        self.file_type = file_type

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        valid_files = context.get("valid_files", [])
        our_files = [f for f in valid_files if f.endswith(self.file_type)]

        results = []
        for file in our_files:
            print(f"[{self.name}] Processing {file}...")
            await asyncio.sleep(0.5)  # Simulate processing
            results.append({
                "file": file,
                "processor": self.name,
                "status": "completed"
            })

        return {"processed_files": len(results), "results": results}

async def run_parallel_workflow():
    # Define parallel data processing
    parallel_processing = ZumaParallelAction(
        "Parallel Processing",
        steps=[
            DataProcessStep("Process CSV", ".csv"),
            DataProcessStep("Process JSON", ".json"),
            DataProcessStep("Process XML", ".xml")
        ],
        max_concurrency=2  # Process 2 file types at a time
    )

    # Create workflow
    workflow = ZumaWorkflow(
        "Parallel Processing Workflow",
        steps=[parallel_processing]
    )

    # Run workflow with sample data
    initial_context = {
        "valid_files": [
            "data1.csv", "data2.json", "data3.xml",
            "data4.csv", "data5.json"
        ]
    }

    runner = ZumaRunner()
    result = await runner.run_workflow(workflow, context=initial_context)
    runner.print_execution_summary(result)
    return result

if __name__ == "__main__":
    asyncio.run(run_parallel_workflow())
```

#### 3. Error Handling

```python
from zuma import ZumaWorkflow, ZumaActionStep, ZumaRunner, ZumaExecutionError
from typing import Dict, Any
import asyncio

class ValidatingStep(ZumaActionStep):
    """Step that validates input data"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        data = context.get("data")
        print(f"[{self.name}] Validating data: {data}")

        if not self.validate_data(data):
            raise ZumaExecutionError("Data validation failed")

        return {"validated": True, "data": data}

    def validate_data(self, data):
        return isinstance(data, str) and len(data) > 0

async def run_error_workflow():
    workflow = ZumaWorkflow(
        "Error Handling Workflow",
        steps=[
            ValidatingStep(
                "Validate Data",
                description="Validates input data format",
                retries=3
            )
        ],
        continue_on_failure=True
    )

    # Run workflow with sample data
    initial_context = {
        "data": "sample_data_123"  # Valid data
        # "data": None  # Invalid data to trigger validation error
    }

    runner = ZumaRunner()
    result = await runner.run_workflow(workflow, context=initial_context)
    runner.print_execution_summary(result)
    return result

if __name__ == "__main__":
    asyncio.run(run_error_workflow())
```

#### 4. Conditional Workflow

```python
from zuma import ZumaWorkflow, ZumaActionStep, ZumaConditionalStep, ZumaRunner
from typing import Dict, Any
import asyncio

def check_data_size(context: Dict[str, Any]) -> bool:
    """Determines processing path based on data size"""
    return context.get("data_size", 0) > 1000

class BatchProcessingStep(ZumaActionStep):
    """Handles large dataset processing in batches"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        data = context.get("data", [])
        batch_size = 100

        print(f"[{self.name}] Processing {len(data)} items in batches...")
        processed = []

        for i in range(0, len(data), batch_size):
            batch = data[i:i + batch_size]
            await asyncio.sleep(0.1)  # Simulate batch processing
            processed.extend([f"processed_{item}" for item in batch])

        return {
            "processed_items": len(processed),
            "processing_type": "batch",
            "results": processed
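        }

class SimpleProcessingStep(ZumaActionStep):
    """Handles small datasets in a single pass"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        data = context.get("data", [])
        print(f"[{self.name}] Processing {len(data)} items in one pass...")
        processed = [f"processed_{item}" for item in data]
        return {
            "processed_items": len(processed),
            "processing_type": "simple",
            "results": processed
        }

async def run_conditional_workflow():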
    workflow = ZumaWorkflow(
        "Conditional Processing Workflow",
        steps=[
            ZumaConditionalStep(
                "Processing Path Decision",
                condition=check_data_size,
                true_component=BatchProcessingStep("Batch Process"),
                false_component=SimpleProcessingStep("Simple Process")
            )
        ]
    )

    # Run workflow with sample data
    initial_context = {
        "data": list(range(2000)),  # Large dataset to trigger batch processing
        "data_size": 2000
    }

    runner = ZumaRunner()
    result = await runner.run_workflow(workflow, context=initial_context)
    runner.print_execution_summary(result)
    return result

if __name__ == "__main__":
    asyncio.run(run_conditional_workflow())
```

### Complex Scenarios

#### 1. Parallel Processing with Dependencies

```python
from zuma import ZumaWorkflow, ZumaActionStep, ZumaParallelAction, ZumaRunner
from typing import Dict, Any
import asyncio

class DataProcessStep(ZumaActionStep):
    """Processes a single file"""
    def __init__(self, name: str, file_type: str):
        super().__init__(name)
        self.file_type = file_type

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        valid_files = context.get("valid_files", [])
        our_files = [f for f in valid_files if f.endswith(self.file_type)]

        results = []
        for file in our_files:
            print(f"[{self.name}] Processing {file}...")
            await asyncio.sleep(0.5)  # Simulate processing
            results.append({
                "file": file,
                "processor": self.name
            })

        return {
            "processed_files": len(results),
            "results": results
        }

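class DiscoverFilesStep(ZumaActionStep):
    """Produces the file list that the parallel steps depend on"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        # Stand-in for listing a directory or storage bucket
        return {"valid_files": ["data1.csv", "data2.json", "data3.xml", "data4.csv"]}

# Define parallel data processing
parallel_processing = ZumaParallelAction(
    "Parallel Processing",
    steps=[
        DataProcessStep("Process CSV", file_type=".csv"),
        DataProcessStep("Process JSON", file_type=".json"),
        DataProcessStep("Process XML", file_type=".xml")
    ],
    max_concurrency=2  # Process 2 file types at a time
)

# Step order expresses the dependency: discovery must finish
# before the parallel processors read "valid_files" from the context
workflow = ZumaWorkflow(
    "Dependent Parallel Workflow",
    steps=[DiscoverFilesStep("Discover Files"), parallel_processing]
)

async def run_dependent_workflow():
    runner = ZumaRunner()
    result = await runner.run_workflow(workflow)
    runner.print_execution_summary(result)
    return result

if __name__ == "__main__":
    asyncio.run(run_dependent_workflow())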
```

#### 2. Conditional Branching with State Management

```python
from zuma import ZumaWorkflow, ZumaActionStep, ZumaConditionalStep
from typing import Dict, Any
import asyncio

def check_data_size(context: Dict[str, Any]) -> bool:
    """Determines processing path based on data size"""
    return context.get("data_size", 0) > 1000

class BatchProcessingStep(ZumaActionStep):
    """Handles large dataset processing in batches"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        data = context.get("data", [])
        batch_size = 100

        print(f"[{self.name}] Processing {len(data)} items in batches...")
        processed = []

        for i in range(0, len(data), batch_size):
            batch = data[i:i + batch_size]
            await asyncio.sleep(0.1)  # Simulate batch processing
            processed.extend([f"processed_{item}" for item in batch])

        return {
            "processed_items": len(processed),
            "processing_type": "batch",
            "results": processed
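        }

class SimpleProcessingStep(ZumaActionStep):
    """Handles small datasets in a single pass"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        data = context.get("data", [])
        processed = [f"processed_{item}" for item in data]
        return {
            "processed_items": len(processed),
            "processing_type": "simple",
            "results": processed
        }

class DataLoadStep(ZumaActionStep):
    """Loads data and stores its size in the context for the branch decision"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        data = list(range(2000))  # Stand-in for a real data source
        return {"data": data, "data_size": len(data)}

class ResultSaveStep(ZumaActionStep):
    """Reads whichever branch's output landed in the context"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        count = context.get("processed_items", 0)
        kind = context.get("processing_type", "unknown")
        print(f"[{self.name}] Saving {count} items from {kind} processing")
        return {"saved": True}

# Create workflow with conditional branching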
workflow = ZumaWorkflow(
    "Conditional Processing Workflow",
    steps=[
        DataLoadStep("Load Data"),
        ZumaConditionalStep(
            "Processing Path Decision",
            condition=check_data_size,
            true_component=BatchProcessingStep("Batch Process"),
            false_component=SimpleProcessingStep("Simple Process")
        ),
        ResultSaveStep("Save Results")
    ]
)
```

#### 3. Dynamic Workflow Generation

```python
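from typing import Any, Dict, Optional

from zuma import ZumaActionStep, ZumaParallelAction, ZumaWorkflow

# Illustrative step classes; replace them with your own implementations
class ValidationStep(ZumaActionStep):
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        return {"input_valid": True}

class ProcessingStep(ZumaActionStep):
    def __init__(self, name: str, process_type: Optional[str] = None,
                 worker_id: Optional[int] = None):
        super().__init__(name)
        self.process_type = process_type
        self.worker_id = worker_id

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        return {f"{self.name} done": True}

def create_dynamic_workflow(config: Dict[str, Any]) -> ZumaWorkflow:
    steps = []

    # Add input validation
    if config.get("validate_input"):
        steps.append(ValidationStep("Input Validation"))

    # Add processing steps based on config
    for process in config.get("processes", []):
        steps.append(ProcessingStep(f"Process {process}", process_type=process))

    # Add parallel processing if needed (defaults to one worker)
    if config.get("parallel_processing"):
        parallel_steps = [
            ProcessingStep(f"Parallel {i}", worker_id=i)
            for i in range(config.get("worker_count", 1))
        ]
        steps.append(ZumaParallelAction("Parallel Processing", steps=parallel_steps))

    return ZumaWorkflow("Dynamic Workflow", steps=steps)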
```

### Customizations

#### 1. Custom Action Step with Progress Tracking

```python
from typing import Any, Dict

from loguru import logger
from zuma import ZumaActionStep

class ProgressTrackingStep(ZumaActionStep):
    def __init__(self, name: str, total_items: int):
        super().__init__(name)
        self.total_items = total_items
        self.processed = 0

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        self.processed = 0  # Reset so a retried attempt does not double-count
        for i in range(self.total_items):
            await self.process_item(i)
            self.processed += 1
            self.report_progress()
        return {"processed_count": self.processed}

    def report_progress(self):
        percentage = (self.processed / self.total_items) * 100
        logger.info(f"{self.name}: {percentage:.2f}% complete")

    async def process_item(self, item):
        # Implementation specific processing
        pass
```

#### 2. Custom Result Handler

```python
from zuma import ZumaResult, ZumaRunner  # assumes ZumaResult is exported by the zuma package

class CustomResultHandler:
    def __init__(self, workflow_name: str):
        self.workflow_name = workflow_name
        self.results = []

    def handle_result(self, result: ZumaResult):
        # Process the result
        self.results.append({
            "step": result.name,
            "status": result.status.value,
            "duration": result.duration,
            "error": result.error
        })

    def generate_report(self):
        return {
            "workflow_name": self.workflow_name,
            "total_steps": len(self.results),
            "successful_steps": sum(1 for r in self.results if r["status"] == "SUCCESS"),
            "failed_steps": sum(1 for r in self.results if r["status"] == "FAILED"),
            "total_duration": sum(r["duration"] for r in self.results if r["duration"]),
            "step_results": self.results
        }

# Usage with custom handler
async def run_with_custom_handler():
    workflow = create_workflow()  # placeholder: your own function that builds a ZumaWorkflow
    runner = ZumaRunner()
    handler = CustomResultHandler(workflow.name)

    result = await runner.run_workflow(workflow)
    handler.handle_result(result)
    report = handler.generate_report()
    return report
```

#### 3. Custom Context Manager

```python
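from typing import Any, Dict, List, Optional

from zuma import ZumaRunner

class WorkflowContext:
    def __init__(self):
        self.start_time: Optional[float] = None
        self.metrics: Dict[str, List[float]] = {}
        self.state: Dict[str, Any] = {}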

    def track_metric(self, name: str, value: float):
        if name not in self.metrics:
            self.metrics[name] = []
        self.metrics[name].append(value)

    def get_average_metric(self, name: str) -> float:
        values = self.metrics.get(name, [])
        return sum(values) / len(values) if values else 0

    def update_state(self, key: str, value: Any):
        self.state[key] = value

    def get_state(self, key: str, default: Any = None) -> Any:
        return self.state.get(key, default)

# Usage with custom context
async def run_with_custom_context():
    context = WorkflowContext()
    workflow = create_workflow()  # placeholder: your own function that builds a ZumaWorkflow
    runner = ZumaRunner()

    result = await runner.run_workflow(
        workflow,
        context={"custom_context": context}
    )
    return result
```

## Advanced Usage Examples

### 1. Parallel Execution with Error Handling

```python
from zuma import ZumaWorkflow, ZumaParallelAction, ZumaActionStep

# Create parallel workflow with error handling
workflow = ZumaWorkflow(
    "Parallel Processing",
    steps=[
        ZumaParallelAction(
            "Data Processing",
            steps=[
                ZumaActionStep("Process1", retries=3),
                ZumaActionStep("Process2", timeout=30.0),
                ZumaActionStep("Process3"),
            ],
            fail_fast=True,
            max_concurrency=2
        )
    ],
    continue_on_failure=False
)
```
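With `fail_fast=True`, the first failing child is expected to stop the whole parallel action. One way to get that behaviour in plain asyncio is `asyncio.wait(..., return_when=asyncio.FIRST_EXCEPTION)` followed by cancelling the tasks that are still running. This is a hypothetical sketch (`run_fail_fast` is not a Zuma function), and for brevity it ignores `max_concurrency`:

```python
import asyncio
from typing import Any, Awaitable, Callable, List

async def run_fail_fast(factories: List[Callable[[], Awaitable[Any]]]) -> List[Any]:
    """Run tasks concurrently; on the first failure, cancel the rest and re-raise."""
    if not factories:
        return []
    tasks = [asyncio.create_task(f()) for f in factories]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for task in pending:
        task.cancel()
    if pending:
        # Let cancelled tasks finish unwinding before reporting the failure
        await asyncio.gather(*pending, return_exceptions=True)
    for task in tasks:
        if task in done and task.exception() is not None:
            raise task.exception()
    return [task.result() for task in tasks]

cancelled: List[str] = []

async def slow(name: str) -> str:
    try:
        await asyncio.sleep(1)
        return name
    except asyncio.CancelledError:
        cancelled.append(name)  # Record that fail-fast stopped this step
        raise

async def failing() -> str:
    await asyncio.sleep(0.01)
    raise RuntimeError("Process2 failed")

async def main() -> str:
    try:
        await run_fail_fast([lambda: slow("Process1"), failing, lambda: slow("Process3")])
    except RuntimeError as exc:
        return str(exc)
    return "no failure"

message = asyncio.run(main())
print(message, sorted(cancelled))  # Process2 failed ['Process1', 'Process3']
```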

### 2. Conditional Workflow

```python
from zuma import ZumaWorkflow, ZumaConditionalStep, ZumaActionStep

def check_condition(context):
    return context.get('value', 0) > 100

workflow = ZumaWorkflow(
    "Conditional Flow",
    steps=[
        ZumaConditionalStep(
            "Value Check",
            condition=check_condition,
            true_component=ZumaActionStep("High Value Process"),
            false_component=ZumaActionStep("Low Value Process")
        )
    ]
)
```

### 3. Custom Action Step

```python
from typing import Any, Dict

from zuma import ZumaActionStep, ZumaExecutionError

class DataValidationStep(ZumaActionStep):
    async def execute(self, context: Dict[str, Any], **kwargs) -> Dict[str, Any]:
        data = context.get('input_data')
        if not self.validate_data(data):
            raise ZumaExecutionError("Invalid data format")
        return {"validation_passed": True}

    def validate_data(self, data):
        # Custom validation logic
        return True
```

## Technical Architecture

### Component Hierarchy

```
ZumaComponent (Abstract Base)
├── ZumaWorkflow
├── ZumaActionStep
├── ZumaParallelAction
└── ZumaConditionalStep
```
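The hierarchy is an instance of the Composite pattern: workflows, parallel actions, and conditional steps are components that contain other components, and all share one execution interface. The standard-library sketch below uses hypothetical class names (not Zuma's) and assumes, as the usage examples suggest, that each step returns a dict that is merged into the context seen by later steps:

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Optional

Context = Dict[str, Any]

class Component(ABC):
    """Shared interface: take the current context, return keys to add to it."""
    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    async def run(self, context: Context) -> Context: ...

class Action(Component):
    def __init__(self, name: str, fn: Callable[[Context], Context]):
        super().__init__(name)
        self.fn = fn

    async def run(self, context: Context) -> Context:
        return self.fn(context)

class Workflow(Component):
    def __init__(self, name: str, steps: List[Component]):
        super().__init__(name)
        self.steps = steps

    async def run(self, context: Context) -> Context:
        ctx = dict(context)
        for step in self.steps:
            ctx.update(await step.run(ctx))  # later steps see earlier outputs
        return ctx

class Parallel(Component):
    def __init__(self, name: str, steps: List[Component]):
        super().__init__(name)
        self.steps = steps

    async def run(self, context: Context) -> Context:
        # Each child gets its own copy of the context; outputs are merged afterwards
        outputs = await asyncio.gather(*(s.run(dict(context)) for s in self.steps))
        merged: Context = {}
        for out in outputs:
            merged.update(out)
        return merged

class Conditional(Component):
    def __init__(self, name: str, condition: Callable[[Context], bool],
                 true_component: Component, false_component: Optional[Component] = None):
        super().__init__(name)
        self.condition = condition
        self.true_component = true_component
        self.false_component = false_component

    async def run(self, context: Context) -> Context:
        if self.condition(context):
            return await self.true_component.run(context)
        if self.false_component is not None:
            return await self.false_component.run(context)
        return {}  # No else-branch: contribute nothing

flow = Workflow("Demo", [
    Action("Load", lambda c: {"data_size": len(c["data"])}),
    Conditional("Size Check", lambda c: c["data_size"] > 1000,
                true_component=Action("Batch", lambda c: {"mode": "batch"}),
                false_component=Action("Simple", lambda c: {"mode": "simple"})),
    Parallel("Stats", [
        Action("Min", lambda c: {"min": min(c["data"])}),
        Action("Max", lambda c: {"max": max(c["data"])}),
    ]),
])

result = asyncio.run(flow.run({"data": list(range(2000))}))
print(result["mode"], result["min"], result["max"])  # batch 0 1999
```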

### Execution Flow

1. **Initialization**

   - Workflow and step validation
   - Context preparation
   - Dependency injection

2. **Execution**

   - Async task creation
   - Parallel execution management
   - Status tracking
   - Error handling

3. **Completion**
   - Result aggregation
   - Cleanup
   - Summary generation
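The three phases can be sketched as a single sequential runner. This is an illustration under stated assumptions, not Zuma's `ZumaRunner`: steps are hypothetical `(name, coroutine function)` pairs, validation only checks for an empty workflow and duplicate names, and `continue_on_failure=False` is assumed to stop at the first failed step:

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, List, Tuple

StepFn = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]

def validate(steps: List[Tuple[str, StepFn]]) -> None:
    """Initialization: reject empty workflows and duplicate step names."""
    if not steps:
        raise ValueError("workflow has no steps")
    names = [name for name, _ in steps]
    duplicates = sorted({n for n in names if names.count(n) > 1})
    if duplicates:
        raise ValueError(f"duplicate step names: {duplicates}")

async def execute(steps: List[Tuple[str, StepFn]], context: Dict[str, Any],
                  continue_on_failure: bool = False) -> Dict[str, Any]:
    validate(steps)        # 1. Initialization
    ctx = dict(context)    # work on a copy of the caller's context
    records: List[Dict[str, Any]] = []
    for name, fn in steps:  # 2. Execution
        record: Dict[str, Any] = {"step": name}
        start = time.perf_counter()
        try:
            ctx.update(await fn(ctx))
            record["status"] = "SUCCESS"
        except Exception as exc:
            record.update(status="FAILED", error=str(exc))
        record["duration"] = time.perf_counter() - start
        records.append(record)
        if record["status"] == "FAILED" and not continue_on_failure:
            break
    # 3. Completion: aggregate results into a summary
    return {
        "context": ctx,
        "steps": records,
        "succeeded": sum(r["status"] == "SUCCESS" for r in records),
        "failed": sum(r["status"] == "FAILED" for r in records),
    }

async def fetch(ctx: Dict[str, Any]) -> Dict[str, Any]:
    return {"data": [1, 2, 3]}

async def parse(ctx: Dict[str, Any]) -> Dict[str, Any]:
    raise ValueError("cannot parse")

async def total(ctx: Dict[str, Any]) -> Dict[str, Any]:
    return {"total": sum(ctx["data"])}

steps = [("Fetch", fetch), ("Parse", parse), ("Sum", total)]
summary = asyncio.run(execute(steps, {}, continue_on_failure=True))
print(summary["succeeded"], summary["failed"], summary["context"]["total"])  # 2 1 6
```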

## Current Limitations

1. **Persistence**

   - No built-in persistence for workflow state
   - Execution history is not stored between runs
   - No recovery mechanism for failed workflows

2. **Monitoring**

   - Limited real-time monitoring capabilities
   - No interactive visualization tools beyond static Mermaid diagram generation
   - Basic console-based progress reporting

3. **Scalability**

   - Limited to single process execution
   - No distributed execution support
   - Memory constraints for large workflows

4. **Error Recovery**
   - Basic retry mechanism
   - No sophisticated failure recovery strategies
   - Limited rollback capabilities

## Room for Improvement

### 1. Architecture Enhancements

- Implement persistent storage for workflow state
- Add support for distributed execution
- Develop a proper plugin architecture
- Implement workflow versioning

### 2. Execution Features

- Add support for workflow checkpointing
- Implement sophisticated retry strategies
- Add transaction support for atomic operations
- Develop workflow templating system

### 3. Monitoring and Observability

- Add real-time monitoring dashboard
- Implement metrics collection
- Add workflow visualization tools
- Enhance logging and debugging capabilities

### 4. Error Handling

- Implement advanced error recovery strategies
- Add support for compensating transactions
- Develop workflow replay capabilities
- Add debugging and troubleshooting tools

### 5. Integration and Extensions

- Add REST API interface
- Develop language-agnostic workflow definitions
- Add support for external task queues
- Implement event-driven workflow triggers

### 6. Documentation and Testing

- Expand test coverage
- Add performance benchmarks
- Improve documentation with more examples
- Create user guides and tutorials

## Getting Started

```python
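import asyncio

from zuma import ZumaWorkflow, ZumaActionStep, ZumaRunner

# Define a simple workflow
workflow = ZumaWorkflow(
    "Sample Workflow",
    steps=[
        ZumaActionStep("Step1"),
        ZumaActionStep("Step2"),
    ]
)

async def main():
    # Create a runner and execute; run_workflow is a coroutine and must be awaited
    runner = ZumaRunner()
    result = await runner.run_workflow(workflow)
    runner.print_execution_summary(result)
    return result

if __name__ == "__main__":
    asyncio.run(main())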
```

## Examples

The `examples/` directory contains various examples demonstrating Zuma's features:

1. `simple.py` - Basic sequential workflow demonstration
2. `retry_mechanism.py` - Advanced retry handling with visualization
3. `parallel_processing.py` - Concurrent task execution
4. `workflow_in_workflow.py` - Nested workflow composition
5. `conditional_workflow.py` - Dynamic branching based on conditions
6. `error_handling.py` - Comprehensive error handling patterns
7. `dynamic_workflow.py` - Runtime workflow modification
8. `custom_actions.py` - Creating custom action steps
9. `workflow_composition.py` - Complex workflow composition

### Visualization Example

```python
from zuma import ZumaWorkflow, ZumaActionStep, ZumaRunner, ZumaExecutionError
from typing import Dict, Any
import asyncio
import random

class UnreliableAPIStep(ZumaActionStep):
    def __init__(self, name: str):
        super().__init__(
            name=name,
            retries=3,  # Try up to 3 times
            retry_delay=1.0  # Wait 1 second between retries
        )

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        # Simulated API call that might fail
        if random.random() < 0.7:  # 70% chance of failure
            raise ZumaExecutionError("API request failed")
        return {"api_response": "success"}

async def run_retry_example():
    workflow = ZumaWorkflow(
        "Retry Mechanism Demo",
        steps=[UnreliableAPIStep("API Call")]
    )

    runner = ZumaRunner()
    # Generate visualization diagram
    result = await runner.run_workflow(
        workflow,
        generate_diagram=True,
        diagram_output="retry_mechanism"
    )
    return result

if __name__ == "__main__":
    asyncio.run(run_retry_example())
```

The generated diagram will show:
- Main workflow path
- Retry attempts with failure paths
- Success paths back to main flow
- Error handling mechanism

## Installation

```bash
pip install zuma-workflow
```

## Requirements

- Python 3.11+
- loguru >= 0.7.3
- pydantic >= 2.0.0
- typing-extensions >= 4.8.0

## Documentation

For detailed documentation, examples, and API reference, visit:
[https://zuma.codejunction.dev](https://zuma.codejunction.dev)

## Contributing

[Add Contributing Guidelines]

## Project Status

Zuma is currently in beta (0.0.1b1) and under active development. The core features are usable, but the API may change and some advanced features are not yet complete. Contributions and feedback are welcome!
