Metadata-Version: 2.4
Name: decentral-service
Version: 1.2.0
Summary: A Redis-like edge microservice for multi-threaded task processing
Home-page: https://github.com/your-org/decentral-service
Author: DecentralService Team
Author-email: Manh Thang Ho <homata123@gmail.com>
License: MIT
Project-URL: Homepage, https://github.com/homata123/DecentralService
Project-URL: Repository, https://github.com/homata123/DecentralService
Project-URL: Issues, https://github.com/homata123/DecentralService/issues
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Distributed Computing
Classifier: Topic :: Internet :: WWW/HTTP :: HTTP Servers
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: flask>=3.1.2
Requires-Dist: flask-cors>=6.0.1
Requires-Dist: psutil>=7.0.0
Requires-Dist: requests>=2.31.0
Provides-Extra: dev
Requires-Dist: pytest>=6.0; extra == "dev"
Requires-Dist: pytest-cov>=2.0; extra == "dev"
Requires-Dist: black>=21.0; extra == "dev"
Requires-Dist: flake8>=3.8; extra == "dev"
Requires-Dist: mypy>=0.800; extra == "dev"
Dynamic: author
Dynamic: home-page
Dynamic: requires-python

# DecentralService

A Redis-like edge microservice for multi-threaded task processing, built with Python. DecentralService provides in-memory storage, task queuing, and multi-threaded processing capabilities that can be used as a standalone service or imported as a library.

## Features

- **Redis-like Storage**: Key-value storage with TTL support
- **Multi-threaded Task Processing**: Configurable worker pool for concurrent task execution
- **Priority Queue**: Task prioritization with heap-based queue implementation
- **HTTP API**: RESTful API for external access
- **Thread-safe Operations**: All operations are thread-safe for concurrent access
- **Task Handlers**: Pluggable task processing system
- **Statistics & Monitoring**: Comprehensive metrics and status reporting
- **Easy Integration**: Can be used as a library or standalone service
- **Custom Task IDs**: Submit tasks with custom identifiers
- **Overwrite Control**: Control behavior when duplicate task IDs are encountered
- **Task Search & Filtering**: Search and list tasks with pagination
- **FastAPI Integration**: Includes a complete FastAPI integration example

## Installation

1. Clone the repository:
```bash
git clone <repository-url>
cd DecentralService
```

2. Install dependencies:
```bash
pip install -r requirements.txt
```

3. Install the package:
```bash
pip install -e .
```

## Quick Start

### As a Library

```python
from decentral_service import DecentralService

# Create service instance
service = DecentralService(max_workers=4, enable_api=True)

# Use as context manager (recommended)
with service:
    # Storage operations
    service.set("key1", "value1")
    service.set("key2", {"data": "complex"}, ttl=300)  # 5 minutes TTL
    
    value = service.get("key1")
    print(f"Retrieved: {value}")
    
    # Task processing
    task_id = service.submit_task("echo", {"message": "Hello World!"})
    print(f"Task submitted: {task_id}")
```

### As a Standalone Service

```python
import time

from decentral_service import DecentralService

# Start service with API
service = DecentralService(host="0.0.0.0", port=6379, max_workers=4)
service.start()

# Service is now running and accessible via HTTP API
# Keep running until interrupted
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    service.stop()
```

## API Reference

### Storage Operations

```python
# Set key-value pair
service.set("key", "value", ttl=300)  # Optional TTL in seconds

# Get value
value = service.get("key")

# Check if key exists
exists = service.exists("key")

# Delete key
deleted = service.delete("key")

# List keys with pattern
keys = service.keys("user:*")  # Wildcard support

# Get TTL
ttl = service.ttl("key")  # -1 = no TTL, -2 = key doesn't exist

# Clear all data
service.flush()
```
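The TTL return values above (`-1` for a key without a TTL, `-2` for a missing key) can be illustrated with a small stand-alone sketch. `TTLStore` is a hypothetical class written for this README, not DecentralService's actual storage implementation. It is single-threaded and expires keys lazily on access, which is an assumption.

```python
import time
from typing import Any, Dict, Optional, Tuple


class TTLStore:
    """Hypothetical illustration of key-value storage with per-key expiry."""

    def __init__(self) -> None:
        # key -> (value, absolute expiry time, or None for "never expires")
        self._data: Dict[str, Tuple[Any, Optional[float]]] = {}

    def set(self, key: str, value: Any, ttl: Optional[float] = None) -> None:
        expires_at = time.monotonic() + ttl if ttl is not None else None
        self._data[key] = (value, expires_at)

    def _purge_if_expired(self, key: str) -> None:
        # Lazy expiry: drop the key only when someone looks at it.
        entry = self._data.get(key)
        if entry is not None and entry[1] is not None and entry[1] <= time.monotonic():
            del self._data[key]

    def get(self, key: str) -> Any:
        self._purge_if_expired(key)
        entry = self._data.get(key)
        return entry[0] if entry is not None else None

    def ttl(self, key: str) -> int:
        self._purge_if_expired(key)
        entry = self._data.get(key)
        if entry is None:
            return -2  # key does not exist
        if entry[1] is None:
            return -1  # key exists but has no TTL
        return max(0, int(entry[1] - time.monotonic()))


store = TTLStore()
store.set("session", "abc", ttl=300)   # expires in 5 minutes
store.set("config", {"debug": True})   # never expires
store.set("short", "x", ttl=0)         # already expired on the next access
```

Here `store.ttl("config")` returns `-1`, while `store.ttl("missing")` and `store.ttl("short")` both return `-2`.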

### Task Processing

```python
# Submit task
task_id = service.submit_task(
    task_type="math",
    data={"operation": "add", "values": [1, 2, 3]},
    priority=1  # Higher number = higher priority
)

# Check task status
status = service.get_task_status(task_id)

# Queue operations
queue_size = service.queue_size()
is_empty = service.queue_empty()
```
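To show how "higher number = higher priority" can be realized on top of a heap, here is a minimal sketch built on the standard-library `heapq` module. `PriorityTaskQueue` is a hypothetical name, and the first-in, first-out ordering among tasks of equal priority is an assumption; the real queue in DecentralService may behave differently.

```python
import heapq
import itertools
from typing import Any, List, Tuple


class PriorityTaskQueue:
    """Hypothetical heap-based queue; not the library's own class."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, Any]] = []
        # Monotonic counter: breaks ties so equal priorities stay FIFO
        # and the queued items themselves are never compared.
        self._counter = itertools.count()

    def put(self, item: Any, priority: int = 0) -> None:
        # heapq is a min-heap, so the priority is negated to pop the
        # largest priority first. Push is O(log n).
        heapq.heappush(self._heap, (-priority, next(self._counter), item))

    def get(self) -> Any:
        # Pop is O(log n); raises IndexError when the queue is empty.
        _, _, item = heapq.heappop(self._heap)
        return item

    def __len__(self) -> int:
        return len(self._heap)


queue = PriorityTaskQueue()
queue.put("low", priority=0)
queue.put("high", priority=2)
queue.put("medium-a", priority=1)
queue.put("medium-b", priority=1)

order = [queue.get() for _ in range(len(queue))]
print(order)  # ['high', 'medium-a', 'medium-b', 'low']
```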

### Custom Task Handlers

```python
def my_handler(data, task):
    # Process the data
    result = {"processed": data, "task_id": task.id}
    return result

# Add custom handler
service.worker_pool.add_task_handler("my_task_type", my_handler)

# Submit task with custom handler
task_id = service.submit_task("my_task_type", {"input": "data"})
```

## Built-in Task Handlers

- **default**: Returns data as-is
- **echo**: Returns data with metadata
- **delay**: Sleeps for specified duration
- **math**: Performs arithmetic operations (add, multiply, subtract, divide)
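
As an illustration of the `(data, task)` handler signature, the following sketch shows what a handler along the lines of `math` could look like. It is not the built-in implementation: the name `math_handler_sketch`, the returned dictionary shape, and the left-to-right folding for `subtract` and `divide` are all assumptions.

```python
import operator
from functools import reduce
from typing import Any, Callable, Dict

# Operation names taken from the list above; the mapping itself is illustrative.
_OPERATIONS: Dict[str, Callable[[Any, Any], Any]] = {
    "add": operator.add,
    "subtract": operator.sub,
    "multiply": operator.mul,
    "divide": operator.truediv,
}


def math_handler_sketch(data: Dict[str, Any], task: Any = None) -> Dict[str, Any]:
    """Fold `values` left to right with the requested operation."""
    operation = data.get("operation")
    values = data.get("values", [])
    if operation not in _OPERATIONS:
        raise ValueError(f"Unsupported operation: {operation!r}")
    if not values:
        raise ValueError("'values' must contain at least one number")
    # Dividing by zero raises ZeroDivisionError, which the caller must handle.
    return {"operation": operation, "result": reduce(_OPERATIONS[operation], values)}


added = math_handler_sketch({"operation": "add", "values": [1, 2, 3]})
divided = math_handler_sketch({"operation": "divide", "values": [8, 2, 2]})
```

A handler like this could be registered with `service.worker_pool.add_task_handler(...)` under a task type of your choosing, as shown in the Custom Task Handlers section.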

## HTTP API

When the API is enabled (`enable_api=True`), the service exposes the following REST endpoints:

### Storage Endpoints

- `GET /storage/<key>` - Get value
- `PUT /storage/<key>` - Set value (JSON body: `{"value": "...", "ttl": 300}`)
- `DELETE /storage/<key>` - Delete key
- `GET /storage/<key>/exists` - Check if key exists
- `GET /storage/<key>/ttl` - Get TTL
- `GET /storage/keys?pattern=*` - List keys
- `POST /storage/flush` - Clear all data

### Task Endpoints

- `POST /tasks` - Submit task (JSON body: `{"type": "...", "data": {...}, "priority": 0}`)
- `GET /queue/tasks/<task_id>` - Get task status
- `GET /queue/size` - Get queue size
- `GET /queue/stats` - Get queue statistics

### System Endpoints

- `GET /health` - Health check
- `GET /stats` - Service statistics
- `GET /workers/stats` - Worker pool statistics
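
The endpoints above can be called from any HTTP client. The sketch below uses only the Python standard library to build the requests for `PUT /storage/<key>` and `POST /tasks`. The helper names are hypothetical, the base URL assumes the host and port shown in the Configuration section, and `send` assumes that responses are JSON; none of this is confirmed by the service itself.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Dict, Optional

BASE_URL = "http://localhost:6379"  # assumption: matches your host/port settings


def _json_request(method: str, path: str, body: Dict[str, Any],
                  base_url: str = BASE_URL) -> urllib.request.Request:
    # Build (but do not send) a request with a JSON body.
    return urllib.request.Request(
        url=f"{base_url}{path}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method=method,
    )


def build_set_request(key: str, value: Any,
                      ttl: Optional[int] = None) -> urllib.request.Request:
    body: Dict[str, Any] = {"value": value}
    if ttl is not None:
        body["ttl"] = ttl
    # Percent-encode the key so characters such as ':' or '/' stay in one path segment.
    return _json_request("PUT", f"/storage/{urllib.parse.quote(key, safe='')}", body)


def build_submit_task_request(task_type: str, data: Dict[str, Any],
                              priority: int = 0) -> urllib.request.Request:
    return _json_request("POST", "/tasks",
                         {"type": task_type, "data": data, "priority": priority})


def send(request: urllib.request.Request) -> Any:
    # Requires a running service; the JSON response format is an assumption.
    with urllib.request.urlopen(request, timeout=5) as response:
        return json.loads(response.read().decode("utf-8"))


set_req = build_set_request("greeting", "hello", ttl=300)
task_req = build_submit_task_request("echo", {"message": "Hello"}, priority=1)
```

With a service running, `send(set_req)` and `send(task_req)` would perform the actual calls.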

## Configuration

```python
service = DecentralService(
    host="localhost",        # API server host
    port=6379,              # API server port
    max_workers=4,          # Number of worker threads
    enable_api=True,        # Enable HTTP API
    log_level="INFO",       # Logging level
    debug=False             # Enable debug mode
)
```

## Examples

### Basic Usage

```python
import time

from decentral_service import DecentralService

with DecentralService() as service:
    # Set data
    service.set("config", {"debug": True, "version": "1.0"})
    
    # Get data
    config = service.get("config")
    print(f"Config: {config}")
    
    # Process task
    task_id = service.submit_task("echo", {"message": "Hello"})
    
    # Wait and check result
    time.sleep(1)
    status = service.get_task_status(task_id)
    print(f"Task result: {status['result']}")
```

### Advanced Task Processing

```python
import time

from decentral_service import DecentralService


def process_user_data(data, task):
    # Simulate processing
    time.sleep(1)
    return {
        "user_id": data["id"],
        "processed_at": time.time(),
        "status": "completed"
    }

service = DecentralService(max_workers=2)
service.worker_pool.add_task_handler("user_processing", process_user_data)

with service:
    # Submit multiple tasks
    for i in range(10):
        service.submit_task(
            "user_processing",
            {"id": i, "name": f"User {i}"},
            priority=i % 3
        )
    
    # Monitor progress
    while service.queue_size() > 0:
        print(f"Queue size: {service.queue_size()}")
        time.sleep(0.5)
```

## Performance

- **Storage**: O(1) average case for get/set/delete operations
- **Queue**: O(log n) for enqueue/dequeue with priority support
- **Threading**: Configurable worker pool for concurrent processing
- **Memory**: In-memory storage with automatic cleanup of expired keys

## Thread Safety

All operations are thread-safe and can be used concurrently from multiple threads without additional synchronization.
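
Thread safety of individual calls does not by itself make a sequence of calls atomic: a `get` followed by a `set` from two threads can still interleave. The sketch below shows the general technique of guarding a read-modify-write with a lock. `LockedCounterStore` is a hypothetical class for illustration only and is not part of DecentralService.

```python
import threading
from typing import Dict


class LockedCounterStore:
    """Hypothetical store whose increment is atomic thanks to a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._values: Dict[str, int] = {}

    def increment(self, key: str, amount: int = 1) -> int:
        # Read, modify, and write happen under one lock acquisition,
        # so concurrent increments cannot overwrite each other.
        with self._lock:
            new_value = self._values.get(key, 0) + amount
            self._values[key] = new_value
            return new_value

    def get(self, key: str) -> int:
        with self._lock:
            return self._values.get(key, 0)


def hammer(store: LockedCounterStore, key: str, times: int) -> None:
    for _ in range(times):
        store.increment(key)


counter_store = LockedCounterStore()
threads = [
    threading.Thread(target=hammer, args=(counter_store, "hits", 1000))
    for _ in range(8)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

total = counter_store.get("hits")
print(total)  # 8000
```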

## Error Handling

The service includes comprehensive error handling:
- Task failures are retried up to a configurable limit
- Expired keys are automatically cleaned up
- Worker threads handle exceptions gracefully
- API endpoints return appropriate HTTP status codes
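
The retry behavior in the first bullet can be sketched as follows. This is a generic illustration and not the worker pool's actual logic: the function name `run_with_retries`, the `max_retries` parameter, and the status strings are assumptions.

```python
from typing import Any, Callable, Dict, Optional


def run_with_retries(handler: Callable[[Any], Any], data: Any,
                     max_retries: int = 3) -> Dict[str, Any]:
    """Call `handler(data)`, retrying on any exception up to `max_retries` times."""
    attempts = 0
    last_error: Optional[Exception] = None
    # One initial attempt plus up to `max_retries` retries.
    while attempts <= max_retries:
        attempts += 1
        try:
            result = handler(data)
            return {"status": "completed", "result": result, "attempts": attempts}
        except Exception as exc:  # a worker should survive handler errors
            last_error = exc
    return {"status": "failed", "error": str(last_error), "attempts": attempts}


calls = {"count": 0}


def flaky_handler(data: Any) -> Any:
    # Fails on the first two calls, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary failure")
    return data


def broken_handler(data: Any) -> Any:
    raise RuntimeError("always fails")


recovered = run_with_retries(flaky_handler, {"id": 1}, max_retries=3)
exhausted = run_with_retries(broken_handler, {"id": 2}, max_retries=2)
```

In this run `recovered` completes on the third attempt, while `exhausted` reports a failure after three attempts (one initial try plus two retries).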

## Monitoring

Access detailed statistics via:
- `service.get_stats()` - Overall service statistics
- `service.task_queue.get_stats()` - Queue statistics
- `service.worker_pool.get_stats()` - Worker pool statistics
- HTTP API endpoints for external monitoring

## Changelog

### [1.1.0] - 2025-01-11

See CHANGELOG.md for details.

## License

This project is licensed under the MIT License.

## Contributing

Contributions are welcome! Please feel free to submit issues and pull requests.

## Support

For questions and support, please open an issue in the repository.
