Source code for django_broadcaster.models

import uuid
from typing import Any, Dict

from django.db import models
from django.utils import timezone


[docs] class OutboxEventStatus(models.TextChoices): PENDING = "pending", "Pending" PROCESSING = "processing", "Processing" PUBLISHED = "published", "Published" FAILED = "failed", "Failed" RETRY = "retry", "Retry"
[docs] class OutboxEvent(models.Model): """ Stores events to be published following the outbox pattern. Compatible with CloudEvents specification. """ id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) # CloudEvents required attributes spec_version = models.CharField(max_length=10, default="1.0") event_type = models.CharField(max_length=255, db_index=True) source = models.CharField(max_length=255) subject = models.CharField(max_length=255, blank=True) # CloudEvents optional attributes data_content_type = models.CharField(max_length=100, default="application/json") data_schema = models.URLField(blank=True) # Event data (serialized JSON) data = models.JSONField(null=True, blank=True) # Outbox specific fields status = models.CharField( max_length=20, choices=OutboxEventStatus.choices, default=OutboxEventStatus.PENDING, db_index=True, ) # Timing fields created_at = models.DateTimeField(auto_now_add=True, db_index=True) updated_at = models.DateTimeField(auto_now=True) scheduled_at = models.DateTimeField(default=timezone.now, db_index=True) published_at = models.DateTimeField(null=True, blank=True) # Retry and error handling retry_count = models.PositiveIntegerField(default=0) max_retries = models.PositiveIntegerField(default=3) last_error = models.TextField(blank=True) # Publisher configuration publisher_backend = models.CharField(max_length=100, blank=True) publisher_config = models.JSONField(default=dict, blank=True) class Meta: db_table = "outbox_events" indexes = [ models.Index(fields=["status", "scheduled_at"]), models.Index(fields=["event_type", "created_at"]), ]
[docs] def __str__(self): return f"{self.event_type} - {self.id}"
[docs] def to_cloud_event(self) -> Dict[str, Any]: """Convert to CloudEvents format""" event = { "specversion": self.spec_version, "type": self.event_type, "source": self.source, "id": str(self.id), "time": self.created_at.isoformat(), } if self.subject: event["subject"] = self.subject if self.data_content_type: event["datacontenttype"] = self.data_content_type if self.data_schema: event["dataschema"] = self.data_schema if self.data is not None: event["data"] = self.data return event
[docs] def mark_as_published(self): """Mark event as successfully published""" self.status = OutboxEventStatus.PUBLISHED self.published_at = timezone.now() self.save(update_fields=["status", "published_at", "updated_at"])
[docs] def mark_as_failed(self, error_message: str): """Mark event as failed with error details""" self.status = OutboxEventStatus.FAILED self.last_error = error_message self.save(update_fields=["status", "last_error", "updated_at"])
[docs] def increment_retry(self, error_message: str = ""): """Increment retry count and handle retry logic""" self.retry_count += 1 if error_message: self.last_error = error_message if self.retry_count >= self.max_retries: self.status = OutboxEventStatus.FAILED else: self.status = OutboxEventStatus.RETRY # Exponential backoff: 2^retry_count minutes delay_minutes = 2**self.retry_count self.scheduled_at = timezone.now() + timezone.timedelta( minutes=delay_minutes ) self.save( update_fields=[ "retry_count", "last_error", "status", "scheduled_at", "updated_at", ] )