Metadata-Version: 2.4
Name: async-transaction-manager
Version: 0.1.2
Summary: A robust asynchronous transaction management library for Python functions.
Home-page: https://github.com/amirmotlagh/transaction-manager
Author: Amir Alaghmandan
Author-email: amir.alaghmand@gmail.com
Project-URL: Bug Tracker, https://github.com/amirmotlagh/transaction-manager/issues
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Framework :: AsyncIO
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: project-url
Dynamic: requires-python
Dynamic: summary

# Python Async Transaction Manager

A robust asynchronous transaction management library for Python functions, designed to ensure data consistency by automatically rolling back preceding operations if any step in a defined transaction fails. This library is ideal for orchestrating complex workflows that involve multiple discrete, reversible steps.

## Features

*   **Automatic Rollback on Failure**: If any `execute_func` within a transaction fails, all previously completed `execute_func`s are automatically rolled back using their associated `rollback_func`s.
*   **Per-Step Timeout**: Each transaction step can have an optional timeout, preventing long-running or unresponsive operations from holding up the entire transaction.
*   **Configurable Retry Mechanism**: Individual `execute_func` and `rollback_func` calls can be configured with a maximum number of retries and an exponential backoff interval, enhancing resilience against transient failures.
*   **Transaction Hooks**: Users can register custom hooks to execute logic at various stages of the transaction lifecycle (e.g., `on_step_success`, `on_transaction_complete`), enabling flexible integration for monitoring, logging, or custom event handling.
*   **Customizable Logging**: The library uses Python's standard `logging` module, allowing users to configure log levels and handlers to suit their application's logging infrastructure.

## Installation

```bash
pip install transaction-manager
```

## Usage

### Basic Transaction

Here's how to define and run a simple transaction:

```python
import asyncio
from src.transaction_manager import TransactionManager

class UserService:
    async def create_user(self, username, email):
        print(f"Creating user: {username} ({email})")
        # Simulate some work
        await asyncio.sleep(0.1)
        return {"user_id": "user_abc", "username": username, "email": email}

    async def delete_user(self, user_id):
        print(f"Deleting user: {user_id}")
        await asyncio.sleep(0.05)

class VMService:
    async def create_vm(self, vm_name, user_id):
        print(f"Creating VM: {vm_name} for user {user_id}")
        await asyncio.sleep(0.2)
        return {"vm_id": "vm_xyz", "vm_name": vm_name}

    async def delete_vm(self, vm_id):
        print(f"Deleting VM: {vm_id}")
        await asyncio.sleep(0.1)

user_service = UserService()
vm_service = VMService()

async def main():
    async with TransactionManager("create_user_and_vm_transaction", "Provision User and VM") as tx:
        # Step 1: Create user
        user_data = await tx.add_step(
            "create_user_step",
            user_service.create_user,
            rollback_func=user_service.delete_user,
            username="jane.doe",
            email="jane@example.com"
        )
        print(f"User created: {user_data}")

        # Step 2: Create VM, using data from previous step
        vm_data = await tx.add_step(
            "create_vm_step",
            vm_service.create_vm,
            rollback_func=vm_service.delete_vm,
            vm_name="jane-vm",
            user_id=user_data["user_id"]
        )
        print(f"VM created: {vm_data}")

    print("Transaction completed successfully!")
    summary = tx.get_transaction_summary()
    print("Transaction Summary:", summary)

if __name__ == "__main__":
    asyncio.run(main())
```

### Handling Failures and Rollbacks

If a step fails, previous completed steps will be rolled back.

```python
import asyncio
from src.transaction_manager import TransactionManager, TransactionHook
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

class MyUserService:
    async def create_user(self, username):
        print(f"Creating user {username}")
        return {"user_id": f"user_{username}"}

    async def delete_user(self, user_id):
        print(f"Deleting user {user_id}")

class MyPaymentService:
    async def process_payment(self, user_id, amount):
        print(f"Processing payment for {user_id} of {amount}")
        if amount > 100:
            raise ValueError("Payment amount too high!")
        return {"payment_id": "pay_123"}

    async def refund_payment(self, payment_id):
        print(f"Refunding payment {payment_id}")

async def main_failure():
    user_service = MyUserService()
    payment_service = MyPaymentService()

    try:
        async with TransactionManager("payment_transaction", "Process Payment") as tx:
            user_data = await tx.add_step(
                "create_user",
                user_service.create_user,
                rollback_func=user_service.delete_user,
                username="test_user"
            )
            print(f"User created: {user_data['user_id']}")

            payment_data = await tx.add_step(
                "process_payment",
                payment_service.process_payment,
                rollback_func=payment_service.refund_payment,
                user_id=user_data['user_id'],
                amount=200
            )
            print(f"Payment processed: {payment_data['payment_id']}")

    except ValueError as e:
        print(f"Transaction failed with error: {e}")
    finally:
        summary = tx.get_transaction_summary()
        print("Transaction Summary:", summary)

if __name__ == "__main__":
    asyncio.run(main_failure())
```

### Using `rollback_args` and `override_args`

Sometimes, the arguments needed for a rollback function are different from the original execution function, or need to be derived from the execution result.

```python
import asyncio
from src.transaction_manager import TransactionManager
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

class NetworkService:
    async def assign_ip(self, vm_id, ip_address):
        print(f"Assigning IP {ip_address} to VM {vm_id}")
        # In a real scenario, this might return the assigned IP
        return {"assigned_ip": ip_address, "rollback_data": {"ip_to_release": ip_address}}

    async def release_ip(self, ip_to_release):
        print(f"Releasing IP {ip_to_release}")

async def main_rollback_args():
    network_service = NetworkService()

    async with TransactionManager("ip_assignment_transaction") as tx:
        # Assume vm_id is 'vm_abc'
        assigned_ip_data = await tx.add_step(
            "assign_ip",
            network_service.assign_ip,
            rollback_func=network_service.release_ip,
            args=("vm_abc", "192.168.1.100"), # args for assign_ip
        )
        print(f"Assigned IP: {assigned_ip_data['assigned_ip']}")

        # Simulate a failure to trigger rollback
        tx.steps[-1].status = TransactionStepStatus.FAILED
        tx.steps[-1].error = ValueError("Simulated IP assignment failure")

    print("Transaction with rollback args completed.")
    print("Final Status:", tx.get_transaction_summary())

if __name__ == "__main__":
    asyncio.run(main_rollback_args())
```

### Implementing Custom Transaction Hooks

You can create custom hooks to react to different transaction events.

```python
import asyncio
from src.transaction_manager import TransactionManager, TransactionHook
import logging
from typing import Dict, Any

class MyCustomHook(TransactionHook):
    async def on_transaction_start(self, transaction_id: str, operation_name: str):
        logging.info(f"HOOK: Transaction '{operation_name}' ({transaction_id}) starting...")

    async def on_step_success(self, step):
        logging.info(f"HOOK: Step '{step.name}' completed successfully. Result: {step.result}")

    async def on_step_failure(self, step):
        logging.error(f"HOOK: Step '{step.name}' failed with error: {step.error}")

    async def on_rollback_start(self, transaction_id: str):
        logging.warning(f"HOOK: Rollback initiated for transaction {transaction_id}")

    async def on_step_rollback(self, step):
        logging.info(f"HOOK: Step '{step.name}' successfully rolled back.")

    async def on_step_rollback_failure(self, step):
        logging.error(f"HOOK: Step '{step.name}' rollback failed with error: {step.error}")

    async def on_transaction_complete(self, transaction_id: str, success: bool, summary: Dict[str, Any]):
        status = "succeeded" if success else "failed"
        logging.info(f"HOOK: Transaction {transaction_id} {status}. Summary: {summary}")

async def main_hooks():
    # Configure logging to see hook messages
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

    tx_manager = TransactionManager("hook_demo_transaction", "Demo with Hooks")
    tx_manager.register_hook(MyCustomHook())

    # Mock functions
    mock_execute_success = AsyncMock(return_value="data")
    mock_execute_fail = AsyncMock(side_effect=ValueError("Simulated failure"))
    mock_rollback = AsyncMock()

    try:
        async with tx_manager as tx:
            await tx.add_step("StepA", mock_execute_success, mock_rollback)
            await tx.add_step("StepB", mock_execute_fail, mock_rollback)
    except Exception as e:
        logging.error(f"Caught main exception: {e}")

if __name__ == "__main__":
    asyncio.run(main_hooks())
```

### Configuring Logging

The library's internal logger can be configured directly.

```python
import asyncio
import logging
from src.transaction_manager import TransactionManager

async def main_logging_config():
    # Option 1: Configure using basicConfig (affects root logger and potentially others)
    # logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # Option 2: Configure the library's logger specifically
    # By default, TransactionManager uses a NullHandler.
    # To see logs, you need to add a handler.
    
    # Example: Set level to DEBUG and add a StreamHandler
    TransactionManager.configure_logging(level=logging.DEBUG)

    async with TransactionManager("logging_demo") as tx:
        await tx.add_step("LoggedStep", AsyncMock(return_value=True))
        # Simulate a failure to see error logs
        await tx.add_step("FailingLoggedStep", AsyncMock(side_effect=ValueError("Log test failure")))

    print("Logging configuration demo complete.")

if __name__ == "__main__":
    asyncio.run(main_logging_config())
```

## `TransactionStep` Dataclass

```python
@dataclass
class TransactionStep:
    name: str
    execute_func: Callable[..., Awaitable[Any]]
    rollback_func: Optional[Callable[..., Awaitable[Any]]] = None
    execute_args: Any = field(default_factory=tuple) # Positional arguments for execute_func
    execute_kwargs: Any = field(default_factory=dict) # Keyword arguments for execute_func
    status: TransactionStepStatus = TransactionStepStatus.PENDING
    result: Any = None
    error: Optional[Exception] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    rollback_data: Optional[Dict[str, Any]] = None # Data passed to rollback_func as kwargs
    rollback_args: Optional[Tuple] = None # Positional arguments specifically for rollback_func
    override_args: Optional[bool] = False # If True, rollback_args override execute_args for rollback
    operation_name: Optional[str] = None
    timeout: Optional[int] = None # Timeout for this specific step
    max_retries: int = 0 # Max retries for this step
    retry_interval: float = 0.5 # Base interval for exponential backoff retries
```

## `TransactionHook` Interface

Implement this abstract class to create custom hooks:

```python
class TransactionHook(ABC):
    async def on_transaction_start(self, transaction_id: str, operation_name: str): pass
    async def on_step_execute(self, step: "TransactionStep"): pass
    async def on_step_success(self, step: "TransactionStep"): pass
    async def on_step_failure(self, step: "TransactionStep"): pass
    async def on_rollback_start(self, transaction_id: str): pass
    async def on_step_rollback(self, step: "TransactionStep"): pass
    async def on_step_rollback_failure(self, step: "TransactionStep"): pass
    async def on_transaction_complete(self, transaction_id: str, success: bool, summary: Dict[str, Any]): pass
```
