feat: Single-pass WAL streaming for LOG_BASED replication#772
feat: Single-pass WAL streaming for LOG_BASED replication#772bdewilde wants to merge 22 commits into
Conversation
|
Thanks @bdewilde! There are some typing errors. I might take a longer look later in the week. |
Hi Edgar, thanks in advance for giving it a longer look! My bad for the typing errors -- if you can point me at them (without having to do a whole review ;), I can try to fix them sometime this week. And apologies for the test failures. I wasn't ever able to get the full unit test suite running green in my local dev, but all of the new tests I added did pass for me. |
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
|
Hi again @edgarrmondragon 🙂 Just swinging by to check on this. Is there anything I can do to nudge this forward? I'm happy to make changes / go back to the drawing board, I just need some direction from you, since I'm not deeply familiar with the SDK. |
edgarrmondragon
left a comment
There was a problem hiding this comment.
Thanks @bdewilde and sorry for the delay!
I started reviewing and managed to take a look at 2 of the files. I'll continue tomorrow, but in the meantime, some questions, nits and suggestions.
| This is the source of truth for matching a WAL message to a registered stream. | ||
| Both sides -- the tap (when registering streams with the reader) and the WAL reader | ||
| (when dispatching a parsed payload) -- *must* call this function with the raw schema | ||
| and table name strings. | ||
|
|
||
| wal2json's format-version=2 output includes ``"schema"`` and ``"table"`` fields | ||
| as the raw, unquoted identifier names (wal2json reports whatever Postgres has stored). | ||
| Therefore, use the raw names joined by a single dot, with no quoting and no case folding. | ||
|
|
||
| Do *not* use ``SQLStream.fully_qualified_name`` for dispatch. | ||
| """ |
There was a problem hiding this comment.
Ah, this is rather helpful context 👍
| for key in ("columns", "identity"): | ||
| for column in payload.get(key, ()) or (): | ||
| if column.get("type") == "text[]" and column.get("value") is not None: | ||
| column["value"] = psycopg2.extensions.STRINGARRAY(column["value"], cursor) |
There was a problem hiding this comment.
I'm curious: only text[] are represented as {a,b,c}, but not, for example int[]?
| global_start_lsn = min(start_lsn for _, start_lsn in self._streams_by_fqn.values()) | ||
| fqn_objs = [stream.fully_qualified_name for stream, _ in self._streams_by_fqn.values()] | ||
| add_tables = build_add_tables_option( | ||
| [(t.cast("str", fqn_obj.schema), fqn_obj.table) for fqn_obj in fqn_objs] | ||
| ) |
There was a problem hiding this comment.
nit: we're looping three times over the streams. could we do this in a single loop? Maybe 2 loops if build_add_tables_option is too messy to refactor.
|
|
||
| def _run_loop(self, cursor: extras.ReplicationCursor) -> None: | ||
| """Inner read / dispatch / periodic-flush loop.""" | ||
| run_start = datetime.datetime.now() |
There was a problem hiding this comment.
Perhaps we could use time.monotonic() instead of datetime.datetime.now() and avoid those .total_seconds() calls below?
| # messages for registered tables... count it and move on; a non-zero counter | ||
| # in logs is a signal to investigate (e.g. a normalize_fqn format mismatch) | ||
| self.records_unroutable += 1 | ||
| self._logger.debug("Received message for unregistered table %s; dropping", fqn) |
There was a problem hiding this comment.
should this be a warning so it's visible by default? how worrying would this event be?
| raise RuntimeError( | ||
| f"A message payload of {payload!r} (corresponding to an unknown " | ||
| f"action type {action!r}) could not be processed." | ||
| ) |
There was a problem hiding this comment.
Perhaps for a future refactor: should this be a warning instead of crashing the full sync?
| try: | ||
| wal_end = getattr(cursor, "wal_end", None) | ||
| if wal_end is not None and wal_end > start_lsn: | ||
| flush_lsn = wal_end | ||
| except Exception: | ||
| pass | ||
|
|
There was a problem hiding this comment.
note to self: review this file in full.
problem
PostgresLogBasedStream.get_records()opens its ownLogicalReplicationConnectionper selected stream. With N LOG_BASED streams the tap runs N sequential WAL scans -- each rereads the same segments, with add-tables discarding most records server-side. End-to-end sync time scales ~linearly in N. For pipelines with multiple LOG_BASED streams against a large backlog, this dominates run-time.changes
A new
SingleConnectionWALReaderopens one logical replication connection withadd-tablescovering all selected LOG_BASED tables, scans the WAL once, and dispatches each parsed wal2json message inline to the owning stream's newemit_record()method for immediate Singer RECORD emission. STATE flushes every 30s, and the slot is advanced to the WAL tip on idle/max-run exit._wal_helpers.py(FQN/escaping/parsing helpers) andwal_reader.py(the reader and read loop).client.pygainsemit_recordmethod and a config-flag branch inget_recordstap.pyadds thelog_based_single_connectionconfig setting (default False!) and_sync_log_based_streams_sharedorchestrationtests/test_wal_helpers.py,tests/test_wal_reader.py,tests/test_consume.pyFull disclosure, I had Claude Code implement those three test modules, and then I iterated a bit. If it's still excessive / not testing usefully -- something Claude is known to do, sigh -- just let me know, and I will take a hatchet to it.
This is a very belated follow-up to PR #667 and Issue #587.
constraints
Tap.sync_allis@typing.final, so dispatch can't be restructured at the SDK boundary -- this was a bummer. My next best option was trigger at the first LOG_BASED stream'sget_records()call, gated by a_shared_wal_run_completedflag on the tap so siblings become no-ops._sync_log_based_streams_sharedpre-writes every stream's schema before the reader runs. Since the SDK'sStream.sync()later calls_write_schema_message()again, the override onPostgresLogBasedStreamis idempotent. (Without that flag every SCHEMA would be emitted twice, which is not great.)min(start_lsn)across all streams, so each stream's own bookmark is captured at construction and used to drop messages that it's already past._write_record_message()and_increment_stream_state()-- for this to work. I consolidated them in one place --emit_record()-- so SDK renames hit one method. Not sure how stable the API is here...questions
_write_schema_messageidempotency: I made the smallest fix I could for the duplicate-SCHEMA bug given the@finalconstraint onsync_all. Is there another / cleaner approach?emit_record()uses internal SDK calls. Is this going to be an issue? Is there a safer / "public" equivalent?get_records()as trigger: The "first stream's get_records fires the shared reader" pattern is a not-great workaround forsync_allbeing final. Is it okay as documented, or is there a cleaner SDK hook?replication_max_run_seconds/replication_idle_exit_secondsnow bound the whole LOG_BASED batch instead of each stream. To me that feels like an improvement, but I don't know the whole system / downstream use caess. For example, does anything downstream assume per-stream bounds?