# Source code for django_broadcaster.events
from typing import Any, Dict, Optional
# [docs]
class CloudEvent:
    """
    CloudEvent interface compatible with CloudEvents specification

    Stores the attributes given to the constructor and serializes them with
    :meth:`to_dict` into a dictionary keyed by CloudEvents attribute names
    (``specversion``, ``type``, ``source``, ``subject``,
    ``datacontenttype``, ``dataschema``, ``data``). No other keys are
    produced by this class.
    """

    def __init__(
        self,
        event_type: str,
        source: str,
        data: Optional[Any] = None,
        subject: str = "",
        data_content_type: str = "application/json",
        data_schema: str = "",
        spec_version: str = "1.0",
    ):
        """
        Store the event attributes on the instance.

        :param event_type: Value emitted under the ``type`` key.
        :param source: Value emitted under the ``source`` key.
        :param data: Optional payload; emitted under ``data`` unless ``None``.
        :param subject: Emitted under ``subject`` when non-empty.
        :param data_content_type: Emitted under ``datacontenttype`` when
            non-empty. Defaults to ``"application/json"``.
        :param data_schema: Emitted under ``dataschema`` when non-empty.
        :param spec_version: Value emitted under the ``specversion`` key.
            Defaults to ``"1.0"``.
        """
        self.spec_version = spec_version
        self.event_type = event_type
        self.source = source
        self.subject = subject
        self.data_content_type = data_content_type
        self.data_schema = data_schema
        self.data = data

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert to dictionary representation

        ``specversion``, ``type`` and ``source`` are always present. The
        string attributes ``subject``, ``datacontenttype`` and ``dataschema``
        are included only when truthy (i.e. not an empty string). ``data`` is
        included whenever it is not ``None``, so falsy payloads such as ``0``,
        ``""`` or ``{}`` are preserved.

        :return: A new dictionary; ``data`` is stored by reference, not copied.
        """
        event_dict: Dict[str, Any] = {
            "specversion": self.spec_version,
            "type": self.event_type,
            "source": self.source,
        }

        # Optional string attributes: an empty value means "not set".
        if self.subject:
            event_dict["subject"] = self.subject
        if self.data_content_type:
            event_dict["datacontenttype"] = self.data_content_type
        if self.data_schema:
            event_dict["dataschema"] = self.data_schema

        # Identity check rather than truthiness so that legitimate falsy
        # payloads (0, False, "", [], {}) are still serialized.
        if self.data is not None:
            event_dict["data"] = self.data

        return event_dict

    def __repr__(self) -> str:
        """Return a short debugging representation of the event."""
        return (
            f"{type(self).__name__}("
            f"event_type={self.event_type!r}, "
            f"source={self.source!r}, "
            f"subject={self.subject!r})"
        )