Metadata-Version: 2.4
Name: oteilo
Version: 0.1.0
Summary: A collection of intuitive Python utilities for common development tasks
Project-URL: Repository, https://github.com/teilomillet/oteilo
Author: Teilo Millet
License-File: LICENSE
Keywords: io,logging,timing,utilities,validation
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.12
Requires-Dist: loguru>=0.7.3
Requires-Dist: pillow>=11.1.0
Requires-Dist: pytest-cov>=6.0.0
Requires-Dist: pytest>=8.3.4
Description-Content-Type: text/markdown

# Oteilo

A robust Python toolkit that helps your team write safer, more maintainable code with built-in monitoring and validation. Think of it as a safety net that catches errors early and provides clear insights into your application's behavior.

## Why Oteilo?

- **Catch Problems Early**: Automatically validates data types and catches errors before they reach production
- **Clear Visibility**: Built-in monitoring shows exactly what your code is doing and how long it takes
- **Maintainable Code**: Enforces clean, consistent patterns that make code easier to understand and modify
- **Reduced Risk**: Strong type checking and validation prevent data-related bugs
- **Developer Productivity**: Common patterns are pre-built, tested, and ready to use

## Quick Start

```bash
pip install oteilo
```

## Understanding Imports

Oteilo is organized into focused modules, each handling specific concerns. Here's how to import and use them:

### Core Pipeline Components
```python
from oteilo.pipeline import Pipeline, Step
from typing import Dict, Any  # Python's typing for type hints

# Use Pipeline when you need to:
# - Chain multiple operations together
# - Validate data types between steps
# - Process data through a sequence of transformations
# step1 and step2 stand in for your own type-annotated callables
pipeline = Pipeline().then(step1).then(step2)

# Use Step when you need to:
# - Create reusable, configurable pipeline steps
# - Add automatic type validation
# - Provide clear documentation and metadata
class MyStep(Step):
    def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        transformed_data = {**data}  # Replace with your transformation
        return transformed_data
```

### Monitoring and Debugging
```python
from oteilo.functional import trace  # For function monitoring
from oteilo.timing import timing_block  # For performance tracking
from oteilo.logging import configure_logging  # For setting up logging

# Use @trace when you need to:
# - Monitor function execution
# - Track timing and performance
# - Log input/output data
@trace(level="DEBUG")
def my_function(): ...

# Use timing_block when you need to:
# - Measure specific code blocks
# - Track nested operations
# - Profile performance
with timing_block("operation_name"):
    do_something()
```

### Error Handling
```python
from oteilo.pipeline import PipelineError  # For catching pipeline-specific errors

# Use PipelineError when you need to:
# - Distinguish pipeline errors from other exceptions
# - Access detailed error information
# - Handle different types of validation failures
try:
    result = pipeline.process(data)
except PipelineError as e:
    print(f"Error details: {e.details}")
    print(f"Original cause: {e.cause}")
```
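
The `details` and `cause` attributes used above suggest an exception that wraps the original failure with extra context. As a rough illustration of that pattern using only the standard library, here is a minimal sketch. `StepFailure` and `run_step` are hypothetical names invented for this example; they are not part of Oteilo, and `PipelineError` may be implemented differently.

```python
from typing import Any, Callable, Dict, Optional


class StepFailure(Exception):
    """Hypothetical error type that carries context about a failed step."""

    def __init__(
        self,
        message: str,
        details: Optional[Dict[str, Any]] = None,
        cause: Optional[BaseException] = None,
    ) -> None:
        super().__init__(message)
        self.details = details or {}
        self.cause = cause


def run_step(name: str, func: Callable[[Any], Any], data: Any) -> Any:
    """Run one step and re-raise any failure with the step name attached."""
    try:
        return func(data)
    except Exception as exc:
        # "from exc" keeps the original traceback chained to the new error
        raise StepFailure(
            f"Step '{name}' failed: {exc}",
            details={"step": name, "input_type": type(data).__name__},
            cause=exc,
        ) from exc


try:
    run_step("parse", int, "abc")
except StepFailure as e:
    print(f"Error details: {e.details}")
    print(f"Original cause: {e.cause!r}")
```

Wrapping at the step boundary means callers can catch one exception type while still reaching the underlying error when they need it.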

### Type Validation
```python
from oteilo.validation import validate_type  # For manual type checking
from typing import Dict, List, Any, Optional  # Python's type hints

# Use validate_type when you need to:
# - Check types manually
# - Handle complex type validation
# - Support Union and Optional types
if not validate_type(data, Dict[str, Any]):
    raise ValueError("Expected dictionary")
```

### Complete Example
```python
from oteilo.pipeline import Pipeline, Step, PipelineError
from oteilo.functional import trace
from oteilo.timing import timing_block
from oteilo.logging import configure_logging
from oteilo.validation import validate_type
from typing import Dict, Any, Optional

# Set up logging first
configure_logging(level="DEBUG")

# Create monitored functions
@trace
def process_data(data: Dict[str, Any]) -> Dict[str, Any]:
    with timing_block("processing"):
        processed_data = {**data}  # Your processing logic here
        return processed_data

# Create pipeline steps
class ValidationStep(Step):
    def validate(self, data: Dict[str, Any]) -> Dict[str, Any]:
        validated_data = data  # Validation logic here
        return validated_data

# Build and use pipeline
pipeline = Pipeline().then(ValidationStep()).then(process_data)

# Handle errors appropriately
def handle_request(data: Dict[str, Any]) -> Dict[str, Any]:
    try:
        return pipeline.process(data)
    except PipelineError as e:
        # Handle pipeline-specific errors
        return {"error": str(e), "details": e.details}
    except Exception as e:
        # Handle unexpected errors
        return {"error": "Unexpected error", "details": str(e)}
```

## Key Features (With Real-World Examples)

### 1. Automatic Function Monitoring

Monitor any function to see what's happening inside your application. The `@trace` decorator provides both timing and comprehensive monitoring:

```python
from oteilo.functional import trace
from oteilo.timing import timing_block  # Used in the last example below

# 1. Simple timing only (default)
@trace
def calculate_total(order):
    return order["quantity"] * order["price"]

# Output:
# DEBUG: calculate_total took 0.001 seconds

# 2. Full tracing with timing and monitoring
@trace(level="DEBUG")
def calculate_order_total(order):
    return order["quantity"] * order["price"]

# Output:
# DEBUG: calculate_order_total called with {'order': {'quantity': 5, 'price': 9.99}}
# DEBUG: calculate_order_total type information {'params': {'order': 'Dict[str, Any]'}, 'return': 'float'}
# DEBUG: Executing calculate_order_total
# DEBUG: calculate_order_total took 0.001 seconds
# DEBUG: calculate_order_total completed with result: 49.95

# 3. Custom configuration
@trace(
    level="INFO",      # Set logging level
    timing=True,       # Include timing (default)
    args=True,         # Log arguments (default)
    types=True,        # Log type information (default)
    nested=False       # Don't trace nested calls
)
def process_order(order):
    validate_order(order)
    total = calculate_total(order)
    return format_output(total)

# You can also combine @trace with timing_block for more granular timing:
@trace(level="DEBUG")
def process_large_order(order):
    with timing_block("validation"):
        validate_order(order)
    
    with timing_block("calculation"):
        total = calculate_total(order)
    
    with timing_block("formatting"):
        return format_output(total)

# Output:
# DEBUG: process_large_order called with {'order': {...}}
# DEBUG: Executing process_large_order
# DEBUG: validation took 0.001 seconds
# DEBUG: calculation took 0.002 seconds
# DEBUG: formatting took 0.001 seconds
# DEBUG: process_large_order took 0.004 seconds
# DEBUG: process_large_order completed with result: {...}
```

**Business Value**: 
- Instantly see performance bottlenecks with automatic timing
- Debug issues with detailed logging of inputs and outputs
- Monitor critical business functions with type validation
- Track nested operations and their timing
- Flexible configuration to match your monitoring needs
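
If you are curious how a decorator can work both bare (`@trace`) and with arguments (`@trace(level=...)`), the following is a minimal sketch built on the standard `logging` and `functools` modules. `simple_trace` is a hypothetical stand-in written for illustration; it is not Oteilo's implementation and covers only call logging and timing.

```python
import functools
import logging
import time
from typing import Any, Callable, Optional

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("trace_sketch")


def simple_trace(func: Optional[Callable] = None, *, level: int = logging.DEBUG):
    """Hypothetical tracing decorator, usable bare or with keyword arguments."""

    def decorate(f: Callable) -> Callable:
        @functools.wraps(f)  # Preserve the wrapped function's name and docstring
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            logger.log(level, "%s called with args=%r kwargs=%r", f.__name__, args, kwargs)
            start = time.perf_counter()
            try:
                result = f(*args, **kwargs)
            finally:
                # Timing is logged even when the call raises
                elapsed = time.perf_counter() - start
                logger.log(level, "%s took %.3f seconds", f.__name__, elapsed)
            logger.log(level, "%s completed with result: %r", f.__name__, result)
            return result

        return wrapper

    # Bare use passes the function directly; otherwise return the decorator
    return decorate(func) if func is not None else decorate


@simple_trace
def calculate_total(order):
    return order["quantity"] * order["price"]


@simple_trace(level=logging.INFO)
def add(a, b):
    return a + b


calculate_total({"quantity": 5, "price": 2})
add(1, 2)
```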

### 2. Safe Data Processing Pipeline

Build reliable data transformation workflows that validate every step:

```python
from oteilo.pipeline import Pipeline, PipelineError
from typing import Dict, Any

# Define each step of your process
def validate_order(order):
    """Make sure we have all required order data."""
    if not order.get("customer_id"):
        raise ValueError("Missing customer ID")
    return order

def calculate_total(order):
    """Calculate order total with tax."""
    subtotal = order["quantity"] * order["price"]
    tax = subtotal * 0.1
    return {**order, "subtotal": subtotal, "tax": tax, "total": subtotal + tax}

def format_currency(order):
    """Format money values for display."""
    return {
        **order,
        "subtotal": f"${order['subtotal']:.2f}",
        "tax": f"${order['tax']:.2f}",
        "total": f"${order['total']:.2f}"
    }

# 1. First, define your pipeline - this validates type compatibility
order_pipeline = (Pipeline()
    .then(validate_order)      # Dict -> Dict
    .then(calculate_total)     # Dict -> Dict
    .then(format_currency))    # Dict -> Dict

# 2. Then use it to process data - this is where runtime checks happen
def process_order(order_data: Dict[str, Any]) -> Dict[str, Any]:
    """Process an order with full validation."""
    # Type validation and business rules are checked during processing
    return order_pipeline.process(order_data)

# 3. Now you can handle errors at your application level
def handle_order_request(request):
    try:
        order_data = {
            "customer_id": "12345",
            "quantity": 5,
            "price": 9.99
        }
        result = process_order(order_data)
        return {"status": "success", "data": result}
        
    except PipelineError as e:
        if "Invalid input type" in str(e):
            return {"status": "error", "message": "Invalid data format"}
        if "quantity must be positive" in str(e):
            return {"status": "error", "message": "Invalid quantity"}
        return {"status": "error", "message": str(e)}

# This is cleaner than mixing pipeline usage and error handling
response = handle_order_request(request)
if response["status"] == "success":
    send_confirmation(response["data"])
else:
    notify_user(response["message"])
```

**Business Value**:
- Complete pipeline validation before any data processing
- Immediate feedback on type mismatches in the entire chain
- Prevents wasted resources on invalid pipelines
- Catches configuration errors during development
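
Conceptually, a pipeline of this kind is an ordered list of callables where each step receives the previous step's output. The sketch below shows that core idea with no validation at all. `MiniPipeline`, `add_subtotal`, and `add_tax` are hypothetical names for this example only, and Oteilo's `Pipeline` does considerably more than this.

```python
from typing import Any, Callable, List, Optional


class MiniPipeline:
    """Hypothetical, simplified pipeline: an ordered list of callables."""

    def __init__(self, steps: Optional[List[Callable[[Any], Any]]] = None) -> None:
        self._steps: List[Callable[[Any], Any]] = list(steps or [])

    def then(self, step: Callable[[Any], Any]) -> "MiniPipeline":
        # Return a new pipeline so existing ones can be reused as building blocks
        return MiniPipeline(self._steps + [step])

    def process(self, data: Any) -> Any:
        # Feed each step's output into the next step
        for step in self._steps:
            data = step(data)
        return data


def add_subtotal(order):
    return {**order, "subtotal": order["quantity"] * order["price"]}


def add_tax(order):
    tax = order["subtotal"] * 10 / 100  # 10% tax, matching the example above
    return {**order, "tax": tax, "total": order["subtotal"] + tax}


base = MiniPipeline().then(add_subtotal)
full = base.then(add_tax)
print(full.process({"quantity": 5, "price": 10}))
```

Returning a new object from `then` is a design choice in this sketch: `base` stays unchanged after `full` is built, so partial pipelines can be shared safely.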

### 3. Error Prevention at Multiple Levels

Oteilo prevents errors at three distinct stages:

1. **Construction Time** (When building your pipeline):
```python
from oteilo.pipeline import Pipeline, PipelineError, Step
from typing import Dict, Any

def to_text(value: int) -> str:
    return str(value)

def double(value: int) -> int:
    return value * 2

# This will fail immediately due to type mismatch
try:
    pipeline = (Pipeline()
        .then(to_text)    # Output: str
        .then(double))    # Expects int, not str
except PipelineError as e:
    print("Pipeline construction failed: incompatible steps")

# This will fail due to missing type hints
try:
    pipeline = Pipeline().then(lambda x: x * 2)  # No type hints
except PipelineError as e:
    print("Pipeline construction failed: missing type information")
```
```

2. **Runtime Validation** (When processing data):
```python
class PriceCalculator(Step):
    """Calculate prices with built-in validation."""
    
    def calculate(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Calculate final price with discounts."""
        # Types are checked automatically
        base_price = data["price"]
        quantity = data["quantity"]
        discount = data.get("discount", 0)
        
        total = (base_price * quantity) * (1 - discount)
        return {"total": total}

calculator = PriceCalculator()

# Type validation happens automatically
result = calculator.calculate({
    "price": 99.99,
    "quantity": 5,
    "discount": 0.1
})

# Invalid types are caught immediately
try:
    result = calculator.calculate({
        "price": "not a number",  # Wrong type
        "quantity": 5
    })
except PipelineError as e:
    print("Validation failed: incorrect data type")
```

3. **Business Logic Validation** (Your custom rules):
```python
def validate_order(order: Dict[str, Any]) -> Dict[str, Any]:
    """Validate business rules for orders."""
    if order["quantity"] <= 0:
        raise ValueError("Quantity must be positive")
    if order["price"] <= 0:
        raise ValueError("Price must be positive")
    return order

# Business rules are checked during processing
pipeline = Pipeline().then(validate_order)
try:
    result = pipeline.process({
        "quantity": -1,
        "price": 99.99
    })
except PipelineError as e:
    print("Business rule violated: quantity must be positive")
```

**Business Value**:
- Configuration errors are caught during development
- Data validation errors are caught before processing
- Business rule violations are clearly reported
- Each type of error has its own clear message
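
Construction-time checking of the kind described in stage 1 can be approximated by comparing the type hints of adjacent steps before any data flows. The sketch below shows one way to do that with `typing.get_type_hints`. `check_compatible` is a hypothetical helper, it compares annotations by simple equality, and it only handles plain functions; Oteilo's own rules may be more permissive or more strict.

```python
import inspect
from typing import Any, Callable, get_type_hints


def check_compatible(first: Callable, second: Callable) -> None:
    """Raise TypeError if first's output annotation does not match second's input."""
    first_hints = get_type_hints(first)
    second_hints = get_type_hints(second)
    if "return" not in first_hints:
        raise TypeError(f"{first.__name__} has no return annotation")
    params = list(inspect.signature(second).parameters)
    if not params or params[0] not in second_hints:
        raise TypeError(f"{second.__name__} has no annotated input parameter")
    produced = first_hints["return"]
    expected = second_hints[params[0]]
    if produced is Any or expected is Any:
        return  # Any is compatible with everything
    if produced != expected:
        raise TypeError(
            f"{first.__name__} returns {produced}, "
            f"but {second.__name__} expects {expected}"
        )


def to_text(value: int) -> str:
    return str(value)


def double(value: int) -> int:
    return value * 2


def shout(text: str) -> str:
    return text.upper()


check_compatible(to_text, shout)  # str -> str, passes silently

try:
    check_compatible(to_text, double)  # str -> int, rejected
except TypeError as e:
    print(f"Incompatible steps: {e}")
```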

### 4. Performance Monitoring

Track performance of critical operations:

```python
from oteilo.timing import timing_block

def process_customer_data(customers):
    with timing_block("customer_processing"):
        for customer in customers:
            with timing_block(f"processing_{customer['id']}"):
                update_customer(customer)
                process_orders(customer)
                send_notification(customer)

# Nested blocks finish first, so their timings are logged before the outer block's:
# DEBUG: processing_12345 took 0.123 seconds
# DEBUG: processing_12346 took 0.456 seconds
# DEBUG: customer_processing took 1.234 seconds
```

**Business Value**:
- Identify slow operations
- Track processing times
- Optimize performance bottlenecks
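
For readers who want to see the mechanism, a timing context manager can be sketched in a few lines with `contextlib` and `time.perf_counter`. `simple_timing_block` and the `timings` list are hypothetical names for this example; Oteilo's `timing_block` logs its results rather than collecting them and may differ in other ways.

```python
import time
from contextlib import contextmanager
from typing import Iterator, List, Tuple

# Completed measurements as (name, seconds); a real tool would log these instead
timings: List[Tuple[str, float]] = []


@contextmanager
def simple_timing_block(name: str) -> Iterator[None]:
    """Hypothetical stand-in for a timing context manager."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the body raises, so failed operations are still measured
        timings.append((name, time.perf_counter() - start))


with simple_timing_block("customer_processing"):
    for customer_id in ("12345", "12346"):
        with simple_timing_block(f"processing_{customer_id}"):
            time.sleep(0.01)  # Placeholder for real work

for name, seconds in timings:
    print(f"{name} took {seconds:.3f} seconds")
```

In this sketch a measurement is recorded when its block exits, so the inner blocks appear before the outer one.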

## Real-World Use Cases

### 1. Order Processing System

```python
from oteilo.pipeline import Pipeline
from oteilo.functional import trace

# Monitor critical business functions
@trace(level="INFO")
def validate_inventory(order):
    """Check if we have enough inventory."""
    # Your inventory check logic here
    return order

@trace(level="INFO")
def process_payment(order):
    """Process payment and return confirmation."""
    # Your payment processing logic here
    return {**order, "payment_status": "confirmed"}

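# Build a reliable order pipeline
# (validate_order, calculate_total, and format_currency come from the
# Safe Data Processing Pipeline example above)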
order_system = (Pipeline()
    .then(validate_order)       # Step 1: Validate order data
    .then(validate_inventory)   # Step 2: Check inventory
    .then(calculate_total)      # Step 3: Calculate prices
    .then(process_payment)      # Step 4: Process payment
    .then(format_currency))     # Step 5: Format for display

# Process orders safely
try:
    result = order_system.process({
        "customer_id": "12345",
        "product_id": "PROD-1",
        "quantity": 5,
        "price": 9.99  # Required by calculate_total
    })
    print("Order processed successfully")
except Exception as e:
    print(f"Order failed: {e}")  # Clear error message
```

### 2. Data Transformation Pipeline

```python
from oteilo.pipeline import Pipeline, Step
from typing import Dict, Any

class DataCleaner(Step):
    """Clean and validate customer data."""
    
    def clean(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Remove invalid characters and normalize format."""
        return {
            "name": data["name"].strip().title(),
            "email": data["email"].lower().strip(),
            "phone": self.format_phone(data.get("phone", ""))
        }

class DataEnricher(Step):
    """Add additional customer information."""
    
    def enrich(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Add derived data fields."""
        return {
            **data,
            "customer_segment": self.calculate_segment(data),
            "lifetime_value": self.calculate_ltv(data)
        }

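# Create a data processing pipeline
# (format_output, format_phone, calculate_segment, and calculate_ltv are
# placeholders for helpers you supply)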
data_pipeline = (Pipeline()
    .then(DataCleaner())      # Step 1: Clean data
    .then(DataEnricher())     # Step 2: Add business data
    .then(format_output))     # Step 3: Format for use

# Process customer data safely
try:
    clean_data = data_pipeline.process({
        "name": "john smith ",
        "email": " John@Example.COM ",
        "phone": "1234567890"
    })
    print("Data processed successfully")
except Exception as e:
    print(f"Data processing failed: {e}")
```

## Requirements

- Python 3.12 or higher
- Dependencies (`loguru`, `pillow`, `pytest`, `pytest-cov`) are installed automatically by `pip`

## Support

Need help? Have questions? Contact our support team at [support email/link].

## Contributing

We welcome contributions! See our [Contributing Guide](CONTRIBUTING.md) for details.

## License

MIT License - Feel free to use in your projects.