Source code for django_broadcaster.registry

import logging
from typing import Any, Callable, Dict, List, Optional

from django.dispatch import Signal

from .models import OutboxEvent

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)

# Signal dispatched when an event is created.
# NOTE(review): only defined here; nothing in this module sends or connects it.
event_created = Signal()

# Signal dispatched when an event is published.
# NOTE(review): only defined here; nothing in this module sends or connects it.
event_published = Signal()


class EventHandlerRegistry:
    """
    Registry for event handlers that can process events locally.

    Handlers are callables that take the event as their only argument. A
    handler is either bound to one event type or registered globally, in
    which case it is invoked for every event passed to ``handle_event``.
    """

    def __init__(self) -> None:
        # Handlers keyed by event type, kept in registration order.
        self._handlers: Dict[str, List[Callable]] = {}
        # Handlers invoked for every event, regardless of its type.
        self._global_handlers: List[Callable] = []

    @staticmethod
    def _handler_name(handler: Callable) -> str:
        """Return a loggable name for any callable.

        ``functools.partial`` objects and instances defining ``__call__``
        have no ``__name__``; fall back to ``repr`` so that logging can
        never raise on their behalf.
        """
        try:
            return handler.__name__
        except AttributeError:
            return repr(handler)

    def _run_handler(self, handler: Callable, event: "OutboxEvent", label: str) -> bool:
        """Invoke one handler, log the outcome, and report whether it succeeded.

        Args:
            handler: The callable to invoke with ``event``.
            event: The event being dispatched.
            label: Log prefix, ``"Handler"`` or ``"Global handler"``.

        Returns:
            bool: True if the handler returned normally, False if it raised.
        """
        name = self._handler_name(handler)
        try:
            handler(event)
        except Exception as exc:
            # Deliberate best-effort boundary: one failing handler must not
            # stop the others. ``exception`` keeps the traceback in the log.
            logger.exception(
                "%s %s failed for event %s: %s", label, name, event.id, exc
            )
            return False
        logger.debug("%s %s processed event %s", label, name, event.id)
        return True

    def register(
        self, event_type: Optional[str] = None
    ) -> Callable[[Callable], Callable]:
        """
        Decorator to register an event handler.

        Args:
            event_type: Specific event type to handle. Any falsy value
                (``None`` or ``""``) registers the handler for all events.

        Returns:
            A decorator that registers the callable and returns it unchanged.
        """

        def decorator(func: Callable) -> Callable:
            name = self._handler_name(func)
            if event_type:
                self._handlers.setdefault(event_type, []).append(func)
                logger.info(
                    "Registered handler %s for event type: %s", name, event_type
                )
            else:
                self._global_handlers.append(func)
                logger.info("Registered global handler: %s", name)
            return func

        return decorator

    def handle_event(self, event: "OutboxEvent") -> bool:
        """
        Handle an event by calling registered handlers.

        Type-specific handlers run first, then global handlers. Every handler
        is attempted even if an earlier one raised.

        Args:
            event: The event to handle; its ``event_type`` selects the
                type-specific handlers and its ``id`` is used in log messages.

        Returns:
            bool: True if all handlers succeeded, False otherwise.
        """
        # Iterate over snapshots so a handler that registers or clears
        # handlers during dispatch cannot disturb the ongoing iteration.
        specific_handlers = tuple(self._handlers.get(event.event_type, ()))
        global_handlers = tuple(self._global_handlers)

        success = True
        for handler in specific_handlers:
            if not self._run_handler(handler, event, "Handler"):
                success = False
        for handler in global_handlers:
            if not self._run_handler(handler, event, "Global handler"):
                success = False
        return success

    def get_handlers(self, event_type: Optional[str] = None) -> List[Callable]:
        """
        Get registered handlers for an event type.

        Args:
            event_type: Event type to look up. If falsy, only the global
                handlers are returned.

        Returns:
            A new list (type-specific handlers followed by global handlers);
            mutating it does not affect the registry.
        """
        if event_type:
            return self._handlers.get(event_type, []) + self._global_handlers
        # Return a copy so callers cannot mutate the registry's own list.
        return list(self._global_handlers)

    def clear_handlers(self, event_type: Optional[str] = None) -> None:
        """
        Clear handlers for an event type or all handlers.

        Args:
            event_type: If truthy, only that type's handlers are removed and
                global handlers are kept. Otherwise every type-specific and
                global handler is removed.
        """
        if event_type:
            self._handlers.pop(event_type, None)
        else:
            self._handlers.clear()
            self._global_handlers.clear()
# Module-level registry instance, created when this module is imported.
event_registry = EventHandlerRegistry()