Metadata-Version: 2.1
Name: kafka_messaging
Version: 1.0
Summary: Simple interface for sending messages through Kafka
Home-page: https://gitlab.com/zaitt_computer_vision/kafka_api
Author: Andre Ferrari Moukarzel
Author-email: andremoukarzel@gmail.com
License: MIT
Description: # Kafka Messaging
        
        Kafka Messaging is a minimalist interface for using the confluent kafka package
        in Python3.
        
        It can be installed with the following command:
        
        ``` bash
        pip3 install kafka_messaging
        ```
        
        ## Usage
        
        Kafka Messaging uses Senders to send messages and Listeners to receive them.
        Optionally, you may also use ListenerPool, an interface to synchronize and act
        on multiple Listeners.
        
        
        ### Sender
        
        To use the Sender, simply instance one in the script. Then, the send() method
        can be used to send a message to the topic specified in the first argument.
        
        The sent message is a collection of data that may be compartimentalized in
        different arguments. The argument names can then be used to retrieve such
        data with a Listener.
        
        The sending of data can be done like so:
        
        ```python
        from kafka_messaging import Sender
        
        sender = Sender()
        while True:
            sender.send("my_topic", info1="Hello", info_two=["World"])
        ```
        
        As shown, data can be in different formats (string, list, dictionary, or
        anything else that can be serialized with Pickle).
        
        A Sender instance can also be customized with different Kafka Producer
        configurations that can be set in it's instantiation like so:
        
        ```python
        sender = Sender(ip="127.0.0.1", port=9092, acks=0)
        ```
        
        ### Listener
        
        To receive messages sent by a Sender, a Listener can be used. A Listener can be
        configured in initialization just like a Sender, with the addition that a topic
        to listen to must be specified:
        
        ```python
        listener = Listener(topic="my_topic", ip="127.0.0.1", port=9092)
        ```
        > Keep in mind that the Senders are an interface for Kafka Producers, and
        > Listeners for Kafka Consumers, so they don't have the same configuration
        > attributes.
        
        The listen() method of a Listener may be called to activate it, but be mindful
        to not be stuck, since it loops until stop() is called.
        
        A minimalist usage of a Listener is as such:
        
        ```python
        from threading import Thread
        
        from kafka_messaging import Listener
        
        
        def listener_thread(listener: Listener):
            """Calls the listening loop of the received Listener. """
            listener.listen()
        
        
        my_listener = Listener("my_topic")
        thread = Thread(target=listener_thread, args[my_listener])
        
        # Wait until information is received
        while not my_listener.has_info("info1"):
            pass
        print(my_listener.get_info("info1"))
        print(my_listener.get_info("info_two"))
        my_listener.stop()
        thread.join()
        ```
        
        ### ListenerPool
        
        The ListenerPool can be used to synchronize multiple Listeners and execute
        methods on the synchronized Listeners.
        
        The method that defines when Listeners are synchronized is completly
        customizable, and so can be customized the methods that are executed after the
        syncronization, as exemplified bellow:
        
        ```python
        from threading import Thread
        
        from kafka_messaging import ListenerPool
        
        
        def synchronization(listeners: dict):
            """Considers the listeners synchronized when all of them have received info1
            """
            synced = True
            for key in listeners.keys():
                if not listeners[key].has_info("info1"):
                    synced = False
            return synced
        
        
        def execute(listeners: dict, fps):
            """Prints all listeners' info1 after they are synchronized and then disposes
            of itself. """
            for key in listeners.keys():
                print(listeners[key].get_info("info1"))
                print(listeners[key].get_info("info_two"))
            return True # Returning True leads a method to be unbound from ListenerPool
        
        
        pool = ListenerPool("*")
        pool.set_sync_condition(synchronization)
        pool.bind(execute)
        pool.start()
        pool.get_syncer_thread().join()
        ```
        
Platform: UNKNOWN
Requires-Python: >=3.6
Description-Content-Type: text/markdown
