Add Crr Cascade capabilities to backbeat crr replication #2747
SylvainSenechal wants to merge 3 commits into
This PR can already be checked, but it should really be reviewed after all the other cascade PRs, since changes in those PRs would also mean changes here.
I think we can add functional tests instead of just these,
but I'm waiting for Arsenal/cloudserver to be merged, as it will make these tests easier to write (functional tests in backbeat rely on an image of cloudserver).
Keeping the unit tests is good; functional tests should just be an addition?
Codecov Report
❌ Patch coverage is
Additional details and impacted files
... and 3 files with indirect coverage changes
@@                 Coverage Diff                  @@
##           development/9.5    #2747      +/-   ##
===================================================
- Coverage            74.88%   74.63%   -0.26%
===================================================
  Files                  200      200
  Lines                13670    13710      +40
===================================================
- Hits                 10237    10232       -5
- Misses                3423     3468      +45
  Partials                10       10
Force-pushed from 4c64ed6 to 3237f9e.
      "@smithy/node-http-handler": "^3.3.3",
      "JSONStream": "^1.3.5",
    - "arsenal": "git+https://github.com/scality/arsenal#8.3.9",
    + "arsenal": "git+https://github.com/scality/Arsenal#2c429ab35a5ac82c3dafa5a0296a49a23a9c8a4a",
Arsenal is pinned to a raw commit hash (2c429ab...) instead of a semantic version tag. Per project conventions, git-based deps (arsenal, vaultclient, etc.) should pin to tags (e.g. #8.x.y). Commit hashes are opaque — it's unclear which features/fixes are included, and there's no semver contract. This also makes it harder for reviewers and operators to reason about what changed.
— Claude Code
      "@aws-sdk/client-sts": "^3.921.0",
      "@aws-sdk/credential-providers": "^3.921.0",
    - "@scality/cloudserverclient": "^1.0.8",
    + "@scality/cloudserverclient": "file:./scality-cloudserverclient-v1.0.9.tgz",
A binary .tgz is committed to the repo and referenced via file: in package.json. This permanently inflates the git history (binary blobs can't be diffed or garbage-collected), prevents code review of the dependency contents, and bypasses the normal package registry workflow. Consider publishing @scality/cloudserverclient@1.0.9 to the npm registry (or a private registry) and referencing it as a versioned dependency instead.
— Claude Code
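The two dependency concerns raised above (a git dependency pinned to a raw commit hash, and a `file:` tarball) could be caught mechanically. The following is only a rough sketch and is not part of this PR: the function name `findUnpinnedDeps` and the rules it applies are assumptions made for illustration, and the actual project convention may differ.

```javascript
'use strict';

// Hypothetical helper, not an existing backbeat or npm API.
// A 40-character hex ref is treated as a raw commit hash.
const COMMIT_HASH = /^[0-9a-f]{40}$/i;

function findUnpinnedDeps(dependencies) {
    const problems = [];
    for (const [name, spec] of Object.entries(dependencies || {})) {
        if (spec.startsWith('file:')) {
            // Local tarballs bypass the registry entirely.
            problems.push({ name, reason: 'local file reference' });
            continue;
        }
        if (spec.startsWith('git+')) {
            const hashIndex = spec.indexOf('#');
            const ref = hashIndex === -1 ? '' : spec.slice(hashIndex + 1);
            if (ref === '') {
                problems.push({ name, reason: 'git dependency without a ref' });
            } else if (COMMIT_HASH.test(ref)) {
                problems.push({ name, reason: 'git dependency pinned to a commit hash' });
            }
        }
    }
    return problems;
}

// Dependency specs copied from the diff under review.
const report = findUnpinnedDeps({
    'arsenal': 'git+https://github.com/scality/Arsenal#2c429ab35a5ac82c3dafa5a0296a49a23a9c8a4a',
    '@scality/cloudserverclient': 'file:./scality-cloudserverclient-v1.0.9.tgz',
    'JSONStream': '^1.3.5',
});
console.log(report);
```

A check like this could run in CI against `package.json`; a tag such as `#8.3.9` passes because it is not a 40-character hex string.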
The cascade replication logic in Review by Claude Code |
Force-pushed from 3237f9e to fc09451.
            method: 'ReplicateObject._getAndPutPartOnce',
            entry: destEntry.getLogInfo(),
        });
        return doneOnce(null, partAlreadyAtDest);
Source stream (incomingMsg) is not destroyed when a VersionIdCollisionException is caught. All four return paths in this handler (lines 581–636) exit early via doneOnce(...), bypassing the existing stream cleanup at lines 639–646.
For large objects, this leaks the source HTTP connection — the response body from the GET is never consumed or destroyed, so the socket stays open until it times out. Add incomingMsg.destroy() before returning in the collision handler (e.g. at the top of the if (err instanceof VersionIdCollisionException) block, or before each return doneOnce(...)).
— Claude Code
      "@smithy/node-http-handler": "^3.3.3",
      "JSONStream": "^1.3.5",
    - "arsenal": "git+https://github.com/scality/arsenal#8.3.9",
    + "arsenal": "git+https://github.com/scality/Arsenal#2c429ab35a5ac82c3dafa5a0296a49a23a9c8a4a",
arsenal is pinned to a raw commit hash instead of a version tag. The project convention is to pin git-based deps to a tag (e.g. #8.4.4). This should be updated to the corresponding tag before merging.
— Claude Code
      "@aws-sdk/client-sts": "^3.921.0",
      "@aws-sdk/credential-providers": "^3.921.0",
    - "@scality/cloudserverclient": "^1.0.8",
    + "@scality/cloudserverclient": "file:./scality-cloudserverclient-v1.0.9.tgz",
@scality/cloudserverclient is vendored as a local .tgz tarball checked into the repo. This bloats git history and bypasses the package registry audit trail. Should be replaced with a published registry version or a git reference before merging.
— Claude Code
Force-pushed from fc09451 to f83fbd4.
            entry: destEntry.getLogInfo(),
        });
        return doneOnce(null, partAlreadyAtDest);
    }
The VersionIdCollisionException handler returns via doneOnce() without aborting the source stream or the abort controller. Compare with the generic error handler below (lines 639-646) which properly calls abortController.abort() and incomingMsg.destroy(). With the expect-continue middleware the body may not have been sent, but the source GET request and its stream are still open — this leaks a connection/fd per cascade-hit part.
        }
        return doneOnce(null, partAlreadyAtDest);
    }
    if (!destRequestAborted) {
        abortController.abort();
        sourceStreamAborted = true;
        if (incomingMsg.destroy) {
            incomingMsg.destroy();
        }
    }
— Claude Code
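One way to make sure every early return in the collision handler releases the source side is to factor the cleanup into a helper and call it before each `doneOnce(...)`. The sketch below is an assumption about how that could look, not code from the PR: `releaseSource` is a hypothetical name, and a `Readable` stands in for the source GET response.

```javascript
'use strict';

const { Readable } = require('stream');

// Hypothetical helper (not in the PR): abort the request controller and
// destroy the source stream, tolerating missing or already-closed inputs.
function releaseSource(abortController, incomingMsg) {
    if (abortController && !abortController.signal.aborted) {
        abortController.abort();
    }
    if (incomingMsg && typeof incomingMsg.destroy === 'function'
        && !incomingMsg.destroyed) {
        incomingMsg.destroy();
    }
}

// Stand-ins for the source GET response and the request's controller.
const incomingMsg = Readable.from(['part data']);
const abortController = new AbortController();

// Would be called at the top of the VersionIdCollisionException branch.
releaseSource(abortController, incomingMsg);
console.log(abortController.signal.aborted, incomingMsg.destroyed);
```

Because the helper is idempotent, calling it once at the top of the collision branch and again in the generic error path would be harmless.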
      "@smithy/node-http-handler": "^3.3.3",
      "JSONStream": "^1.3.5",
    - "arsenal": "git+https://github.com/scality/arsenal#8.3.9",
    + "arsenal": "git+https://github.com/scality/Arsenal#83682d1ae77705877b81414f39aacce4e27e0ceb",
arsenal is pinned to a raw commit hash (83682d1...) instead of a tag. The project convention (and CLAUDE.md) requires git-based deps to pin to a tag, not a branch or commit. This makes it harder to audit what version is in use and whether it's a release or an arbitrary snapshot. Please publish a tag and reference it here.
— Claude Code
      "@aws-sdk/client-sts": "^3.921.0",
      "@aws-sdk/credential-providers": "^3.921.0",
    - "@scality/cloudserverclient": "^1.0.8",
    + "@scality/cloudserverclient": "file:./scality-cloudserverclient-v1.0.9.tgz",
Vendoring a binary .tgz in the repo is concerning: it can't be diffed/reviewed, permanently bloats git history, and bypasses registry integrity checks. Consider publishing @scality/cloudserverclient@1.0.9 to the npm registry (or a private one) and referencing it as a normal semver dependency, same as the previous ^1.0.8.
— Claude Code
    if (!err.MicroVersionId) {
        log.info('cascade putData: data at destination, ' +
            'no microVersionId, proceeding with putMetadata',
            {
                method: 'ReplicateObject._getAndPutPartOnce',
                entry: destEntry.getLogInfo(),
            });
        return doneOnce(null, partAlreadyAtDest);
    }
    const destinationMicroVersionId = decode(err.MicroVersionId);
    if (destinationMicroVersionId instanceof Error) {
        log.error('failed to decode MicroVersionId from putData 409', {
            method: 'ReplicateObject._getAndPutPartOnce',
            entry: destEntry.getLogInfo(),
            error: destinationMicroVersionId.message,
        });
        return doneOnce(destinationMicroVersionId);
    }
    const sourceMicroVersionId = sourceEntry.getMicroVersionId();
    if (!sourceMicroVersionId) {
        log.info('cascade putData: data at destination, ' +
            'source has no microVersionId, proceeding with putMetadata',
            {
                method: 'ReplicateObject._getAndPutPartOnce',
                entry: destEntry.getLogInfo(),
            });
        return doneOnce(null, partAlreadyAtDest);
    }
    const event = checkCrrCascadeEvent(sourceMicroVersionId, destinationMicroVersionId);
    if (event === 'loop') {
        log.info('cascade loop detected on putData: ' +
            'destination already has this exact revision, skipping',
            {
                method: 'ReplicateObject._getAndPutPartOnce',
                entry: destEntry.getLogInfo(),
            });
        return doneOnce(cascadeLoopDetected);
    }
    if (event === 'stale') {
        log.info('cascade stale on putData: ' +
            'destination already has a newer revision',
            {
                method: 'ReplicateObject._getAndPutPartOnce',
                entry: destEntry.getLogInfo(),
            });
        return doneOnce(cascadeDataComplete);
    }
    // proceed: source is newer, skip data write and update metadata
    log.info('cascade putData: data already at destination, ' +
        'proceeding with metadata update',
        {
            method: 'ReplicateObject._getAndPutPartOnce',
            entry: destEntry.getLogInfo(),
        });
    return doneOnce(null, partAlreadyAtDest);
Best to introduce a dedicated function here, which can go through the different checks and just return the error if/when needed; then we have a single log line in here. Easier to test, easier to read, and less duplication...
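As a rough illustration of that suggestion, the checks could be moved into a pure function that returns an outcome, leaving the caller with one log call and one `doneOnce(...)`. Everything below is a sketch under assumptions: `classifyCollision` is a hypothetical name, `decode` and `checkCrrCascadeEvent` are injected so the function can be tested in isolation, and the stubs are made up for the demo and do not reflect the real Arsenal semantics.

```javascript
'use strict';

// Hypothetical refactor sketch. The helpers' signatures are assumed from
// how the diff calls them: decode(encoded) returns a value or an Error,
// checkCrrCascadeEvent(source, dest) returns 'loop', 'stale' or another
// value meaning "proceed".
function classifyCollision(err, sourceMicroVersionId, helpers) {
    const { decode, checkCrrCascadeEvent } = helpers;
    if (!err.MicroVersionId) {
        return { outcome: 'proceed', reason: 'no microVersionId at destination' };
    }
    const destinationMicroVersionId = decode(err.MicroVersionId);
    if (destinationMicroVersionId instanceof Error) {
        return {
            outcome: 'error',
            reason: 'failed to decode MicroVersionId',
            error: destinationMicroVersionId,
        };
    }
    if (!sourceMicroVersionId) {
        return { outcome: 'proceed', reason: 'source has no microVersionId' };
    }
    const event = checkCrrCascadeEvent(sourceMicroVersionId, destinationMicroVersionId);
    if (event === 'loop') {
        return { outcome: 'loop', reason: 'destination already has this exact revision' };
    }
    if (event === 'stale') {
        return { outcome: 'stale', reason: 'destination already has a newer revision' };
    }
    return { outcome: 'proceed', reason: 'source is newer, data already at destination' };
}

// Demo-only stubs: integers stand in for micro version ids.
const stubs = {
    decode: v => (v === 'garbage' ? new Error('cannot decode') : Number(v)),
    checkCrrCascadeEvent: (src, dst) => {
        if (src === dst) { return 'loop'; }
        return src < dst ? 'stale' : 'proceed';
    },
};

const results = ['5', '7', '3', 'garbage'].map(
    id => classifyCollision({ MicroVersionId: id }, 5, stubs).outcome);
console.log(results);
```

The caller would then log `reason` once and map `outcome` to the existing callbacks: `doneOnce(cascadeLoopDetected)` for `loop`, `doneOnce(cascadeDataComplete)` for `stale`, `doneOnce(error)` for `error`, and `doneOnce(null, partAlreadyAtDest)` otherwise.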
    const writeStartTime = Date.now();
    return this.backbeatDest.send(command)
        .then(data => {
            if (data.ReplicationLoop) {
Did we not have a "conflict" exception in that case as well?
(i.e. does backbeat continue differently for cascadeLoopDetected vs cascadeDataComplete? They do not seem to be handled the same way in _handleReplicationOutcome, but maybe I'm missing something.)
    if (err.ObjNotFound || err.name === 'ObjNotFound') {
        return cbOnce(err);
    }
    if (err instanceof StaleMicroVersionIdException) {
@maeldonn Actually I'm rethinking this: I think in the design we said a staleException on putMetadata would be marked as failed.
But 🤔 really, do we want to mark an object's replication status as failed in that case?
Issue: BB-767
Related PRs :
Arsenal : scality/Arsenal#2628
Cloudserver : scality/cloudserver#6179
CloudserverClient : scality/cloudserverclient#24
S3utils : scality/s3utils#395