Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ will) break with every update.
This SDK current supports the following versions of CloudEvents:

- v1.0
- v0.3

## Python SDK

Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ exclude = [
[tool.ruff.lint]
ignore = ["E731"]
extend-ignore = ["E203"]
select = ["I"]
select = [
"I", # isort - import sorting
"F401", # unused imports
]


[tool.pytest.ini_options]
Expand Down
4 changes: 4 additions & 0 deletions src/cloudevents/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@
# under the License.

"""This package contains the core functionality of the CloudEvents spec."""

# CloudEvents specification version constants
SPECVERSION_V1_0 = "1.0"
SPECVERSION_V0_3 = "0.3"
31 changes: 26 additions & 5 deletions src/cloudevents/core/bindings/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

from dateutil.parser import isoparse

from cloudevents.core import SPECVERSION_V1_0
from cloudevents.core.base import BaseCloudEvent, EventFactory
from cloudevents.core.bindings.common import get_event_factory_for_version
from cloudevents.core.formats.base import Format

# AMQP CloudEvents spec allows both cloudEvents_ and cloudEvents: prefixes
Expand Down Expand Up @@ -149,11 +151,14 @@ def to_binary(event: BaseCloudEvent, event_format: Format) -> AMQPMessage:
def from_binary(
message: AMQPMessage,
event_format: Format,
event_factory: EventFactory,
event_factory: EventFactory | None = None,
) -> BaseCloudEvent:
"""
Parse an AMQP binary content mode message to a CloudEvent.

Auto-detects the CloudEvents version from the application properties
and uses the appropriate event factory if not explicitly provided.

Extracts CloudEvent attributes from AMQP application properties with either
'cloudEvents_' or 'cloudEvents:' prefix (per AMQP CloudEvents spec), and treats
the AMQP 'content-type' property as the 'datacontenttype' attribute. The
Expand Down Expand Up @@ -200,6 +205,11 @@ def from_binary(
if CONTENT_TYPE_PROPERTY in message.properties:
attributes["datacontenttype"] = message.properties[CONTENT_TYPE_PROPERTY]

# Auto-detect version if factory not provided
if event_factory is None:
specversion = attributes.get("specversion", SPECVERSION_V1_0)
event_factory = get_event_factory_for_version(specversion)

datacontenttype = attributes.get("datacontenttype")
data = event_format.read_data(message.application_data, datacontenttype)

Expand Down Expand Up @@ -248,7 +258,7 @@ def to_structured(event: BaseCloudEvent, event_format: Format) -> AMQPMessage:
def from_structured(
message: AMQPMessage,
event_format: Format,
event_factory: EventFactory,
event_factory: EventFactory | None = None,
) -> BaseCloudEvent:
"""
Parse an AMQP structured content mode message to a CloudEvent.
Expand All @@ -257,33 +267,44 @@ def from_structured(
specified format. Any cloudEvents_-prefixed application properties are ignored
as the application-data contains all event metadata.

If event_factory is not provided, version detection is delegated to the format
implementation, which will auto-detect based on the 'specversion' field.

Example:
>>> from cloudevents.core.v1.event import CloudEvent
>>> from cloudevents.core.formats.json import JSONFormat
>>>
>>> # Explicit factory
>>> message = AMQPMessage(
... properties={"content-type": "application/cloudevents+json"},
... application_properties={},
... application_data=b'{"type": "com.example.test", "source": "/test", ...}'
... )
>>> event = from_structured(message, JSONFormat(), CloudEvent)
>>>
>>> # Auto-detect version
>>> event = from_structured(message, JSONFormat())

:param message: AMQPMessage to parse
:param event_format: Format implementation for deserialization
:param event_factory: Factory function to create CloudEvent instances
:param event_factory: Factory function to create CloudEvent instances.
If None, the format will auto-detect the version.
:return: CloudEvent instance
"""
# Delegate version detection to format layer
return event_format.read(event_factory, message.application_data)


def from_amqp(
message: AMQPMessage,
event_format: Format,
event_factory: EventFactory,
event_factory: EventFactory | None = None,
) -> BaseCloudEvent:
"""
Parse an AMQP message to a CloudEvent with automatic mode detection.

Auto-detects CloudEvents version and uses appropriate event factory if not provided.

Automatically detects whether the message uses binary or structured content mode:
- If content-type starts with "application/cloudevents" → structured mode
- Otherwise → binary mode
Expand Down Expand Up @@ -313,7 +334,7 @@ def from_amqp(

:param message: AMQPMessage to parse
:param event_format: Format implementation for deserialization
:param event_factory: Factory function to create CloudEvent instances
:param event_factory: Factory function to create CloudEvent instances (auto-detected if None)
:return: CloudEvent instance
"""
content_type = message.properties.get(CONTENT_TYPE_PROPERTY, "")
Expand Down
21 changes: 21 additions & 0 deletions src/cloudevents/core/bindings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@

from dateutil.parser import isoparse

from cloudevents.core import SPECVERSION_V0_3
from cloudevents.core.base import EventFactory
from cloudevents.core.v03.event import CloudEvent as CloudEventV03
from cloudevents.core.v1.event import CloudEvent

TIME_ATTR: Final[str] = "time"
CONTENT_TYPE_HEADER: Final[str] = "content-type"
DATACONTENTTYPE_ATTR: Final[str] = "datacontenttype"
Expand Down Expand Up @@ -66,3 +71,19 @@ def decode_header_value(attr_name: str, value: str) -> Any:
return isoparse(decoded)

return decoded


def get_event_factory_for_version(specversion: str) -> EventFactory:
"""
Get the appropriate event factory based on the CloudEvents specification version.

This function returns the CloudEvent class implementation for the specified
version. Used by protocol bindings for automatic version detection.

:param specversion: The CloudEvents specification version (e.g., "0.3" or "1.0")
:return: EventFactory for the specified version (defaults to v1.0 for unknown versions)
"""
if specversion == SPECVERSION_V0_3:
return CloudEventV03
# Default to v1.0 for unknown versions
return CloudEvent
64 changes: 44 additions & 20 deletions src/cloudevents/core/bindings/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
from dataclasses import dataclass
from typing import Any, Final

from cloudevents.core import SPECVERSION_V1_0
from cloudevents.core.base import BaseCloudEvent, EventFactory
from cloudevents.core.bindings.common import (
CONTENT_TYPE_HEADER,
DATACONTENTTYPE_ATTR,
decode_header_value,
encode_header_value,
get_event_factory_for_version,
)
from cloudevents.core.formats.base import Format
from cloudevents.core.formats.json import JSONFormat
from cloudevents.core.v1.event import CloudEvent

CE_PREFIX: Final[str] = "ce-"

Expand Down Expand Up @@ -94,11 +95,14 @@ def to_binary(event: BaseCloudEvent, event_format: Format) -> HTTPMessage:
def from_binary(
message: HTTPMessage,
event_format: Format,
event_factory: EventFactory,
event_factory: EventFactory | None = None,
) -> BaseCloudEvent:
"""
Parse an HTTP binary content mode message to a CloudEvent.

Auto-detects the CloudEvents version from the 'ce-specversion' header
and uses the appropriate event factory if not explicitly provided.

Extracts CloudEvent attributes from ce-prefixed HTTP headers and treats the
'Content-Type' header as the 'datacontenttype' attribute. The HTTP body is
parsed as event data according to the content type.
Expand All @@ -116,7 +120,7 @@ def from_binary(

:param message: HTTPMessage to parse
:param event_format: Format implementation for data deserialization
:param event_factory: Factory function to create CloudEvent instances
:param event_factory: Factory function to create CloudEvent instances (auto-detected if None)
:return: CloudEvent instance
"""
attributes: dict[str, Any] = {}
Expand All @@ -130,6 +134,11 @@ def from_binary(
elif normalized_name == CONTENT_TYPE_HEADER:
attributes[DATACONTENTTYPE_ATTR] = header_value

# Auto-detect version if factory not provided
if event_factory is None:
specversion = attributes.get("specversion", SPECVERSION_V1_0)
event_factory = get_event_factory_for_version(specversion)

datacontenttype = attributes.get(DATACONTENTTYPE_ATTR)
data = event_format.read_data(message.body, datacontenttype)

Expand Down Expand Up @@ -172,40 +181,51 @@ def to_structured(event: BaseCloudEvent, event_format: Format) -> HTTPMessage:
def from_structured(
message: HTTPMessage,
event_format: Format,
event_factory: EventFactory,
event_factory: EventFactory | None = None,
) -> BaseCloudEvent:
"""
Parse an HTTP structured content mode message to a CloudEvent.

Deserializes the CloudEvent from the HTTP body using the specified format.
Any ce-prefixed headers are ignored as the body contains all event metadata.

If event_factory is not provided, version detection is delegated to the format
implementation, which will auto-detect based on the 'specversion' field.

Example:
>>> from cloudevents.core.v1.event import CloudEvent
>>> from cloudevents.core.formats.json import JSONFormat
>>>
>>> # Explicit factory (recommended for performance)
>>> message = HTTPMessage(
... headers={"content-type": "application/cloudevents+json"},
... body=b'{"type": "com.example.test", "source": "/test", ...}'
... )
>>> event = from_structured(message, JSONFormat(), CloudEvent)
>>>
>>> # Auto-detect version (convenient)
>>> event = from_structured(message, JSONFormat())

:param message: HTTPMessage to parse
:param event_format: Format implementation for deserialization
:param event_factory: Factory function to create CloudEvent instances
:param event_factory: Factory function to create CloudEvent instances.
If None, the format will auto-detect the version.
:return: CloudEvent instance
"""
# Delegate version detection to format layer
return event_format.read(event_factory, message.body)


def from_http(
message: HTTPMessage,
event_format: Format,
event_factory: EventFactory,
event_factory: EventFactory | None = None,
) -> BaseCloudEvent:
"""
Parse an HTTP message to a CloudEvent with automatic mode detection.

Auto-detects CloudEvents version and uses appropriate event factory if not provided.

Automatically detects whether the message uses binary or structured content mode:
- If any ce- prefixed headers are present → binary mode
- Otherwise → structured mode
Expand Down Expand Up @@ -233,7 +253,7 @@ def from_http(

:param message: HTTPMessage to parse
:param event_format: Format implementation for deserialization
:param event_factory: Factory function to create CloudEvent instances
:param event_factory: Factory function to create CloudEvent instances (auto-detected if None)
:return: CloudEvent instance
"""
if any(key.lower().startswith(CE_PREFIX) for key in message.headers.keys()):
Expand Down Expand Up @@ -271,21 +291,23 @@ def to_binary_event(
def from_binary_event(
message: HTTPMessage,
event_format: Format | None = None,
) -> CloudEvent:
) -> BaseCloudEvent:
"""
Convenience wrapper for from_binary with JSON format and CloudEvent as defaults.
Convenience wrapper for from_binary with JSON format and auto-detection.

Auto-detects CloudEvents version (v0.3 or v1.0) from headers.

Example:
>>> from cloudevents.core.bindings import http
>>> event = http.from_binary_event(message)

:param message: HTTPMessage to parse
:param event_format: Format implementation (defaults to JSONFormat)
:return: CloudEvent instance
:return: CloudEvent instance (v0.3 or v1.0 based on specversion)
"""
if event_format is None:
event_format = JSONFormat()
return from_binary(message, event_format, CloudEvent)
return from_binary(message, event_format, None)


def to_structured_event(
Expand Down Expand Up @@ -317,39 +339,41 @@ def to_structured_event(
def from_structured_event(
message: HTTPMessage,
event_format: Format | None = None,
) -> CloudEvent:
) -> BaseCloudEvent:
"""
Convenience wrapper for from_structured with JSON format and CloudEvent as defaults.
Convenience wrapper for from_structured with JSON format and auto-detection.

Auto-detects CloudEvents version (v0.3 or v1.0) from body.

Example:
>>> from cloudevents.core.bindings import http
>>> event = http.from_structured_event(message)

:param message: HTTPMessage to parse
:param event_format: Format implementation (defaults to JSONFormat)
:return: CloudEvent instance
:return: CloudEvent instance (v0.3 or v1.0 based on specversion)
"""
if event_format is None:
event_format = JSONFormat()
return from_structured(message, event_format, CloudEvent)
return from_structured(message, event_format, None)


def from_http_event(
message: HTTPMessage,
event_format: Format | None = None,
) -> CloudEvent:
) -> BaseCloudEvent:
"""
Convenience wrapper for from_http with JSON format and CloudEvent as defaults.
Auto-detects binary or structured mode.
Convenience wrapper for from_http with JSON format and auto-detection.
Auto-detects binary or structured mode, and CloudEvents version.

Example:
>>> from cloudevents.core.bindings import http
>>> event = http.from_http_event(message)

:param message: HTTPMessage to parse
:param event_format: Format implementation (defaults to JSONFormat)
:return: CloudEvent instance
:return: CloudEvent instance (v0.3 or v1.0 based on specversion)
"""
if event_format is None:
event_format = JSONFormat()
return from_http(message, event_format, CloudEvent)
return from_http(message, event_format, None)
Loading