feat: Single-pass WAL streaming for LOG_BASED replication #772

Open
bdewilde wants to merge 22 commits into MeltanoLabs:main from bdewilde:single-pass-wal-streaming

Conversation

@bdewilde

@bdewilde bdewilde commented Apr 27, 2026

problem

PostgresLogBasedStream.get_records() opens its own LogicalReplicationConnection per selected stream. With N LOG_BASED streams, the tap runs N sequential WAL scans -- each rereads the same segments, with add-tables discarding most records server-side -- so end-to-end sync time scales ~linearly in N. For pipelines with multiple LOG_BASED streams against a large backlog, this dominates run time.

changes

A new SingleConnectionWALReader opens one logical replication connection with add-tables covering all selected LOG_BASED tables, scans the WAL once, and dispatches each parsed wal2json message inline to the owning stream's new emit_record() method for immediate Singer RECORD emission. STATE flushes every 30s, and the slot is advanced to the WAL tip on idle/max-run exit.

  • new modules: _wal_helpers.py (FQN/escaping/parsing helpers) and wal_reader.py (the reader and read loop).
  • client.py gains an emit_record() method and a config-flag branch in get_records()
  • tap.py adds the log_based_single_connection config setting (default False!) and _sync_log_based_streams_shared orchestration
  • new tests: tests/test_wal_helpers.py, tests/test_wal_reader.py, tests/test_consume.py
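For anyone skimming, the single-pass idea can be sketched roughly as below. This is an illustration only, not the code in this PR: `dispatch_messages`, the `handlers` mapping, and the dict-shaped messages are hypothetical stand-ins for the reader, the per-stream emit_record() calls, and parsed wal2json payloads.

```python
from collections import defaultdict


def dispatch_messages(messages, handlers):
    """Route each parsed message to its table's handler in a single pass.

    Returns the number of messages that matched no registered table.
    """
    unroutable = 0
    for msg in messages:
        # Assumption: the dispatch key is the raw schema and table joined by a dot.
        fqn = f"{msg['schema']}.{msg['table']}"
        handler = handlers.get(fqn)
        if handler is None:
            unroutable += 1
            continue
        handler(msg)
    return unroutable


# Hypothetical stand-ins for two selected LOG_BASED streams.
emitted = defaultdict(list)
handlers = {
    "public.users": emitted["public.users"].append,
    "public.orders": emitted["public.orders"].append,
}

messages = [
    {"schema": "public", "table": "users", "action": "I"},
    {"schema": "public", "table": "orders", "action": "U"},
    {"schema": "audit", "table": "events", "action": "I"},
    {"schema": "public", "table": "users", "action": "D"},
]

unroutable = dispatch_messages(messages, handlers)
```

The point of the sketch is that the message sequence is walked once no matter how many streams are registered, whereas the per-stream approach walks it once per stream.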

Full disclosure, I had Claude Code implement those three test modules, and then I iterated a bit. If it's still excessive / not testing usefully -- something Claude is known to do, sigh -- just let me know, and I will take a hatchet to it.

This is a very belated follow-up to PR #667 and Issue #587.

constraints

  • Tap.sync_all is @typing.final, so dispatch can't be restructured at the SDK boundary -- this was a bummer. My next best option was to trigger the shared run at the first LOG_BASED stream's get_records() call, gated by a _shared_wal_run_completed flag on the tap so siblings become no-ops.
  • SCHEMA-before-RECORD across streams: _sync_log_based_streams_shared pre-writes every stream's schema before the reader runs. Because the SDK's Stream.sync() later calls _write_schema_message() again, I made the override on PostgresLogBasedStream idempotent. (Without that, every SCHEMA would be emitted twice, which is not great.)
  • Per-stream LSN filter: Replication opens at min(start_lsn) across all streams, so each stream's own bookmark is captured at construction and used to drop messages that it's already past.
  • I had to dip into private SDK calls -- _write_record_message() and _increment_stream_state() -- for this to work. I consolidated them in one place -- emit_record() -- so SDK renames hit one method. Not sure how stable the API is here...
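The per-stream LSN filter in the third bullet could look roughly like this. It is a minimal sketch under stated assumptions: LSNs are modeled as plain integers, `StreamBookmark`, `global_start_lsn`, and `should_emit` are hypothetical names, and the strict `>` comparison is a guess at the intended boundary behaviour rather than what the PR does.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StreamBookmark:
    """Hypothetical record of one stream's bookmark, captured at construction."""

    fqn: str
    start_lsn: int


def global_start_lsn(bookmarks):
    """Replication opens at the smallest bookmark across all streams."""
    return min(b.start_lsn for b in bookmarks)


def should_emit(message_lsn, bookmark):
    """Drop messages at or before the stream's own bookmark (assumed strict >)."""
    return message_lsn > bookmark.start_lsn


bookmarks = {
    "public.users": StreamBookmark("public.users", 100),
    "public.orders": StreamBookmark("public.orders", 250),
}
start = global_start_lsn(bookmarks.values())

# (fqn, lsn) pairs as they might arrive from the shared scan.
incoming = [("public.users", 180), ("public.orders", 180), ("public.orders", 300)]
kept = [(fqn, lsn) for fqn, lsn in incoming if should_emit(lsn, bookmarks[fqn])]
```

Here the scan starts at 100 because of public.users, so public.orders sees LSN 180 again and has to drop it on its own.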

questions

  1. _write_schema_message idempotency: I made the smallest fix I could for the duplicate-SCHEMA bug given the @final constraint on sync_all. Is there another / cleaner approach?
  2. emit_record() uses internal SDK calls. Is this going to be an issue? Is there a safer / "public" equivalent?
  3. get_records() as trigger: The "first stream's get_records fires the shared reader" pattern is a not-great workaround for sync_all being final. Is it okay as documented, or is there a cleaner SDK hook?
  4. replication_max_run_seconds / replication_idle_exit_seconds now bound the whole LOG_BASED batch instead of each stream. To me that feels like an improvement, but I don't know the whole system / downstream use cases. For example, does anything downstream assume per-stream bounds?
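To make question 3 concrete, the "first caller fires the shared run, siblings become no-ops" pattern boils down to something like the sketch below. `SharedRunCoordinator` and `trigger` are hypothetical names for illustration; in the PR the gate is the _shared_wal_run_completed flag on the tap, and the real wiring may differ.

```python
class SharedRunCoordinator:
    """Hypothetical gate: run a shared job once, however many callers ask."""

    def __init__(self, run_shared_reader):
        self._run_shared_reader = run_shared_reader
        self.completed = False

    def trigger(self):
        """Run the shared reader on the first call; later calls do nothing.

        Returns True if this call performed the run.
        """
        if self.completed:
            return False
        self._run_shared_reader()
        # Only mark completion after a successful run, so a failure stays visible.
        self.completed = True
        return True


calls = []
coordinator = SharedRunCoordinator(lambda: calls.append("scan"))

# Three sibling streams each reach get_records() in turn.
results = [coordinator.trigger() for _ in range(3)]
```

One design choice worth noting in the sketch: the flag is set only after the run returns, so an exception in the shared reader would not silently turn the remaining streams into no-ops.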

@edgarrmondragon
Member

Thanks @bdewilde!

There are some typing errors. I might take a longer look later in the week.

@edgarrmondragon edgarrmondragon changed the title from "Single-pass WAL streaming for LOG_BASED replication?" to "feat: Single-pass WAL streaming for LOG_BASED replication" May 4, 2026
@edgarrmondragon edgarrmondragon self-assigned this May 4, 2026
@edgarrmondragon edgarrmondragon added the enhancement New feature or request label May 4, 2026
@bdewilde
Author

bdewilde commented May 4, 2026

> Thanks @bdewilde!
>
> There are some typing errors. I might take a longer look later in the week.

Hi Edgar, thanks in advance for giving it a longer look! My bad for the typing errors -- if you can point me at them (without having to do a whole review ;), I can try to fix them sometime this week. And apologies for the test failures. I wasn't ever able to get the full unit test suite running green in my local dev, but all of the new tests I added did pass for me.

@bdewilde
Author

Hi again @edgarrmondragon 🙂 Just swinging by to check on this. Is there anything I can do to nudge this forward? I'm happy to make changes / go back to the drawing board, I just need some direction from you, since I'm not deeply familiar with the SDK.

Member

@edgarrmondragon edgarrmondragon left a comment

Thanks @bdewilde and sorry for the delay!

I started reviewing and managed to take a look at 2 of the files. I'll continue tomorrow, but in the meantime, some questions, nits and suggestions.

Comment on lines +23 to +33
This is the source of truth for matching a WAL message to a registered stream.
Both sides -- the tap (when registering streams with the reader) and the WAL reader
(when dispatching a parsed payload) -- *must* call this function with the raw schema
and table name strings.

wal2json's format-version=2 output includes ``"schema"`` and ``"table"`` fields
as the raw, unquoted identifier names (wal2json reports whatever Postgres has stored).
Therefore, use the raw names joined by a single dot, with no quoting and no case folding.

Do *not* use ``SQLStream.fully_qualified_name`` for dispatch.
"""
Member

Ah, this is rather helpful context 👍
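As a minimal sketch of what the docstring above describes (raw names, a single dot, no quoting, no case folding), the helper might look like the following. The name normalize_fqn appears elsewhere in this PR, but the signature and body here are assumptions for illustration, not the actual implementation.

```python
def normalize_fqn(schema: str, table: str) -> str:
    """Build the dispatch key from raw identifier names.

    Assumed behaviour, per the docstring: join with one dot and leave
    the names exactly as Postgres stores them.
    """
    return f"{schema}.{table}"


# Registration side (the tap) and dispatch side (the reader) use the same call.
registered = {normalize_fqn("public", "Users"): "users-stream"}

payload = {"schema": "public", "table": "Users"}
match = registered.get(normalize_fqn(payload["schema"], payload["table"]))

# A case-folded name is a different key and would not match.
miss = registered.get(normalize_fqn("public", "users"))
```

Keeping both sides on one function means a mixed-case table name matches itself and nothing else.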

Comment on lines +108 to +111
for key in ("columns", "identity"):
    for column in payload.get(key, ()) or ():
        if column.get("type") == "text[]" and column.get("value") is not None:
            column["value"] = psycopg2.extensions.STRINGARRAY(column["value"], cursor)
Member

I'm curious: are only text[] values represented as {a,b,c}, and not, for example, int[]?

Comment on lines +125 to +129
global_start_lsn = min(start_lsn for _, start_lsn in self._streams_by_fqn.values())
fqn_objs = [stream.fully_qualified_name for stream, _ in self._streams_by_fqn.values()]
add_tables = build_add_tables_option(
    [(t.cast("str", fqn_obj.schema), fqn_obj.table) for fqn_obj in fqn_objs]
)
Member

nit: we're looping three times over the streams. could we do this in a single loop? Maybe 2 loops if build_add_tables_option is too messy to refactor.
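One way the single-loop version could look, as a rough sketch: `RegisteredStream` and `collect_start_and_tables` are hypothetical names, and the (schema, table) pairs are assumed to be what build_add_tables_option takes as input, based on the snippet above.

```python
from typing import NamedTuple


class RegisteredStream(NamedTuple):
    """Hypothetical flattened view of one registered stream."""

    schema: str
    table: str
    start_lsn: int


def collect_start_and_tables(streams):
    """Compute the minimum start LSN and the table list in one pass."""
    start_lsn = None
    tables = []
    for stream in streams:
        if start_lsn is None or stream.start_lsn < start_lsn:
            start_lsn = stream.start_lsn
        tables.append((stream.schema, stream.table))
    if start_lsn is None:
        raise ValueError("no streams registered")
    return start_lsn, tables


streams = [
    RegisteredStream("public", "users", 250),
    RegisteredStream("public", "orders", 100),
    RegisteredStream("audit", "events", 175),
]
global_start, table_pairs = collect_start_and_tables(streams)
```

The resulting `table_pairs` list would then be handed to the add-tables builder in a second step, which matches the "maybe 2 loops" fallback.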


def _run_loop(self, cursor: extras.ReplicationCursor) -> None:
    """Inner read / dispatch / periodic-flush loop."""
    run_start = datetime.datetime.now()
Member

Perhaps we could use time.monotonic() instead of datetime.datetime.now() and avoid those .total_seconds() calls below?
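A small sketch of what that could look like. `should_exit` and its parameters are hypothetical names, loosely modeled on the replication_max_run_seconds and replication_idle_exit_seconds settings; the actual loop in this PR may be structured differently.

```python
import time


def should_exit(run_start, last_message_at, max_run_seconds, idle_exit_seconds, now=None):
    """Return True once either the max-run or the idle bound has elapsed.

    All timestamps are time.monotonic() readings (floats, in seconds), so
    elapsed time is a plain subtraction with no timedelta involved.
    """
    now = time.monotonic() if now is None else now
    ran_too_long = (now - run_start) >= max_run_seconds
    idle_too_long = (now - last_message_at) >= idle_exit_seconds
    return ran_too_long or idle_too_long


# In a real loop, both values would be refreshed as messages arrive.
run_start = time.monotonic()
last_message_at = run_start
exit_now = should_exit(run_start, last_message_at, max_run_seconds=60, idle_exit_seconds=10)
```

Besides dropping the .total_seconds() calls, time.monotonic() is unaffected by system clock adjustments, which matters for a loop that can run for a while. The optional `now` argument is only there to make the check easy to test.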

# messages for registered tables... count it and move on; a non-zero counter
# in logs is a signal to investigate (e.g. a normalize_fqn format mismatch)
self.records_unroutable += 1
self._logger.debug("Received message for unregistered table %s; dropping", fqn)
Member

should this be a warning so it's visible by default? how worrying would this event be?

Comment thread tap_postgres/client.py
Comment on lines +604 to +607
raise RuntimeError(
    f"A message payload of {payload!r} (corresponding to an unknown "
    f"action type {action!r}) could not be processed."
)
Member

Perhaps for a future refactor: should this be a warning instead of crashing the full sync?

Comment on lines +286 to +292
try:
    wal_end = getattr(cursor, "wal_end", None)
    if wal_end is not None and wal_end > start_lsn:
        flush_lsn = wal_end
except Exception:
    pass

Member

Comment thread tap_postgres/client.py
Member

note to self: review this file in full.

Labels

enhancement New feature or request

2 participants