Allow closing streams via filename status marker#249
Conversation
Co-authored-by: albe <4259532+albe@users.noreply.github.com>
|
@copilot how would the current implementation behave in the unfortunate event of a stream being closed that is still being written to from another process. Though the code tries to avoid having two writers on one eventstore, it could still happen. |
Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Concurrent writer: The storage lock prevents two writable instances from being open simultaneously. If the lock is bypassed (e.g. via Concurrent reader: This was a real gap that I've now fixed (commit
The reader also now emits |
src/EventStore.js
Outdated
| const index = this.storage.openIndex(indexName); | ||
| if (isClosed) { | ||
| // Remove from secondary indexes so new writes are not indexed into a closed stream | ||
| delete this.storage.secondaryIndexes[indexName]; | ||
| } |
There was a problem hiding this comment.
Can we avoid this workaround by making the storage layer aware of "closed" indexes?
There was a problem hiding this comment.
Done in 9bd80d6. Added openClosedIndex(name) to ReadableStorage — it opens the .closed.index file directly without touching secondaryIndexes. The three places that previously did openIndex + delete secondaryIndexes[...] now use openClosedIndex instead.
|
@copilot take my last two comments into consideration and propose a cleaner attempt |
…x to storage layer Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Both points addressed in
|
src/EventStore.js
Outdated
| if ((matches = file.match(/(stream-.*\.closed)\.index$/)) !== null) { | ||
| this.registerStream(matches[1]); |
There was a problem hiding this comment.
Done in 28dfc64. The two-branch if/else is replaced by a single /(stream-.*)\.index$/ match — it naturally captures both stream-foo and stream-foo.closed (greedy .*), and registerStream already handles both cases from there.
Co-authored-by: albe <4259532+albe@users.noreply.github.com>
src/Storage/ReadableStorage.js
Outdated
| openClosedIndex(name) { | ||
| const indexName = this.storageFile + '.' + name + '.index'; | ||
| assert(fs.existsSync(path.join(this.indexDirectory, indexName)), `Index "${name}" does not exist.`); | ||
| const { index } = this.createIndex(indexName, Object.assign({}, this.indexOptions)); | ||
| index.open(); | ||
| return index; | ||
| } |
There was a problem hiding this comment.
@copilot maybe it makes sense to consider these indexes "readonly" in the context of the storage layer. You "close" a stream of events, but you don't close an index and keep reading from it. So I suggest renaming this to openReadonlyIndex(name) and keep track of the readonly indexes in the storage similar to the secondaryIndexes, e.g. as readonlyIndexes.
Then closing the storage will also close the readonly indexes and hence the EventStore close method can get rid of the "closed indexes" workaround code.
There was a problem hiding this comment.
Done in f6baf65. openClosedIndex is now openReadonlyIndex. ReadableStorage initialises a readonlyIndexes map, stores each opened index there (with a cache-hit path for repeated opens), and closes them all in close(). EventStore.close() no longer needs a manual teardown loop — it just delegates to this.storage.close().
…ge.readonlyIndexes Co-authored-by: albe <4259532+albe@users.noreply.github.com>
As stores accumulate time-boxed or process-scoped streams, every new write pays a matcher cost for all registered streams indefinitely. Closed streams need to stop receiving new index entries while remaining readable.
Approach
Uses a filename convention to persist stream status —
stream-X.closed.index— so no additional IO or binary format changes are needed. A directory scan at startup distinguishes open vs. closed streams without reading file contents.Changes
closeEventStream(streamName)— new public API; flushes and closes the index, renames the file tostream-X.closed.index, removes it fromstorage.secondaryIndexes, and reopens it read-only. Emits'stream-closed'.scanStreams— detects*.closed.indexfiles at startup and registers them as closed streams.registerStream— strips the.closedsuffix from stream names detected via file watcher; when an already-open stream's index file is renamed to.closed.indexby a concurrent writer, updates the stream entry in-place, reopens the index from the renamed file, and emits'stream-closed'on the reader so callers can react.commit— throws if the target stream is marked closed.close— explicitly closes closed-stream index handles (excluded from storage's secondary index lifecycle).Behavior
Closed status survives store restarts —
fiscal-2023will be recognized as closed when the store is reopened.Concurrent reader behaviour
When a writer closes a stream, any concurrently open read-only
EventStoreinstance in the same or another process will:ReadOnlyIndexfor the stream automatically closed viaonRename(triggered by the file system watcher).stream-X.closed.indexfile via the directory watcher, reopen the index from the renamed file, update the stream entry toclosed: true, and emit a'stream-closed'event — symmetric with the writer's own event.Concurrent writer: The existing storage lock prevents two writable instances from being open simultaneously, so a second writer encountering a mid-close rename is already a lock-violation scenario handled by the existing locking mechanism.
Original prompt
📱 Kick off Copilot coding agent tasks wherever you are with GitHub Mobile, available on iOS and Android.