Source code for django_broadcaster.publishers

import logging
from datetime import datetime
from typing import Any, Dict, Optional

from django.conf import settings
from django.db import transaction
from django.utils import timezone

from .backends import PublisherBackend, RedisStreamBackend
from .events import CloudEvent
from .models import OutboxEvent, OutboxEventStatus

logger = logging.getLogger(__name__)


[docs] class OutboxPublisher: """ Main publisher class for the outbox pattern """
[docs] def __init__(self): self._backends: Dict[str, PublisherBackend] = {} self._load_backends()
[docs] def _load_backends(self): """Load configured publisher backends""" outbox_config = getattr(settings, "OUTBOX_PUBLISHERS", {}) for name, config in outbox_config.items(): backend_class = config.get("BACKEND") backend_options = config.get("OPTIONS", {}) if backend_class == "django_broadcaster.backends.RedisStreamBackend": self._backends[name] = RedisStreamBackend(backend_options) else: logger.warning(f"Unknown backend class: {backend_class}")
[docs] def publish_event( self, event_type: str, source: str, data: Optional[Any] = None, subject: str = "", backend: Optional[str] = None, scheduled_at: Optional[datetime] = None, max_retries: int = 3, publisher_config: Optional[Dict[str, Any]] = None, ) -> OutboxEvent: """ Publish an event to the outbox Args: event_type: Type of the event source: Source of the event data: Event payload data subject: Event subject (optional) backend: Publisher backend to use (optional) scheduled_at: When to publish the event (default: now) max_retries: Maximum retry attempts publisher_config: Backend-specific configuration Returns: OutboxEvent: The created event """ with transaction.atomic(): event = OutboxEvent.objects.create( event_type=event_type, source=source, subject=subject, data=data, scheduled_at=scheduled_at or timezone.now(), max_retries=max_retries, publisher_backend=backend or "", publisher_config=publisher_config or {}, ) logger.info(f"Created outbox event: {event.id}") return event
[docs] def publish_cloud_event( self, cloud_event: CloudEvent, backend: Optional[str] = None, scheduled_at: Optional[datetime] = None, max_retries: int = 3, publisher_config: Optional[Dict[str, Any]] = None, ) -> OutboxEvent: """ Publish a CloudEvent to the outbox """ return self.publish_event( event_type=cloud_event.event_type, source=cloud_event.source, data=cloud_event.data, subject=cloud_event.subject, backend=backend, scheduled_at=scheduled_at, max_retries=max_retries, publisher_config=publisher_config, )
[docs] def process_pending_events(self, batch_size: int = 100) -> int: """ Process pending events from the outbox Args: batch_size: Number of events to process in one batch Returns: int: Number of events processed """ processed_count = 0 # Get pending events that are ready to be published pending_events = OutboxEvent.objects.filter( status__in=[OutboxEventStatus.PENDING, OutboxEventStatus.RETRY], scheduled_at__lte=timezone.now(), ).order_by("created_at")[:batch_size] for event in pending_events: if self._process_single_event(event): processed_count += 1 return processed_count
[docs] def _process_single_event(self, event: OutboxEvent) -> bool: """ Process a single event Args: event: The event to process Returns: bool: True if successfully processed, False otherwise """ try: # Mark as processing to prevent concurrent processing with transaction.atomic(): event_to_update = OutboxEvent.objects.select_for_update().get( id=event.id ) if event_to_update.status == OutboxEventStatus.PROCESSING: return False # Already being processed event_to_update.status = OutboxEventStatus.PROCESSING event_to_update.save(update_fields=["status", "updated_at"]) event = event_to_update # Determine which backend to use backend_name = event.publisher_backend or self._get_default_backend() if not backend_name or backend_name not in self._backends: raise ValueError(f"No valid backend found: {backend_name}") backend = self._backends[backend_name] # Check backend health if not backend.health_check(): raise Exception(f"Backend {backend_name} is not healthy") # Publish the event success = backend.publish(event) if success: event.mark_as_published() logger.info(f"Successfully published event {event.id}") return True else: raise Exception("Backend publish returned False") except Exception as e: error_msg = str(e) logger.error(f"Failed to process event {event.id}: {error_msg}") event.increment_retry(error_msg) return False
[docs] def _get_default_backend(self) -> Optional[str]: """Get the default backend name""" if self._backends: return next(iter(self._backends.keys())) return None
[docs] def get_backend_health(self) -> Dict[str, bool]: """Get health status of all backends""" return { name: backend.health_check() for name, backend in self._backends.items() }
publisher = OutboxPublisher()