Metadata-Version: 2.4
Name: rabbitmq-utils
Version: 1.5.0
Summary: Provide easy connection to RabbitMQ server.
Author-email: Tahir Rafique <tahirrafiqueasad@gmail.com>
License-Expression: MIT
Keywords: rabbitmq,consumer,producer,publisher,subscriber
Classifier: Development Status :: 5 - Production/Stable
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Operating System :: OS Independent
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: Science/Research
Classifier: Intended Audience :: Information Technology
Classifier: Natural Language :: English
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: pika>=1.3.0
Dynamic: license-file

# rabbitmq-utils

A lightweight Python utility package to simplify working with RabbitMQ for producers, consumers, and RPC communication.

## TLS Support

The following classes now support optional **TLS (SSL)** parameters for secure communication over port `5671`:

- `RabbitMQProducer`
- `RabbitMQConsumer`
- `RPCClient`
- `RPCServer`

You can enable TLS by passing the following optional parameters:

```python
cafile = "path/to/ca.crt"
check_hostname = True
```

These should be provided **only when the RabbitMQ server is configured with TLS** (typically running on port `5671`). If not provided, connections default to non-TLS on port `5672`.

## Producer

The `RabbitMQProducer` class allows you to send messages reliably to any RabbitMQ queue. It includes **Publisher Confirms** to ensure delivery success.

> **Note:** To use the default exchange, pass an empty string (`""`) as the exchange name. When using the default exchange, the `routing_key` should be set to the **queue name**.

You can make the message **persistent** by setting `persistent_message=True`. This ensures that the message survives broker restarts at the cost of some performance.

For priority queues, use the `priority=(some int)` argument in `send_message`.

### Example

```python
from rabbitmq_utils import RabbitMQProducer
import json

message = json.dumps({'hello': 'world'})

rmqp = RabbitMQProducer(
    host='localhost', port=5672, virtual_host='/', 
    username='guest', password='guest', 
    exchange='test_exc', exchange_type='topic',
    persistent_message=False
)
is_sent = rmqp.send_message(
    message,
    routing_key='test_key'
)

if is_sent:
    print('INFO: Message sent.')
else:
    print('ERROR: Unable to send on desired routing key.')
```

## Consumer

The `RabbitMQConsumer` class sets up a durable queue and exchange, binds them, and starts consuming messages using a user-defined callback function.

> If you receive durability-related errors, try deleting the existing queue/exchange before re-running the code.

### Sample Callback Function

```python
def my_callback_function(ch, method, properties, body):
    message = body.decode()
    myLogic()  # your logic here
    ch.basic_ack(delivery_tag=method.delivery_tag)
```

### Example

```python
from rabbitmq_utils import RabbitMQConsumer

rmqc = RabbitMQConsumer(
    host='localhost', port=5672, virtual_host='/', 
    username='guest', password='guest', 
    queue_name='test_que', routing_key='test_key',
    exchange='test_exc', exchange_type='topic',
    callback_fun=my_callback_function,
    max_priority=2  # optional
)
rmqc.receive_message()
```

## Remote Procedure Call (RPC)

RPC allow to run a function on a remote computer and wait for the result. It is a synchronous call. This package provide simplest implementation of RPC.

RPC consist of two parts. One is the **server** that will process the request and other is **client** that will generate the request to server. Following are example of RPC implementation.

### Server

```python
from rabbitmq_utils.rpc import RPCServer

# STARTING RPC SERVER
server = RPCServer(
        host='localhost', port=5672, virtual_host='/', 
        username='guest', password='guest', 
        queue_name='test_que', routing_key='test_key',
    	exchange='test_exc', exchange_type='topic',
    	callback_fun=rpc_callback_function
)
server.receive_message()
```

Callback function of RPC is different from consumer callback function. In this callback we will return the result back to client.

Note: `result` must be string. If it is not then use `json.dumps(result)` to convert it to string.

```python
def rpc_callback_function(ch, method, properties, body):
    # GETTING MESSAGE
    message = body.decode()
    
    # PERFORM YOUR LOGIC HERE
    result = myLogic()
    
    # RETURING RESPONSE
    ch.basic_publish(
        exchange='', routing_key=properties.reply_to,
        properties=pika.BasicProperties(
            correlation_id = properties.correlation_id
        ),
        body=result
    )
    
    # ACKNOWLEDGE WORK IS DONE
    ch.basic_ack(delivery_tag=method.delivery_tag)
    return None
```

### Client

```python
from rabbitmq_utils.rpc import RPCClient
import json

# DEFINING MESSAGE
message = json.dumps({'hello': 'world'})

# SENDING
client = RPCClient(
    host='localhost', port=5672, virtual_host='/', 
    username='guest', password='guest', 
    exchange='test_exc', exchange_type='topic',
    timeout=3, # wait 3 seconds for response. default is None (infinite wait).
    persistent_message=False
)
is_sent, response = client.send_message(
    message,
    routing_key='test_key',
    return_response=True
)

# OUTPUT
print(f'is_sent: {is_sent} \t code: {client.getCode()} \t response: {response}')
```

Client **send_message** receive **return_response** argument (default=False). If this is **True** then client will wait for response for desire **timeout** period.  You can receive response later if you want by using follow sample code:

```python
# SEND REQUEST
is_sent = client.send_message(
    message,
    routing_key
)

# PERFORM YOUR LOGIC

# RECEIVE RESPONSE
response = client.receive_response()
```

Always check the validity of response using status code. Following code will help you check it:

```python
status_code = client.getCode()
print(status_code)
```

Code is **integer**. Following table shows the meanings:

| Code | Meaning                            |
| ---- | ---------------------------------- |
| 200  | Response is successfully obtained. |
| 408  | Timeout occur.                     |

## Author

**Tahir Rafique**

## Releases

| Date      | Version | Summary                                                      |
| --------- | ------- | ------------------------------------------------------------ |
| 29-Jun-25 | 1.5.0  | Adding TLS support for rabbitmq running on 5671 port.         |
| 17-Jan-24 | 1.4.1  | Adding exception handling in default callback function of consumer. |
| 13-Dec-23 | 1.4.0  | Adding queue priority in producer and consumer.              |
| 14-Jul-23 | 1.3.0  | Adding persistent message option.                            |
| 14-Jul-23 | 1.2.1  | Correcting documentation.                                    |
| 21-Jun-23 | 1.2.0  | Adding RPC to module.                                        |
| 27-Apr-23 | 1.0.1  | Improving default callback function.                         |
| 27-Apr-23 | 1.0.0  | Initial build                                                |

