Metadata-Version: 2.1
Name: pyqanat
Version: 0.0.0
Summary: Create simple event-driven, distributed data pipelines in Python
Home-page: https://github.com/jarombouts/qanat
Author: Your Name
Author-email: 
License: MIT
Keywords: data pipelines event-driven architecture microservices mqtt
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Topic :: Software Development :: Libraries :: Application Frameworks
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Distributed Computing
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Requires-Python: >=3.11
Description-Content-Type: text/markdown
License-File: LICENCE

# Qanat

Qanat: Create simple event-driven, distributed data pipelines in Python.

Qanat is a lightweight tool for setting up event-driven, distributed data pipelines with
ease. It lets you link together components into a type-checked, decoupled pipeline, with
inter-process communication handled automatically through MQTT (in the future RabbitMQ?)
and serialization handled through JSON.

## Usage Example

```python
from qanat import QanatPipeline
from dataclasses import dataclass
from typing import List

@dataclass
class Frame:
    image: bytes
    timestamp: str

@dataclass
class Detection:
    type: str
    confidence: float

@dataclass
class DetectionResults:
    objects: List[Detection]
    frame_timestamp: str

# Initialize the pipeline with the MQTT broker connection
pipeline = QanatPipeline(broker="mqtt://broker.hivemq.com:1883")

@pipeline.component(output="qanat-demo/frames/raw")
def frame_producer() -> Frame:
    """
    Simulates producing a frame and its metadata.
    """
    image_data = b'some_image_data'  # Replace with actual image data
    return Frame(image=image_data, timestamp="2021-07-01T00:00:00Z")

@pipeline.component(
    input="qanat-demo/frames/raw", 
    output="qanat-demo/detections/results"
)
def detector(frame: Frame) -> DetectionResults:
    """
    Processes a frame and detects objects, outputting detection results.
    """
    detected_objects = [Detection(type="smoke", confidence=0.98)]
    return DetectionResults(objects=detected_objects, frame_timestamp=frame.timestamp)

@pipeline.component(input="qanat-demo/detections/results")
def result_publisher(detection_results: DetectionResults):
    """
    Publishes the detection results.
    """
    print(f"Publishing results: {detection_results}")

# Start the event loop for testing and demonstration
if __name__ == "__main__":
    # For demo or testing purposes, this runs all components in the pipeline
    pipeline.start_event_loop() # Blocks until the event loop terminates

    # In real-world distributed usage, you would start three separate processes, 
    # one for each component, e.g.:
    # frame_producer.start() # Blocks until this specific component terminates
```

