Metadata-Version: 2.4
Name: django-pgwatch
Version: 1.0.0
Summary: Django app for PostgreSQL LISTEN/NOTIFY with persistence and playback
Author-email: Ed Menendez <ed@edmenendez.com>
License: MIT
Project-URL: Homepage, https://github.com/edmenendez/django-pgwatch
Project-URL: Documentation, https://github.com/edmenendez/django-pgwatch#readme
Project-URL: Repository, https://github.com/edmenendez/django-pgwatch
Project-URL: Bug Tracker, https://github.com/edmenendez/django-pgwatch/issues
Keywords: django,postgresql,notify,listen,database,triggers
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Framework :: Django
Classifier: Framework :: Django :: 4.2
Classifier: Framework :: Django :: 5.0
Classifier: Framework :: Django :: 5.1
Classifier: Topic :: Database
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: Django>=4.2
Requires-Dist: psycopg>=3.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0; extra == "dev"
Requires-Dist: pytest-django>=4.5; extra == "dev"
Requires-Dist: pytest-cov>=4.0; extra == "dev"
Requires-Dist: ruff>=0.1; extra == "dev"
Requires-Dist: mypy>=1.0; extra == "dev"
Requires-Dist: django-stubs>=4.2; extra == "dev"
Dynamic: license-file

# django-pgwatch

A Django app that provides PostgreSQL LISTEN/NOTIFY functionality with persistence and playback capabilities. This solves the problem of missed notifications when consumers are disconnected by storing all notifications in a database table and providing automatic playback functionality.

## Table of Contents

- [Quick Start](#quick-start)
- [Features](#features)
- [Installation](#installation)
- [Basic Usage](#basic-usage)
  - [Sending Notifications](#1-sending-notifications)
  - [Creating a Consumer](#2-creating-a-consumer)
  - [Running Consumers](#3-running-consumers)
- [Database Change Notifications](#database-change-notifications)
- [Management Commands](#management-commands)
- [Common Patterns](#common-patterns)
- [Advanced Usage](#advanced-usage)
- [Admin Interface](#admin-interface)
- [Architecture & Performance](#architecture--performance)
- [Testing](#testing)

## Quick Start

**Get up and running in 3 steps:**

1. **Install and migrate:**
   ```bash
   # Add 'django_pgwatch' to INSTALLED_APPS
   python manage.py migrate django_pgwatch
   ```

2. **Send a notification:**
   ```python
   from django_pgwatch.utils import smart_notify
   smart_notify('my_channel', {'event': 'test', 'user_id': 123})
   ```

3. **Create and run a consumer:**
   ```python
   # myapp/consumers.py
   from django_pgwatch.consumer import BaseConsumer, NotificationHandler
   
   class MyConsumer(BaseConsumer):
       consumer_id = 'my_consumer'
       channels = ['my_channel']
       
       def handle_notification(self, handler: NotificationHandler):
           print(f"Received: {handler.data}")
   ```
   
   ```bash
   python manage.py pgwatch_listen
   ```

**That's it!** Your consumer will process all notifications and stay running for new ones.

## Features

- **Guaranteed Delivery**: All notifications are persisted to a database table
- **Playback Capability**: Consumers can catch up on missed notifications after reconnecting
- **Multiple Consumers**: Track which consumers have processed each notification
- **Large Payload Support**: Automatically handles payloads larger than PostgreSQL's 8KB limit
- **Gap Detection**: Automatically detects and fills missed notifications during operation
- **Consumer Management**: Track consumer progress and handle consumer-specific processing
- **Django Integration**: Native Django models, admin interface, and management commands

## Installation

1. Add `django_pgwatch` to your `INSTALLED_APPS`:
   ```python
   INSTALLED_APPS = [
       # ... other apps
       'django_pgwatch',
   ]
   ```

2. Run migrations (automatically installs PostgreSQL functions):
   ```bash
   python manage.py migrate django_pgwatch
   ```

## Basic Usage

### 1. Sending Notifications

```python
from django_pgwatch.utils import smart_notify

# Send a simple notification
notification_log_id = smart_notify('my_channel', {
    'event_type': 'user_login',
    'user_id': 123,
    'timestamp': '2024-01-01T10:00:00Z'
})

# The notification is automatically persisted and sent via PostgreSQL NOTIFY
```

### 2. Creating a Consumer

Create a `consumers.py` file in your Django app:

```python
# myapp/consumers.py
from django_pgwatch.consumer import BaseConsumer, NotificationHandler

class MyConsumer(BaseConsumer):
    # Class attributes for auto-discovery
    consumer_id = 'my_consumer'
    channels = ['my_channel', 'data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        print(f"Received: {handler.data}")
        
        # Access notification details
        print(f"Notification Log ID: {handler.notification_log_id}")
        print(f"Channel: {handler.channel}")
        print(f"Is replay: {handler.is_replay}")
        
        # For database change notifications
        if handler.get_table() == 'users':
            if handler.is_insert():
                print(f"New user: {handler.get_new_data()}")
            elif handler.is_update():
                print(f"User updated: {handler.get_old_data()} -> {handler.get_new_data()}")
            elif handler.is_delete():
                print(f"User deleted: {handler.get_old_data()}")
```

### 3. Running Consumers

The management command automatically discovers all consumers from your `INSTALLED_APPS`:

```bash
# Run all discovered consumers
python manage.py pgwatch_listen

# List all discoverable consumers
python manage.py pgwatch_listen --list-consumers

# Run consumers from specific apps only
python manage.py pgwatch_listen --apps myapp otherapp

# Run specific consumers by ID
python manage.py pgwatch_listen --consumers my_consumer webhook_sender

# Exclude specific consumers
python manage.py pgwatch_listen --exclude-consumers heavy_processor
```

## Database Change Notifications

### Setting up Triggers

The app provides a trigger function that automatically sends notifications for database changes.

#### Recommended Approach: Django Migrations (Preferred)

The best way to create database triggers is using Django migrations with `RunSQL`:

```python
# In your app's migration file (e.g., migrations/0002_create_triggers.py)
from django.db import migrations

class Migration(migrations.Migration):
    dependencies = [
        ('your_app', '0001_initial'),
        # Ensure notify_data_change() function exists
        ('django_pgwatch', '0002_install_pg_functions'),
    ]

    operations = [
        migrations.RunSQL(
            # Forward migration - create trigger
            sql="""
            CREATE TRIGGER notify_users_changes
                AFTER INSERT OR UPDATE OR DELETE ON users
                FOR EACH ROW 
                EXECUTE FUNCTION notify_data_change();
            """,
            # Reverse migration - drop trigger
            reverse_sql="""
            DROP TRIGGER IF EXISTS notify_users_changes ON users;
            """
        ),
    ]
```

**Benefits of using migrations:**
- ✅ Automatic deployment with `python manage.py migrate`
- ✅ Version controlled and reversible
- ✅ Consistent across environments
- ✅ No manual intervention required

#### Alternative Approaches

**Direct SQL:**
```sql
-- Create a trigger on any table
CREATE TRIGGER notify_users_changes
    AFTER INSERT OR UPDATE OR DELETE ON users
    FOR EACH ROW EXECUTE FUNCTION notify_data_change();
```

**Python helper function:**
```python
from django_pgwatch.examples import create_trigger_for_table

# Create trigger for the users table
create_trigger_for_table('users')
create_trigger_for_table('orders', 'order_events')  # Custom channel
```

**Note:** The Python helper and direct SQL approaches require manual execution and are not version controlled. Use migrations for production applications.

### Consuming Database Changes

```python
# myapp/consumers.py
from django_pgwatch.consumer import BaseConsumer, NotificationHandler

class DatabaseChangeConsumer(BaseConsumer):
    consumer_id = 'database_changes'
    channels = ['data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        table = handler.get_table()
        action = handler.get_action()  # INSERT, UPDATE, DELETE
        
        if table == 'users' and action == 'INSERT':
            user_data = handler.get_new_data()
            self.send_welcome_email(user_data)
        elif table == 'orders' and action == 'UPDATE':
            old_data = handler.get_old_data()
            new_data = handler.get_new_data()
            
            if old_data['status'] != new_data['status']:
                self.send_status_update(new_data)
```

## Common Patterns

### Cache Invalidation

```python
class CacheInvalidationConsumer(BaseConsumer):
    consumer_id = 'cache_invalidator'
    channels = ['data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        if handler.is_database_change():
            cache_key = f"{handler.get_table()}:{handler.get_record_id()}"
            cache.delete(cache_key)
```

### Webhook Integration

```python
class WebhookConsumer(BaseConsumer):
    consumer_id = 'webhook_sender'
    channels = ['data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        requests.post(settings.WEBHOOK_URL, json=handler.data)
```

### Analytics Tracking

```python
class AnalyticsConsumer(BaseConsumer):
    consumer_id = 'analytics_processor'
    channels = ['data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        analytics.track(handler.get_record_id(), handler.get_action(), handler.data)
```

## Management Commands

### pgwatch_listen

**Basic usage:**
```bash
# Run all discovered consumers
python manage.py pgwatch_listen

# List available consumers
python manage.py pgwatch_listen --list-consumers
```

**Common filtering options:**
```bash
# Run specific consumers
python manage.py pgwatch_listen --consumers webhook_sender cache_invalidator

# Run consumers from specific apps
python manage.py pgwatch_listen --apps myapp otherapp

# Exclude heavy processors
python manage.py pgwatch_listen --exclude-consumers analytics_processor
```

**Advanced options:**
- `--timeout=30`: Listening timeout in seconds
- `--max-batch-size=100`: Playback batch size
- `--skip-playback`: Skip missed notifications, only process new ones
- `--reconnect-delay=5`: Delay before reconnecting after error

### pgwatch_cleanup

```bash
# Clean up old notifications (keep 7 days)
python manage.py pgwatch_cleanup --days=7

# Preview what will be deleted
python manage.py pgwatch_cleanup --days=7 --dry-run
```

### Deployment Patterns

**Single process (default):**
```bash
python manage.py pgwatch_listen  # All consumers in one process
```

**Parallel processes:**
```bash
# Split heavy consumers into separate processes
python manage.py pgwatch_listen --consumers cache_invalidator &
python manage.py pgwatch_listen --consumers webhook_sender &
```

**Load balancing:**
```python
# Create multiple worker consumers
class Worker1Consumer(BaseConsumer):
    consumer_id = 'worker_1'
    channels = ['work_queue']
    
class Worker2Consumer(BaseConsumer):
    consumer_id = 'worker_2'
    channels = ['work_queue']
```

## Advanced Usage

### Error Handling

```python
class RobustConsumer(BaseConsumer):
    consumer_id = 'robust_consumer'
    channels = ['data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        try:
            self.process_notification(handler)
        except RetryableError as e:
            logger.error(f"Retryable error: {e}")
            raise  # Will retry
        except PermanentError as e:
            logger.error(f"Permanent error: {e}")
            # Don't re-raise - marks as processed
```

### Filtering Notifications

```python
class FilteredConsumer(BaseConsumer):
    consumer_id = 'filtered_consumer'
    channels = ['data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        # Filter by table
        if handler.get_table() not in ['users', 'orders']:
            return
        
        # Skip replayed notifications
        if handler.is_replay:
            return
            
        self.process_notification(handler)
```

### Custom Notifications

```python
from django_pgwatch.examples import send_custom_notification

send_custom_notification('user_events', 'password_reset', {
    'user_id': 123,
    'ip_address': '192.168.1.1',
    'requested_at': '2024-01-01T10:00:00Z'
})
```

## Admin Interface

The app provides a Django admin interface for monitoring notifications:

- View all notification logs
- See which consumers have processed each notification
- Clean up old notifications
- Reprocess notifications (clear consumer tracking)
- View summary statistics

Access at `/admin/django_pgwatch/notificationlog/`

### Database Functions

**smart_notify(channel_name, payload_data)** - Persists and sends notifications:
```sql
SELECT smart_notify('my_channel', '{"event": "test"}'::jsonb);
```

**notify_data_change()** - Trigger function for database changes:
```sql
CREATE TRIGGER my_table_changes
    AFTER INSERT OR UPDATE OR DELETE ON my_table
    FOR EACH ROW EXECUTE FUNCTION notify_data_change();
```

## Architecture & Performance

### How It Works

1. **Notification Storage**: All notifications stored in `notification_log` table
2. **Consumer Tracking**: Each consumer tracks processed notifications
3. **Playback**: On startup, process missed notifications
4. **Real-time**: Listen for new notifications via PostgreSQL LISTEN/NOTIFY
5. **Gap Detection**: Automatic detection of missed notifications

### Performance Features

- **Batch Processing**: Configurable batch sizes for playback
- **Parallel Processing**: Multiple consumers process simultaneously
- **Database Optimization**: Indexes on channel and timestamps
- **Large Payloads**: Automatic handling of payloads >8KB
- **Cleanup**: Configurable retention periods

### Monitoring

- Django admin interface for notification tracking
- Consumer progress and error monitoring
- Database table size monitoring
- Alerting for unprocessed notifications

## Testing

### TODO: Test Coverage Needed

The following areas need comprehensive test coverage:

**Auto-Discovery Tests:**
- [ ] Test consumer discovery from multiple apps
- [ ] Test filtering by app names (`--apps`)
- [ ] Test filtering by consumer IDs (`--consumers`) 
- [ ] Test excluding consumers (`--exclude-consumers`)
- [ ] Test error handling for non-existent apps/consumers
- [ ] Test `--list-consumers` output formatting

**Consumer Base Class Tests:**
- [ ] Test BaseConsumer class attribute inheritance
- [ ] Test constructor parameter override of class attributes
- [ ] Test validation error when no consumer_id provided
- [ ] Test channel configuration precedence

**Database Integration Tests:**
- [ ] Test trigger creation and notification sending
- [ ] Test consumer playback of missed notifications
- [ ] Test real-time notification processing
- [ ] Test consumer restart and gap detection
- [ ] Test large payload handling

**Error Handling Tests:**
- [ ] Test consumer exception handling (retryable vs permanent)
- [ ] Test database connection failures and reconnection
- [ ] Test malformed notification payloads
- [ ] Test consumer shutdown and cleanup

**Management Command Tests:**
- [ ] Test command argument parsing and validation
- [ ] Test signal handling for graceful shutdown
- [ ] Test consumer process lifecycle management
- [ ] Test output formatting and logging

**Multi-Consumer Tests:**
- [ ] Test multiple consumers processing same notifications
- [ ] Test consumer isolation (one failure doesn't affect others)
- [ ] Test consumer-specific progress tracking
- [ ] Test parallel vs sequential execution modes

## Contributing

This is an internal Avela Education tool. For issues or feature requests, contact the development team.

## License

Internal use only - Avela Education
