diff --git a/lib/event_store/notifications/listener.ex b/lib/event_store/notifications/listener.ex index bb92e8b2..cfaea7dd 100644 --- a/lib/event_store/notifications/listener.ex +++ b/lib/event_store/notifications/listener.ex @@ -1,11 +1,11 @@ defmodule EventStore.Notifications.Listener do - @moduledoc false + @moduledoc """ + A GenStage producer which listens for event stream notifications from PostgreSQL using the + PostgreSQL `LISTEN/NOTIFY` mechanism and emits `EventStore.Notifications.Notification` events + for subscribed `EventStore.Notifications.Publisher` processes. - # Listener subscribes to event notifications using PostgreSQL's `LISTEN` - # command. Whenever events are appended to storage a `NOTIFY` command is - # executed by a trigger. The notification payload contains the first and last - # event number of the appended events. These events are then read from storage - # and published to interested subscribers. + The notification payload contains the first and last event number of the appended events. + """ use GenStage diff --git a/lib/event_store/notifications/notification.ex b/lib/event_store/notifications/notification.ex index f25b8730..14eefd1c 100644 --- a/lib/event_store/notifications/notification.ex +++ b/lib/event_store/notifications/notification.ex @@ -1,18 +1,37 @@ defmodule EventStore.Notifications.Notification do - @moduledoc false + @moduledoc """ + Represents a notification received from PostgreSQL's `LISTEN/NOTIFY` mechanism. + + Parsed from a comma-separated payload containing the stream UUID, stream ID, and the first and + last stream versions of newly appended events. + + ## Examples: + + iex> EventStore.Notifications.Notification.new("stream-12345,1,2,5") + %EventStore.Notifications.Notification{ + stream_uuid: "stream-12345", + stream_id: 1, + from_stream_version: 2, + to_stream_version: 5 + } + """ alias EventStore.Notifications.Notification defstruct [:stream_uuid, :stream_id, :from_stream_version, :to_stream_version] @doc """ - Build a new notification struct from the `NOTIFY` payload which contains the - stream uuid, stream id, first and last stream versions. + Parses the PostgreSQL `NOTIFY` payload and builds a new struct. ## Example - Notification.new("stream-12345,1,1,5") - + iex> EventStore.Notifications.Notification.new("stream-12345,1,1,5") + %EventStore.Notifications.Notification{ + stream_uuid: "stream-12345", + stream_id: 1, + from_stream_version: 1, + to_stream_version: 5 + } """ def new(payload) do [last, first, stream_id, stream_uuid] = diff --git a/lib/event_store/notifications/publisher.ex b/lib/event_store/notifications/publisher.ex index 4d9e65f1..28fb1f7b 100644 --- a/lib/event_store/notifications/publisher.ex +++ b/lib/event_store/notifications/publisher.ex @@ -1,8 +1,10 @@ defmodule EventStore.Notifications.Publisher do - @moduledoc false - - # Reads events from storage by each event number range received and publishes - # them. + @moduledoc """ + A GenStage consumer which subscribes to a `EventStore.Notifications.Listener` and receives + `EventStore.Notifications.Notification` events, reading the corresponding range of events from + storage, deserializing them, and broadcasting them to PubSub subscribers for the relevant storage + stream. + """ use GenStage @@ -43,7 +45,7 @@ defmodule EventStore.Notifications.Publisher do {:consumer, state, [subscribe_to: [{subscribe_to, max_demand: 1}]]} end - # Fetch events from storage and pass onwards to subscibers + # Fetch events from storage and pass onwards to subscribers def handle_events(events, _from, state) do %State{event_store: event_store} = state diff --git a/test/notifications/notification_test.exs b/test/notifications/notification_test.exs new file mode 100644 index 00000000..0954fa53 --- /dev/null +++ b/test/notifications/notification_test.exs @@ -0,0 +1,4 @@ +defmodule EventStore.Notifications.NotificationTest do + use ExUnit.Case, async: true + doctest EventStore.Notifications.Notification +end