Backends

Django Dispatch supports multiple backend implementations for publishing events to different systems. This page documents the available backends and how to configure them.

Redis Stream Backend

The Redis Stream backend publishes events to a Redis Stream, an append-only log data structure in Redis that supports message queuing and processing by one or more consumers.

Configuration

OUTBOX_PUBLISHERS = {
    'default': {
        'BACKEND': 'django_broadcaster.backends.RedisStreamBackend',
        'OPTIONS': {
            'host': 'localhost',
            'port': 6379,
            'db': 0,
            'password': None,
            'stream_name': 'events',
            'max_len': 10000,
            'connect_timeout': 5,
            'socket_timeout': 5,
        }
    }
}
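
The 'BACKEND' value is a dotted path to a class, and 'OPTIONS' holds the settings for that class. The sketch below shows how such an entry could be turned into a backend instance. It is an illustration only: `load_backend` is a hypothetical helper, not the library's actual loader, and it assumes the class is constructed with the 'OPTIONS' dict as its single argument.

```python
from importlib import import_module


def load_backend(publisher_config):
    """Instantiate the class named by 'BACKEND', passing it 'OPTIONS'."""
    # Split "package.module.ClassName" into module path and class name
    module_path, _, class_name = publisher_config["BACKEND"].rpartition(".")
    backend_class = getattr(import_module(module_path), class_name)
    return backend_class(publisher_config.get("OPTIONS", {}))


# Demonstrated with a standard-library class so the sketch runs without Django;
# a real entry would name django_broadcaster.backends.RedisStreamBackend.
demo = load_backend({
    "BACKEND": "collections.OrderedDict",
    "OPTIONS": {"host": "localhost"},
})
print(type(demo).__name__, demo["host"])
```

A typo in the dotted path surfaces as an ImportError or AttributeError at load time, which is usually easier to diagnose than a failure at the first publish.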

Options

  • host: Redis server hostname (default: 'localhost')

  • port: Redis server port (default: 6379)

  • db: Redis database number (default: 0)

  • password: Redis password (optional; default: None)

  • stream_name: Name of the Redis stream (default: 'events')

  • max_len: Maximum number of entries kept in the stream (default: 10000)

  • connect_timeout: Connection timeout in seconds (default: 5)

  • socket_timeout: Socket timeout in seconds (default: 5)
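
In deployed environments these options usually differ per environment. A minimal sketch of filling them from environment variables, falling back to the defaults listed above, might look like this. The `REDIS_*` variable names and the `redis_stream_options` helper are assumptions made for illustration, not names the library defines.

```python
import os


def redis_stream_options(env=os.environ):
    """Build the OPTIONS dict from environment variables with the documented defaults."""
    return {
        "host": env.get("REDIS_HOST", "localhost"),
        "port": int(env.get("REDIS_PORT", "6379")),
        "db": int(env.get("REDIS_DB", "0")),
        # Treat an unset or empty password as "no password"
        "password": env.get("REDIS_PASSWORD") or None,
        "stream_name": env.get("REDIS_STREAM_NAME", "events"),
        "max_len": int(env.get("REDIS_STREAM_MAX_LEN", "10000")),
        "connect_timeout": int(env.get("REDIS_CONNECT_TIMEOUT", "5")),
        "socket_timeout": int(env.get("REDIS_SOCKET_TIMEOUT", "5")),
    }


# In settings.py
OUTBOX_PUBLISHERS = {
    "default": {
        "BACKEND": "django_broadcaster.backends.RedisStreamBackend",
        "OPTIONS": redis_stream_options(),
    }
}
```

Numeric values are converted with `int()` because environment variables are always strings.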

Consuming Events

To consume events from a Redis Stream, you can use the Redis client directly:

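import json

import redis

# Connect to Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# XACK only applies to messages delivered through a consumer group,
# so create the group first (mkstream=True creates the stream if missing)
try:
    r.xgroup_create('events', 'consumer_group', id='0', mkstream=True)
except redis.exceptions.ResponseError as exc:
    # BUSYGROUP means the group already exists
    if 'BUSYGROUP' not in str(exc):
        raise

# Read from the stream as a member of the group
# ('consumer-1' is an example consumer name; '>' requests undelivered messages)
messages = r.xreadgroup('consumer_group', 'consumer-1', {'events': '>'}, count=10, block=1000)

for stream_name, stream_messages in messages:
    for message_id, message_data in stream_messages:
        # Parse the CloudEvent
        cloud_event = json.loads(message_data[b'cloud_event'].decode('utf-8'))

        # Process the event
        print(f"Received event: {cloud_event['type']} - {cloud_event['id']}")

        # Acknowledge the message
        r.xack(stream_name, 'consumer_group', message_id)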
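
The parsing step in the consumer can be pulled into a small helper so it is testable without a Redis server. The sketch below assumes, as the consumer code does, that each stream entry carries the CloudEvent as a JSON string in a field named cloud_event. `decode_cloud_event` is a hypothetical helper name.

```python
import json


def decode_cloud_event(message_data):
    """Return the CloudEvent dict stored in a stream entry's 'cloud_event' field."""
    # redis-py returns bytes keys and values unless decode_responses=True is set,
    # so accept both forms of the field name
    raw = message_data.get(b"cloud_event", message_data.get("cloud_event"))
    if raw is None:
        raise KeyError("stream entry has no 'cloud_event' field")
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")
    return json.loads(raw)


# A hand-built entry standing in for what a stream read would return
sample_entry = {
    b"cloud_event": json.dumps({"id": "1", "type": "user.created"}).encode("utf-8")
}
event = decode_cloud_event(sample_entry)
print(event["type"], event["id"])  # prints "user.created 1"
```

Raising on a missing field, instead of returning None, keeps malformed entries from being acknowledged silently.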

Creating a Custom Backend

You can create custom backends by implementing the PublisherBackend abstract base class:

import logging

from django_broadcaster.backends import PublisherBackend
from django_broadcaster.models import OutboxEvent

logger = logging.getLogger(__name__)


class MyCustomBackend(PublisherBackend):
    """
    Custom publisher backend implementation
    """

    def __init__(self, config):
        self.config = config
        # Initialize your backend with the provided configuration

    def publish(self, event: OutboxEvent) -> bool:
        """
        Publish an event to your custom backend

        Args:
            event: The OutboxEvent to publish

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Convert to CloudEvent format
            cloud_event = event.to_cloud_event()

            # Implement your publishing logic here
            # ...

            return True
        except Exception:
            # Log the error with its traceback, then re-raise
            logger.exception("Failed to publish event %s", event.pk)
            raise

    def health_check(self) -> bool:
        """
        Check if the backend is healthy and available

        Returns:
            bool: True if healthy, False otherwise
        """
        try:
            # Implement your health check logic here
            # ...
            return True
        except Exception:
            return False
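
As a concrete illustration of the two methods, here is a minimal sketch of a backend that appends each event to a JSON Lines file. To stay runnable without Django it does not inherit from PublisherBackend, which a real backend would. The 'path' option, the `JsonLinesBackend` name, and the `FakeEvent` stand-in are all hypothetical, and the sketch assumes `to_cloud_event()` returns a JSON-serializable dict.

```python
import json
import tempfile
from pathlib import Path


class JsonLinesBackend:
    """Appends each event to a file, one JSON document per line.

    In a real project this class would subclass
    django_broadcaster.backends.PublisherBackend.
    """

    def __init__(self, config):
        self.path = Path(config["path"])  # 'path' is a hypothetical option

    def publish(self, event) -> bool:
        # Assumes to_cloud_event() returns a JSON-serializable dict
        cloud_event = event.to_cloud_event()
        with self.path.open("a", encoding="utf-8") as fh:
            fh.write(json.dumps(cloud_event) + "\n")
        return True

    def health_check(self) -> bool:
        # Healthy when the directory that should hold the file exists
        return self.path.parent.is_dir()


class FakeEvent:
    """Stand-in for OutboxEvent so the sketch runs without Django."""

    def to_cloud_event(self):
        return {"id": "1", "type": "order.created", "data": {"total": 10}}


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "events.jsonl"
    backend = JsonLinesBackend({"path": target})
    healthy = backend.health_check()
    published = backend.publish(FakeEvent())
    lines = target.read_text(encoding="utf-8").splitlines()
```

Here `publish` lets I/O errors propagate to the caller, and `health_check` only inspects local state, so it stays cheap to call often.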

Registering a Custom Backend

To use your custom backend, register an instance of it with the publisher:

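# In your app's apps.py

from django.apps import AppConfig


def register_custom_backend():
    # Imported lazily so this runs only after Django has loaded all apps
    from django_broadcaster.publishers import publisher
    from myapp.backends import MyCustomBackend

    # Note: _backends is a private attribute and may change between releases
    publisher._backends['my_custom'] = MyCustomBackend({
        'option1': 'value1',
        'option2': 'value2',
    })


class MyAppConfig(AppConfig):  # example app config name
    name = 'myapp'

    def ready(self):
        # Django calls ready() once the app registry is fully populated
        register_custom_backend()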