Skip to content

Allow closing streams via filename status marker#249

Merged
albe merged 6 commits intomainfrom
copilot/allow-close-streams
Mar 20, 2026
Merged

Allow closing streams via filename status marker#249
albe merged 6 commits intomainfrom
copilot/allow-close-streams

Conversation

Copy link
Contributor

Copilot AI commented Mar 19, 2026

As stores accumulate time-boxed or process-scoped streams, every new write pays a matcher cost for all registered streams indefinitely. Closed streams need to stop receiving new index entries while remaining readable.

Approach

Uses a filename convention to persist stream status — stream-X.closed.index — so no additional IO or binary format changes are needed. A directory scan at startup distinguishes open vs. closed streams without reading file contents.

Changes

  • closeEventStream(streamName) — new public API; flushes and closes the index, renames the file to stream-X.closed.index, removes it from storage.secondaryIndexes, and reopens it read-only. Emits 'stream-closed'.
  • scanStreams — detects *.closed.index files at startup and registers them as closed streams.
  • registerStream — strips the .closed suffix from stream names detected via file watcher; when an already-open stream's index file is renamed to .closed.index by a concurrent writer, updates the stream entry in-place, reopens the index from the renamed file, and emits 'stream-closed' on the reader so callers can react.
  • commit — throws if the target stream is marked closed.
  • close — explicitly closes closed-stream index handles (excluded from storage's secondary index lifecycle).

Behavior

eventstore.commit('fiscal-2023', [event], () => {
    eventstore.closeEventStream('fiscal-2023');

    // Still readable
    const stream = eventstore.getEventStream('fiscal-2023'); // works

    // Write rejected
    eventstore.commit('fiscal-2023', [event]); // throws

    // Recreation blocked
    eventstore.createEventStream('fiscal-2023', matcher); // throws
});

Closed status survives store restarts — fiscal-2023 will be recognized as closed when the store is reopened.

Concurrent reader behaviour

When a writer closes a stream, any concurrently open read-only EventStore instance in the same or another process will:

  1. Have its ReadOnlyIndex for the stream automatically closed via onRename (triggered by the file system watcher).
  2. Detect the new stream-X.closed.index file via the directory watcher, reopen the index from the renamed file, update the stream entry to closed: true, and emit a 'stream-closed' event — symmetric with the writer's own event.

Concurrent writer: The existing storage lock prevents two writable instances from being open simultaneously, so a second writer encountering a mid-close rename is already a lock-violation scenario handled by the existing locking mechanism.

Original prompt

This section details on the original issue you should resolve

<issue_title>Allow to "close" streams</issue_title>
<issue_description>Right now, any index/stream defined will live indefinitely and have every future event/document be checked against it's matcher, even if the stream is known to longer match any future events. The more streams get created, the slower the whole system will become. Some examples are time boxed streams, like by fiscal year/calendar week/etc. or by correlationId (processes).

Therefore it would be useful if streams could be marked "closed"/"finished", or maybe even "deleted" (to prevent them from being recreated).

One approach could be to define a specific combination of index entry values (sequence number, file position, data size, partition id), e.g. (0, 0, 0, 0) for this marker and check for that as last entry in the index when opening. Any such index should no longer be considered for checking new documents.</issue_description>

<agent_instructions>start working on this task with the suggested solution from the last comment, with a filename pattern indicating the status of the stream.</agent_instructions>

Comments on the Issue (you are @copilot in this section)

@albe Another idea is to maybe put the "closed" (and/or other) marker information into the filename, so that a scan of all streams can filter out closed streams early on without having to do additional IO on every file.

stream-X.closed.index, stream-X.deleted.index, etc.

The interesting question is, how those streams should behave on typical operations. Reading from a closed stream should work as normal likely, but it should not be registered for indexing new events and trying to append to it should throw.</comment_new>


📱 Kick off Copilot coding agent tasks wherever you are with GitHub Mobile, available on iOS and Android.

Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Copilot AI changed the title [WIP] Add ability to mark streams as closed or finished Allow closing streams via filename status marker Mar 19, 2026
Copilot AI requested a review from albe March 19, 2026 17:44
Copilot finished work on behalf of albe March 19, 2026 17:44
@coveralls
Copy link

coveralls commented Mar 19, 2026

Coverage Status

coverage: 97.826% (-0.05%) from 97.875%
when pulling f6baf65 on copilot/allow-close-streams
into 217076a on main.

@albe
Copy link
Owner

albe commented Mar 19, 2026

@copilot how would the current implementation behave in the unfortunate event of a stream being closed that is still being written to from another process. Though the code tries to avoid having two writers on one eventstore, it could still happen.
Also, how does it behave for other reader instances having the stream still opened?

Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Copy link
Contributor Author

Copilot AI commented Mar 19, 2026

@copilot how would the current implementation behave in the unfortunate event of a stream being closed that is still being written to from another process. Though the code tries to a...

Concurrent writer: The storage lock prevents two writable instances from being open simultaneously. If the lock is bypassed (e.g. via LOCK_RECLAIM), both processes are already in an undefined state — this is the same risk that exists throughout the system and no additional defense was added.

Concurrent reader: This was a real gap that I've now fixed (commit 9d48ea1). When the writer renames stream-X.indexstream-X.closed.index, the reader's ReadOnlyIndex.onRename() fires and closes the old index. The reader's directory watcher then emits 'index-created' with 'stream-X.closed'. Previously, registerStream would extract streamName = 'X.closed' (wrong) and create a spurious entry. Now it:

  1. Strips the .closed suffix to get the real stream name
  2. If the stream is already registered as open, reopens it from the renamed file and updates the entry to closed: true
  3. Emits 'stream-closed' on the reader so callers can react

The reader also now emits 'stream-closed' when it detects a rename mid-session, symmetric with the writer's own 'stream-closed' event.

Copilot finished work on behalf of albe March 19, 2026 18:09
Comment on lines +137 to +141
const index = this.storage.openIndex(indexName);
if (isClosed) {
// Remove from secondary indexes so new writes are not indexed into a closed stream
delete this.storage.secondaryIndexes[indexName];
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid this workaround by making the storage layer aware of "closed" indexes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 9bd80d6. Added openClosedIndex(name) to ReadableStorage — it opens the .closed.index file directly without touching secondaryIndexes. The three places that previously did openIndex + delete secondaryIndexes[...] now use openClosedIndex instead.

@albe
Copy link
Owner

albe commented Mar 20, 2026

@copilot take my last two comments into consideration and propose a cleaner attempt

…x to storage layer

Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Copy link
Contributor Author

Copilot AI commented Mar 20, 2026

@copilot take my last two comments into consideration and propose a cleaner attempt

Both points addressed in 9bd80d6:

  • registerStream no longer has a closed parameter — it determines closed state purely from the .closed suffix in the index name, which is consistent in all call paths (initial scan, file-watcher notification, and concurrent-close transition).
  • Storage layer now has openClosedIndex(name) on ReadableStorage that opens a .closed.index file without registering it in secondaryIndexes, eliminating the open-then-delete hack from all three former sites.

Copilot AI requested a review from albe March 20, 2026 08:43
Copilot finished work on behalf of albe March 20, 2026 08:43
Comment on lines +93 to +94
if ((matches = file.match(/(stream-.*\.closed)\.index$/)) !== null) {
this.registerStream(matches[1]);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot get rid of this obsolete regex match now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 28dfc64. The two-branch if/else is replaced by a single /(stream-.*)\.index$/ match — it naturally captures both stream-foo and stream-foo.closed (greedy .*), and registerStream already handles both cases from there.

Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Copilot AI requested a review from albe March 20, 2026 09:10
Copilot finished work on behalf of albe March 20, 2026 09:10
Comment on lines +290 to +296
openClosedIndex(name) {
const indexName = this.storageFile + '.' + name + '.index';
assert(fs.existsSync(path.join(this.indexDirectory, indexName)), `Index "${name}" does not exist.`);
const { index } = this.createIndex(indexName, Object.assign({}, this.indexOptions));
index.open();
return index;
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot maybe it makes sense to consider these indexes "readonly" in the context of the storage layer. You "close" a stream of events, but you don't close an index and keep reading from it. So I suggest renaming this to openReadonlyIndex(name) and keep track of the readonly indexes in the storage similar to the secondaryIndexes, e.g. as readonlyIndexes.
Then closing the storage will also close the readonly indexes and hence the EventStore close method can get rid of the "closed indexes" workaround code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in f6baf65. openClosedIndex is now openReadonlyIndex. ReadableStorage initialises a readonlyIndexes map, stores each opened index there (with a cache-hit path for repeated opens), and closes them all in close(). EventStore.close() no longer needs a manual teardown loop — it just delegates to this.storage.close().

…ge.readonlyIndexes

Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Copilot AI requested a review from albe March 20, 2026 09:27
Copilot finished work on behalf of albe March 20, 2026 09:27
@albe albe marked this pull request as ready for review March 20, 2026 14:25
@albe albe merged commit 28d4e34 into main Mar 20, 2026
10 checks passed
@albe albe deleted the copilot/allow-close-streams branch March 20, 2026 14:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Allow to "close" streams

3 participants