Source code for django_broadcaster.backends

import json
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict

import redis

from django_broadcaster.models import OutboxEvent

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


class PublisherBackend(ABC):
    """
    Common interface that every publisher backend implements.

    Subclasses must provide ``publish`` and ``health_check``; ``get_name``
    has a default implementation that may be overridden.
    """

    @abstractmethod
    def publish(self, event: OutboxEvent) -> bool:
        """
        Send a single event to the backend.

        Args:
            event: The OutboxEvent to publish.

        Returns:
            bool: True if successful, False otherwise.

        Raises:
            Exception: If publishing fails.
        """

    @abstractmethod
    def health_check(self) -> bool:
        """
        Report whether the backend is healthy and available.

        Returns:
            bool: True if healthy, False otherwise.
        """

    def get_name(self) -> str:
        """Return the backend name (the concrete class name by default)."""
        backend_cls = self.__class__
        return backend_cls.__name__
class RedisStreamBackend(PublisherBackend):
    """
    Redis Stream publisher backend.

    Every published event is appended as one entry to a single Redis stream
    using ``XADD``; the stream is trimmed approximately to ``max_len``
    entries on each add.
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Create the Redis client and read the stream settings.

        Args:
            config: Backend settings. All keys are optional:
                ``host`` (default ``"localhost"``), ``port`` (6379),
                ``db`` (0), ``password`` (None), ``connect_timeout`` (5),
                ``socket_timeout`` (5), ``health_check_interval`` (30),
                ``stream_name`` (``"events"``), ``max_len`` (10000).
        """
        self.config = config
        self.redis_client = redis.Redis(
            host=config.get("host", "localhost"),
            port=config.get("port", 6379),
            db=config.get("db", 0),
            password=config.get("password"),
            decode_responses=True,
            socket_connect_timeout=config.get("connect_timeout", 5),
            socket_timeout=config.get("socket_timeout", 5),
            # Previously fixed at 30; now overridable like the other
            # connection parameters, with the same default.
            health_check_interval=config.get("health_check_interval", 30),
        )
        self.stream_name = config.get("stream_name", "events")
        self.max_len = config.get("max_len", 10000)

    @staticmethod
    def _to_stream_value(value: Any) -> Any:
        """
        Return ``value`` in a form that can be sent as a stream field value.

        Strings, bytes and plain numbers pass through unchanged. Anything
        else (bool, None, list, dict, ...) is JSON-encoded, because redis-py
        refuses to encode such values and would fail the whole ``XADD``.
        """
        # bool is a subclass of int, so it has to be excluded explicitly.
        is_plain_scalar = isinstance(
            value, (str, bytes, memoryview, int, float)
        ) and not isinstance(value, bool)
        if is_plain_scalar:
            return value
        return json.dumps(value)

    def publish(self, event: OutboxEvent) -> bool:
        """
        Publish an event to the Redis Stream.

        The entry carries the event id, type, source, creation timestamp and
        the JSON-encoded CloudEvent. Fields from ``event.publisher_config``
        are merged in afterwards, so they override a core field of the same
        name.

        Args:
            event: The OutboxEvent to publish.

        Returns:
            bool: True once the entry has been added to the stream.

        Raises:
            Exception: Any error raised while building or sending the entry
                is logged and re-raised unchanged.
        """
        try:
            cloud_event = event.to_cloud_event()

            # Core event metadata.
            stream_data = {
                "event_id": str(event.id),
                "event_type": event.event_type,
                "source": event.source,
                "timestamp": event.created_at.isoformat(),
                "cloud_event": json.dumps(cloud_event),
            }

            # Custom fields, if configured. dict() accepts the same inputs
            # dict.update() did (a mapping or an iterable of pairs).
            if event.publisher_config:
                custom_fields = dict(event.publisher_config)
                stream_data.update(
                    (field, self._to_stream_value(value))
                    for field, value in custom_fields.items()
                )

            message_id = self.redis_client.xadd(
                self.stream_name,
                stream_data,
                maxlen=self.max_len,
                approximate=True,
            )
        except Exception as e:
            logger.error(
                "Failed to publish event %s to Redis Stream: %s",
                event.id,
                str(e),
            )
            raise

        logger.info(
            "Published event %s to Redis Stream with message ID: %s",
            event.id,
            message_id,
        )
        return True

    def health_check(self) -> bool:
        """
        Check Redis connection health by sending ``PING``.

        Returns:
            bool: True if the ping succeeds; False if it raises (the error
            is logged, not propagated).
        """
        try:
            self.redis_client.ping()
        except Exception as e:
            logger.error("Redis health check failed: %s", str(e))
            return False
        return True

    def get_name(self) -> str:
        """Return the fixed backend name ``"RedisStream"``."""
        return "RedisStream"