Metadata-Version: 2.1
Name: resistant_kafka
Version: 0.7
Description-Content-Type: text/markdown
Requires-Dist: confluent-kafka==2.8.2
Requires-Dist: pydantic==1.10.21

# Resistant Kafka

**Resistant Kafka** is a Python library designed to simplify and stabilize interactions with Apache Kafka, both as a
producer and a consumer.

## Features



### 🔌 Easy integration into any Python service

To connect a consumer or producer, you just need to create _**one instance of the corresponding class**_: ConsumerInitializer
or ProducerInitializer.
###

### 🧾 Built-in logging of errors and events

Output of basic logs of connection of topic handlers in one consumer, as well as output of messages about successful
sending of a message from the producer to certain topics.

###

### 🛡️ Resilience against consumer-side crashes

If an exception raises in the processor when reading a specific topic, by default, a detailed log about the dropped message
will be issued and the consumer will continue its work.

In case you need to stop reading topic and raise exception - this option has also been added.

###

### 🧩 Handler creation for each topic in your service (Asynchronous)

One of the problems of working in the consumer _**is the case where the service reads several topics at the same time**_ and
this happens synchronously and in one handler.

**We solved this problem!** 

By adding asynchronous reading of topics and adding the ability to read topics independently of
each other. Even if one of them crashes _(a crash will occur if you set the raise_error=True attribute in the
kafka_processor)_ - the other handler will continue its work.

Also in this case it is very easy to separate the logic of processing messages of different topics if their keys,
message type differ from each other.

###

# Consumer Initializer

## First Step. Add enviroments

Using the **_ConsumerConfig_** scheme you can configure the message reading handler in your service.

_If reading of several topics is expected, then a more convenient way is to assemble common settings for connecting to
Kafka and add them to the handler class (for example, to KafkaMessageProcessor) by **kwargs_ .

### EXAMPLE:

```commandline

from resistant_kafka.consumer_schemas import ConsumerConfig

common_consumer_config = dict(
        bootstrap_servers='HOST:PORT',
        group_id='CONSUMER_NAME',
        auto_offset_reset='latest',
        enable_auto_commit=False,
)

process_task_1 = KafkaMessageProcessor(
    config=ConsumerConfig(
        topic_to_subscribe='TOPIC_NAME_1',
        processor_name='KafkaMessageProcessor1',
        **common_consumer_config
    )
)

process_task_2 = KafkaMessageProcessor(
    config=ConsumerConfig(
        topic_to_subscribe='TOPIC_NAME_2',
        processor_name='KafkaMessageProcessor2',
        **common_consumer_config
    )
)

```
##
## Second Step. Add processor

Processor is a class-handler of a specific topic. It allows to perform CRUD operations on received messages from a given
topic **_independently of other processors._**

⚠️ **The name of the main method _"process"_ is reserved and is required for installation**.

⚠️**The decorator "_kafka_processor_" is also required**, which is responsible for the operation of the message stream and the
stable operation of the main method "process". It has the attribute raise_error, which allows to raise an error, while
the work of a specific handler will be stopped.

### EXAMPLE:

```commandline

from resistant_kafka.consumer import ConsumerInitializer, kafka_processor

class KafkaMessageProcessor1(ConsumerInitializer):
    def __init__(
            self, config: ConsumerConfig
    ):
        super().__init__(config=config)
        self._config = config

    # required decorator
    # to raise error, instead logging @kafka_processor(raise_error=True)
    @kafka_processor() 
    async def process(self):  # required service name 
        message = await self.get_message(consumer=self._consumer)
        if self.message_is_empty(message=message, consumer=self._consumer):
            return
        
        message_key = message.key().decode("utf-8")
        message_value = message.value().decode("utf-8")
        
        # here you can process your message
        print('-----------------------------')
        print('KEY', message_key)
        print('VALUE', message_value)
        print('CONSUMER', self._config.topic_to_subscribe)
        print('-----------------------------')

```
##
## Third Step. Initialization

In order to start topic processors, you should use the "**_init_kafka_connection_**" method, to which you need to pass a list of
instances of the processor-classes.

### EXAMPLE:

```commandline
from resistant_kafka.consumer import init_kafka_connection

process_task_1 = KafkaMessageProcessor1(
    config=ConsumerConfig(
        topic_to_subscribe='TOPIC_NAME_1',
        processor_name='KafkaMessageProcessor1',
        **consumer_config
    )
)

process_task_2 = KafkaMessageProcessor2(
    config=ConsumerConfig(
        topic_to_subscribe='TOPIC_NAME_2',
        processor_name='KafkaMessageProcessor2',
        **consumer_config
    )
)

init_kafka_connection(
    tasks=[process_task_1, process_task_2]
)
```
###
️⚠️In the way, where you have already created loop - use method **_"process_kafka_connection"_**
```commandline
import asyncio
from resistant_kafka.consumer import process_kafka_connection

asyncio.create_task(process_kafka_connection([inventory_changes_processor]))
```

###
## Additional Step. Add security
To add security, you should set the **_secure=True_** attribute, and the required fields will be: 
* **_oauth_cb_**, 
* **_security_protocol_**, 
* **_sasl_mechanisms_**.

```commandline
consumer_config = ConsumerConfig(
        bootstrap_servers='HOST:PORT',
        group_id='CONSUMER_NAME',
        auto_offset_reset='latest',
        enable_auto_commit=False,
        
        secured=True,
        oauth_cb=method_to_get_token,
        security_protocol='SECURITY_PROTOCOL', # (e.g., 'SASL_PLAINTEXT', 'SASL_SSL').
        sasl_mechanisms='SASL_MECHANISMS'  # (e.g., 'PLAIN', 'SCRAM-SHA-256', 'OAUTHBEARER').
)
```


##
##
# Producer Initializer
## First Step. Add enviroment variables
To configure a producer you will need only 2 fields: URL for connecting Kafka and the producer name.
```commandline
producer_config = ProducerConfig(
        producer_name='KafkaTesterProducer1',
        bootstrap_servers='HOST:PORT',
)
```
##
## Second Step. Add processor

The  **_send_message_** method of **_ProducerInitializer_** class allows to send a message to a topic

Also an optional parameter of **_DataSend_** scheme is **_"headers"_** which allows to send additional information in a message without changing the structure of this message.

### EXAMPLE:

```commandline
task = ProducerInitializer(
    config=producer_config
)

task.send_message(
    data_to_send=DataSend(
        key='KEY1',
        value='VALUE1',
    )
)

# with headers
task.send_message(
    data_to_send=DataSend(
        key='KEY2',
        value='VALUE12,
        headers=[('additinal_key', 'additinal_value')]
    )
)
```


##
## Additional Step. Add security
To add security, you should set the **_secure=True_** attribute, and the required fields will be: 
* **_oauth_cb_**, 
* **_security_protocol_**, 
* **_sasl_mechanisms_**.


```commandline
producer_config = ProducerConfig(
        producer_name='KafkaTesterProducer1',
        bootstrap_servers='HOST:PORT',
        
        secured=True,
        oauth_cb=method_to_get_token,
        security_protocol='SECURITY_PROTOCOL', # (e.g., 'SASL_PLAINTEXT', 'SASL_SSL').
        sasl_mechanisms='SASL_MECHANISMS'  # (e.g., 'PLAIN', 'SCRAM-SHA-256', 'OAUTHBEARER').
)
```

#
## Installation

```bash
pip install resistant-kafka
```

# CONSUMER CODE EXAMPLE

```commandline

from kafka_connection_utils import custom_method_to_get_token
from resistant_kafka.consumer_schemas import ConsumerConfig
from resistant_kafka.consumer import ConsumerInitializer, kafka_processor, init_kafka_connection

consumer_config = dict(
    bootstrap_servers='HOST:PORT',
    group_id='CONSUMER_NAME',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    secured=True,
    oauth_cb=custom_method_to_get_token,
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanisms='OAUTHBEARER'
)


class KafkaMessage1Processor(ConsumerInitializer):
    def __init__(
            self, config: ConsumerConfig
    ):
        super().__init__(config=config)
        self._config = config

    @kafka_processor()
    async def process(self):
        message = await self.get_message(consumer=self._consumer)
        if self.message_is_empty(message=message, consumer=self._consumer):
            return

        message_key = message.key().decode("utf-8")
        message_value = message.value().decode("utf-8")

        if message_value in ['WRONG_VALUE']:
            raise ValueError('You catch wrong value')

        print('-----------------------------')
        print('KEY', message_key)
        print('VALUE', message_value)
        print('CONSUMER', self._config.topic_to_subscribe)
        print('-----------------------------')


class KafkaMessage2Processor(ConsumerInitializer):
    def __init__(
            self,
            config: ConsumerConfig
    ):
        super().__init__(config=config)
        self._config = config

    @kafka_processor()
    async def process(self):
        message = await self.get_message(consumer=self._consumer)
        if self.message_is_empty(message=message, consumer=self._consumer):
            return

        message_key = message.key().decode("utf-8")
        message_value = message.value().decode("utf-8")

        print('-----------------------------')
        print('KEY', message_key)
        print('VALUE', message_value)
        print('PRODUCER', self._config.topic_to_subscribe)
        print('-----------------------------')


process_task_1 = KafkaMessage1Processor(
    config=ConsumerConfig(
        topic_to_subscribe='KafkaTesterProducer1',
        processor_name='KafkaProcessor1',
        **consumer_config
    )
)

process_task_2 = KafkaMessage2Processor(
    config=ConsumerConfig(
        topic_to_subscribe='KafkaTesterProducer2',
        processor_name='KafkaProcessor2',
        **consumer_config
    )
)

init_kafka_connection(
    tasks=[process_task_1, process_task_2]
)
```

#
# PRODUCER CODE EXAMPLE
```commandline
from kafka_connection_utils import custom_method_to_get_token
from resistant_kafka import ProducerInitializer, ProducerConfig, DataSend

producer_config = dict(
    bootstrap_servers='HOST:PORT',
    secured=True,
    oauth_cb=custom_method_to_get_token,
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanisms='OAUTHBEARER'
)

task = ProducerInitializer(
    config=ProducerConfig(
        producer_name='KafkaTesterProducer1',
        **producer_config
    )
)
task.send_message(
    data_to_send=DataSend(
        key='KEY1',
        value='VALUE1',
    )
)
task.send_message(
    data_to_send=DataSend(
        key='KEY1',
        value='WRONG_VALUE'
    )
)

task = ProducerInitializer(
    config=ProducerConfig(
        producer_name='KafkaTesterProducer2',
        **producer_config
    ),

)
task.send_message(
    data_to_send=DataSend(
        key='KEY2',
        value='VALUE2'
    ))

```
