Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions lib/event_store/notifications/listener.ex
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
defmodule EventStore.Notifications.Listener do
@moduledoc false
@moduledoc """
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Annotation: Added some key tems like GenStage and full module names.

A GenStage producer which listens for event stream notifications from PostgreSQL using the
PostgreSQL `LISTEN/NOTIFY` mechanism and emits `EventStore.Notifications.Notification` events
for subscribed `EventStore.Notifications.Publisher` processes.

# Listener subscribes to event notifications using PostgreSQL's `LISTEN`
# command. Whenever events are appended to storage a `NOTIFY` command is
# executed by a trigger. The notification payload contains the first and last
# event number of the appended events. These events are then read from storage
# and published to interested subscribers.
The notification payload contains the first and last event number of the appended events.
"""

use GenStage

Expand Down
29 changes: 24 additions & 5 deletions lib/event_store/notifications/notification.ex
Original file line number Diff line number Diff line change
@@ -1,18 +1,37 @@
defmodule EventStore.Notifications.Notification do
@moduledoc false
@moduledoc """
Represents a notification received from PostgreSQL's `LISTEN/NOTIFY` mechanism.

Parsed from a comma-separated payload containing the stream UUID, stream ID, and the first and
last stream versions of newly appended events.

## Examples:

iex> EventStore.Notifications.Notification.new("stream-12345,1,2,5")
%EventStore.Notifications.Notification{
stream_uuid: "stream-12345",
stream_id: 1,
from_stream_version: 2,
to_stream_version: 5
}
"""

alias EventStore.Notifications.Notification

defstruct [:stream_uuid, :stream_id, :from_stream_version, :to_stream_version]

@doc """
Build a new notification struct from the `NOTIFY` payload which contains the
stream uuid, stream id, first and last stream versions.
Parses the PostgreSQL `NOTIFY` payload and builds a new struct.

## Example

Notification.new("stream-12345,1,1,5")

iex> EventStore.Notifications.Notification.new("stream-12345,1,1,5")
%EventStore.Notifications.Notification{
stream_uuid: "stream-12345",
stream_id: 1,
from_stream_version: 1,
to_stream_version: 5
}
"""
def new(payload) do
[last, first, stream_id, stream_uuid] =
Expand Down
12 changes: 7 additions & 5 deletions lib/event_store/notifications/publisher.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
defmodule EventStore.Notifications.Publisher do
@moduledoc false

# Reads events from storage by each event number range received and publishes
# them.
@moduledoc """
A GenStage consumer which subscribes to a `EventStore.Notifications.Listener` and receives
`EventStore.Notifications.Notification` events, reading the corresponding range of events from
storage, deserializing them, and broadcasting them to PubSub subscribers for the relevant storage
stream.
"""

use GenStage

Expand Down Expand Up @@ -43,7 +45,7 @@ defmodule EventStore.Notifications.Publisher do
{:consumer, state, [subscribe_to: [{subscribe_to, max_demand: 1}]]}
end

# Fetch events from storage and pass onwards to subscibers
# Fetch events from storage and pass onwards to subscribers
def handle_events(events, _from, state) do
%State{event_store: event_store} = state

Expand Down
4 changes: 4 additions & 0 deletions test/notifications/notification_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
defmodule EventStore.Notifications.NotificationTest do
use ExUnit.Case, async: true
doctest EventStore.Notifications.Notification
end
Comment on lines +1 to +4
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Annotation: I didn't see any unit tests for this module so I added the doctest to have something in place.

Loading