Metadata-Version: 2.1
Name: talus
Version: 1.0.0
Summary: A wrapper for connecting to RabbitMQ which constrains clients to a single purpose channel (producer or consumer) with healing for intermittent connectivity.
Author-email: NSO / AURA <dkistdc@nso.edu>
License: BSD 3-Clause
Project-URL: repository, https://bitbucket.org/dkistdc/interservice-bus-adapter
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Requires-Python: >=3.11
Description-Content-Type: text/x-rst
License-File: LICENSE
Requires-Dist: pika>1.3
Requires-Dist: tenacity>=8.2
Requires-Dist: pydantic>=2.0
Provides-Extra: test
Requires-Dist: tox>=4; extra == "test"
Requires-Dist: pytest; extra == "test"
Requires-Dist: pytest-cov; extra == "test"
Requires-Dist: Pygments; extra == "test"
Requires-Dist: PyPDF4; extra == "test"

talus
=========

|codecov|

talus (noun) - ta·​lus | ˈtā-ləs: a slope formed especially by an accumulation of rock debris; Occasional habitat of the pika.

A wrapper for connecting to RabbitMQ which constrains clients to a single purpose channel (producer or consumer) with healing for intermittent connectivity.

Features
--------

- Guided separation of connections for producers and consumers

- Re-establish connections to the server when lost

- Constrained interface to support simple produce / consume use cases for direct exchanges

Installation
------------

.. code:: bash

   pip install talus

Examples
--------

**Creating a consumer which listens on a queue, processes valid messages and publishes as part of processing**

.. code:: python

    from talus.consumer DurableConsumer
    from talus.producer DurableProducer
    from talus.models.retryer import ConnectionRetryerFactory
    from talus.models.connection_parameters import ConsumerConnectionParameterFactory, ProducerConnectionParameterFactory
    from talus.models.processor import MessageProcessorBase
    from talus.models.message import ConsumeMessageBase, PublishMessageBase, MessageBodyBase
    from talus.models.queue import Queue
    from talus.models.exchange import Exchange
    from talus.models.binding import Binding
    from typing import Type

    ############
    # Consumer #
    ############
    # Configure messages that will be consumed
    class ObjectMessageBody(MessageBodyBase):
        objectName: str
        bucket: str

    class ConsumeMessage(ConsumeMessageBase):
        message_body_cls: Type[ObjectMessageBody] = ObjectMessageBody

    # Configure the queue the messages should be consumed from
    inbound_queue = Queue(name="inbound.q")

    # Configure a message processor to handle the consumed messages
    class MessageProcessor(MessageProcessorBase):
        def process_message(self, message: ConsumeMessage):
            print(f"Received message: {message}")
            outbound_message = PublishMessage(
                body=ObjectMessageBody(objectName=message.body.objectName, bucket="newBucket"),
            )  # change the bucket name for some reason
            self.producer.publish(outbound_message)


    ############
    # Producer #
    ############
    # Configure messages that will be produced
    class PublishMessage(PublishMessageBase):
        message_body_cls: Type[ObjectMessageBody] = ObjectMessageBody # using the same schema for simplicity
        default_routing_key: str = "outbound.message.m"

    # Configure the queue the messages should be routed to
    outbound_queue_one = Queue(name="outbound.one.q")
    outbound_queue_two = Queue(name="outbound.two.q")


    # Configure the exchange and queue bindings for publishing
    publish_exchange = Exchange(name="outbound.exchange") # Direct exchange by default
    bindings = [Binding(queue=outbound_queue_one, message=PublishMessage),
                Binding(queue=outbound_queue_two, message=PublishMessage)] # publishing PublishMessage will route to both queues.

    # Actually Connect and run the consumer
    def main():
        with DurableProducer(
            queue_bindings=bindings,
            publish_exchange=publish_exchange,
            connection_parameters=ProducerConnectionParameterFactory(),
            connection_retryer=ConnectionRetryerFactory(),
        ) as producer:
            with DurableConsumer(
                consume_queue=inbound_queue,
                connection_parameters=ConsumerConnectionParameterFactory(),
                connection_retryer=ConnectionRetryerFactory(),
            ) as consumer:
                message_processor = MessageProcessor(producer=producer)
                consumer.listen(message_processor)

    if __name__ == "__main__":
        main()

.. |codecov| image:: https://codecov.io/bb/dkistdc/interservice-bus-adapter/branch/master/graph/badge.svg
   :target: https://codecov.io/bb/dkistdc/interservice-bus-adapter
