feat: S3 object storage offloading for V3 bucket data#673
Conversation
|
rkistner
left a comment
There was a problem hiding this comment.
This looks quite promising, and I like the structure.
Some initial high-level comments:
- Currently there are various places in the code doing the same compression/decompression and serialize/deserialize logic. Should we perhaps do this in a wrapper class for ObjectStorage? E.g. a BucketDataObjectStorage that wraps ObjectStorage and does that logic?
- NodeJS now has built-in zstd support. But I haven't checked how the APIs and performance compares with
@mongodb/zstd. Since we're already using@mongodb/zstdimplicitly, that should be fine. - We do need a threshold for inlining ops directly in mongodb storage, before we can merge & release this: S3 has too much overhead for storing say individual 100-byte operations.
| await session.endSession(); | ||
| } | ||
|
|
||
| // After commit: delete old S3 objects (best-effort) |
There was a problem hiding this comment.
Not a blocker for initial testing, but it could be problematic if we leave orphaned documents in the bucket indefinitely (either from the delete request failing, or from say a process crash/restart between the commit and the delete).
Is there some way we can ensure these are cleaned up eventually? Maybe persisting a "delete queue" in mongodb, or running a periodic cleanup job (maybe part of the compact job)?
| // Track sizes: for S3 docs multiply compressed_size by 3 as a rough | ||
| // decompressed estimate to keep chunk byte tracking bounded. Without a | ||
| // multiplier, metadata shells (~200 bytes) would let thousands of | ||
| // S3-backed docs pack into a single chunk before splitting. |
There was a problem hiding this comment.
We already have the size on the mongodb document - could we use that instead of the estimate?
| this.logger.warn(`Failed to fetch/decompress S3 object ${doc.storage_ref?.path}: ${err}`); | ||
| doc.ops = []; |
There was a problem hiding this comment.
This should be a hard error - setting doc.ops = [] may result in data inconsistencies.
Add failing tests in storage_s3_writing.test.ts that exercise the MemoryObjectStorage helper and confirm the S3 write path guard condition works. Thread the objectStorage option through the storage stack (MongoBucketStorage, MongoSyncBucketStorage, MongoBucketBatch, PersistedBatch) so it is available for future implementation. Model changes: make ops optional in BucketDataDocumentV3 to support storage_ref-only documents. Add StorageRef type and loadBucketDataDocument guard for empty ops. Add S3ObjectStorage config type and object_storage config field. Add @aws-sdk/client-s3 and @mongodb-js/zstd dependencies. Update existing compacting tests to use non-null assertions on ops since it is now optional.
Implement S3 offloading in PersistedBatchV3: BSON-serialize, zstd-compress and upload bucket data chunks to objectStorage. Insert metadata shells with storage_ref in MongoDB instead of inline ops. Update Phase 2b test assertions with non-null accessors now that the write path works. Add storage_s3_reading.test.ts with 3 failing tests for the S3 read path: round-trip write/read, missing S3 object handling, and mixed inline+S3 batch reads. All 3 must fail until the read path fetches from S3.
…test Pre-fetch and decompress S3 objects for storage_ref docs during getBucketDataBatch so ops from S3-backed documents are included in bucket data responses and size tracking. Add red test for S3-aware compaction (Phase 2d): verifies that compacted_state is populated correctly, S3 objects are cleaned, MongoDB docs are replaced, and read path survives compaction. This test fails because compactSingleBucket does not yet fetch ops from S3-backed storage_ref documents.
Compaction now pre-fetches S3-backed ops before decode, uploads new S3 objects after rechunking, and cleans up old storage_refs after transaction commit. Batch size calculation accounts for storage_ref.compressed_size. S3ObjectStorage implements the ObjectStorage interface using @aws-sdk/client-s3, wired through MongoStorageProvider when config specifies object_storage.type: s3.
- Align S3 path format: write and compact both use maxOp (_id.o) suffix (minOp-maxOp-maxOp), not minOp - Scale compaction batch size by compressed_size * 3 for S3-backed docs, matching the read path multiplier - clearBucketLeading(): upload CLEAR doc and boundary survivors to S3 when objectStorage is configured, with old ref cleanup after the transaction - Fix compaction test: allow S3 path reuse when op ranges don't change after dedup
- Remove dead `compression` field from StorageRef interface and all sites
- Add comments explaining compressed_size * 3 heuristic for byte tracking
- Simplify S3 paths from ${minOp}-${maxOp}-${maxOp} to ${minOp}-${maxOp}
- Invert objectStorage guards: inline path first, S3 as else branch
- loadBucketDataDocument() now throws on undefined ops (empty arrays still ok)
- Set doc.ops = [] in S3 fetch error catch blocks for graceful skip
Exercise compaction behaviors (round-trip, dedup/superseding, multi-batch, duplicate collapse, seen map overflow) through MemoryObjectStorage instead of inline ops. All verification uses getBucketDataBatch, not raw doc.ops.
…ve size tracking and orphan cleanup - Add BucketDataObjectStorage wrapper: BSON serialize+zstd compress on store, fetch+zstd decompress+BSON deserialize on retrieve. Eliminates duplicated compress/decompress at all 3 call sites (PersistedBatchV3, MongoSyncBucketStorageV3, MongoCompactorV3). - Replace compressed_size * 3 heuristic with doc.size for chunk/batch byte tracking. The write path already stores the actual decompressed data size in the metadata shell; use it directly instead of estimating. - S3 fetch errors are now hard errors (throw), not silently skipped with doc.ops = []. A corrupt or missing S3 object means the read cannot produce correct results and should fail loudly. - Add pending_s3_deletes collection with deleteWithRetryQueue helper. Pending deletes are persisted inside the compaction transaction so a crash after commit but before S3 delete cannot orphan objects. The next compaction run retries any leftover pending deletes at the start of compactSingleBucket. - Update 'Missing S3 object' test to expect the hard error.
| const bsonBuffer = Buffer.from(bson.serialize({ ops })); | ||
| const compressedUint8 = await zstd.compress(bsonBuffer); | ||
| const compressed = Buffer.from(compressedUint8); | ||
| await this.storage.put(path, compressed); |
There was a problem hiding this comment.
I think we should include the content-type and content-encoding here:
ContentType: "application/bson",
ContentEncoding: "zstd",
This will allow us to for example changing the compression algorithm later, without causing compatibility issues with existing files.
ContentType is not strictly needed, but I think it's good practice to store anyway.
It may be good to add add an extension to the path as well, e.g. .bson.zstd. For in-memory or filesystem implementations, we can use the extension to indicate whether or not the file is compressed, instead of metadata (if we ever add compression support there).
- Remove unnecessary 'as any' cast in compactor batch byte limit check - Add two red tests in storage_s3_checksums.test.ts: start-straddle and end-straddle. Both exercise the checksum pipeline when a checkpoint falls mid-document for S3-backed ops. These tests fail because the aggregation filters on $ops which doesn't exist on S3-backed docs (storage_ref only). Test 1 (start straddle): writes ops via S3 writer, sets compacted_state with partial op_id, then calls getChecksums. Expects full checksum including both compacted and partial ops. Test 2 (end straddle): writes 50 ops spanning multiple S3 documents, calls getChecksums at a partial checkpoint midway. Expects partial count between 0 and total (straddling doc partially included).
Translates the 'end-to-end: duplicate + unique rows, checksum preserved' test from the V3 inline suite to S3-backed storage. Creates ops for the same row (A@1, A@2, A@4 superseded → MOVE → CLEAR) plus an independent row (B@3). Verifies: getChecksums returns the same value before and after compaction (exercises has_clear_op: CLEAR doc must be treated as full/replacing checksum, not partial/additive) getBucketDataBatch returns surviving PUT ops (B@3 and latest A@4)
- Start straddle test: use updateOne + upsert instead of insertOne (bucket_state may already exist from writer initialization). - CLEAR test: shift compacted_state.op_id back to 0 after compaction to force the pipeline to actually process the CLEAR doc. Without this, getChecksums reads compacted_state directly and never exercises the has_clear_op $ops reference. All 3 tests now fail red against S3-backed docs: 1. start straddle: $size(null) on missing $ops 2. end straddle: $size(null) on missing $ops 3. CLEAR checksum: doubled checksum (partial instead of full)
- Add has_clear_op top-level field to BucketDataDocumentV3. Set during write (PersistedBatchV3, serializeBucketData) and compaction (compactSingleBucket, clearBucketLeading). - Pipeline: wrap all $ops references with $ifNull to prevent $size(null) crash when ops are on S3 instead of inline. - Pipeline: use top-level has_clear_op field as primary source for CLEAR detection, with $ops-based fallback for backward compat. - Remove $group/$sort from pipeline. Group in JS so S3 straddle docs can be fixed up before aggregation. When is_fully_included is false and storage_ref exists, fetch ops from S3, filter by range, and recompute checksum_total/count_total/has_clear_op. - Pass objectStorage through MongoChecksumsV3 constructor for S3 fetch access in normalizePartialChecksumResults. - Fix delete-op test assertion: paths may be reused after compaction. Red tests progress: start-straddle and end-straddle now pass. CLEAR checksum preservation still fails (doubled value — under investigation).
- clearBucketLeading was missing S3 ops pre-fetch before calling
loadBucketDataDocument, causing a crash when reading S3-backed
MOVE documents during the CLEAR pass. Added pre-fetch with
BucketDataObjectStorage, same pattern as the MOVE pass.
- Added storage_ref to the CLEAR pass aggregation projection so
the pre-fetch can detect S3-backed docs.
- Fixed CLEAR checksum test: the S3 writer assigns unique replica
IDs per save(), so duplicate rows got different dedup keys and
were never superseded. Use the same rid('A') for all saves of
row A so the compactor produces MOVE tombstones and exercises
the CLEAR/has_clear_op path.
All 3 red tests (start straddle, end straddle, CLEAR checksum)
now pass.
Verify that ops above the compaction horizon survive untouched when compacting S3-backed docs with maxOpId < checkpoint. Writes 12 ops across 2+ S3 documents, compacts with maxOpId=6, then asserts all 12 ops are reachable via getBucketDataBatch — above-horizon ops as PUTs and below-horizon ops as surviving PUTs or MOVEs.
- Add inline_threshold_bytes to S3ObjectStorageConfig (optional, default 256). Chunks whose BSON-serialized size is below this threshold are stored inline in MongoDB instead of offloaded to S3. Avoids S3 overhead for tiny documents such as single CLEAR ops or small write batches. - Plumb threshold through: config -> MongoStorageProvider -> MongoBucketStorage -> MongoSyncBucketStorage -> MongoBucketBatchOptions -> PersistedBatchOptions. - Apply threshold at 3 write sites: PersistedBatchV3.flushBucketData (write path) MongoCompactorV3.compactSingleBucket (move rechunking) MongoCompactorV3.clearBucketLeading (CLEAR doc + boundary survivors) - Each site computes BSON size per chunk before choosing inline vs S3 path. - Test: small ops (BSON < 1024) stay inline and survive simulated S3 loss. Large ops (BSON > 256) go to S3 and fail hard on S3 loss.
Force all ops to S3 regardless of future default changes. Without this, a future increase to the inline threshold would silently move test data to inline storage, hiding S3 path regressions.
… suffix - ObjectStorage.put now accepts optional metadata (contentType, contentEncoding). BucketDataObjectStorage.store sets application/bson and zstd. - S3ObjectStorage.put passes ContentType and ContentEncoding to S3 PutObject. Allows future compression changes without compatibility issues. - All S3 key paths now use .bson.zstd suffix for extension-based format detection (e.g. MemoryObjectStorage or filesystem implementations).
- Add optional accessKeyId/secretAccessKey to S3ObjectStorage for
MinIO/local S3-compatible endpoints (not AWS IAM).
- Add storage_s3_minio.test.ts: write 3 ops, compact, read back
against a local MinIO instance. Skipped when MINIO_URL is not set.
Run with:
MINIO_URL=http://localhost:9000 pnpm exec vitest run test/src/storage_s3_minio.test.ts
Replace local s3Factory() in all 6 S3 test files with a shared createS3TestStorageSuite() helper. Set MINIO_ENDPOINT to switch all tests from MemoryObjectStorage to a real MinIO/S3 endpoint. MINIO_ENDPOINT=http://localhost:9000 pnpm test:core When unset, tests use MemoryObjectStorage as before (no change). Tests that peek at internal MemoryObjectStorage state will not work against MinIO; those are implementation-detail tests, not behavior tests.
|
Addressed
Gaps found and fixed
Tests added
Manual tests
Running MINIO with Dockerdocker rm -f minio 2>/dev/null
docker run -d --name minio -p 9000:9000 minio/minio server /data --console-address ":9001" &&
sleep 2 &&
docker run --rm --network host --entrypoint sh minio/mc -c "
mc alias set local http://localhost:9000 minioadmin minioadmin &&
mc mb local/powersync-s3-test --ignore-existing
" &&
MINIO_ENDPOINT=http://localhost:9000 pnpm --filter='./modules/module-mongodb-storage' testDeferred
|
- Add skip guards to 5 tests that access MemoryObjectStorage internal state. These verify implementation details (S3 object counts, decompression), not API behavior, so they cannot run against real S3/MinIO. Guard: if (process.env.MINIO_ENDPOINT) return; - Remove stale comments referencing 'not yet implemented', 'MUST FAIL', and 'FAILS' from Phase 2b/2c/2d red test days. All paths are now implemented and the assertions pass.
Summary
Offload
BucketDataDocumentV3.ops[]arrays to object storage (S3), keeping only a metadata shell in MongoDB. The service reads S3 objects and streams ops to clients using the existing wire protocol — no protocol changes. Object storage is optional at configuration level; when not configured, all ops remain inline in MongoDB as today.Design Decisions
No inline threshold→inline_threshold_bytes(default 256). Chunks below this go inline.S3 path format→ paths now includebucket-data/<group>/<def>/<bucket>/<minOp>-<maxOp>.bson.zstdsuffix and carry Content-Typeapplication/bson+ Content-Encodingzstdmetadata on the S3 object.BucketDataObjectStoragewrapper (one place for BSON serialization + zstd compress on store, decompress + deserialize on retrieve).→compressed_size * 3heuristicdoc.size— the write path already stores the actual decompressed data size on the metadata shell; use it directly.has_clear_optop-level flag onBucketDataDocumentV3, set during write and compaction. Lets the checksum pipeline detect CLEAR ops without filtering on embedded$ops.$ops(they're on S3). Post-aggregation JS fetches ops from S3, filters by range, and recomputes checksum/count/clear_op.pending_s3_deletes: pending deletes are persisted inside the compaction transaction. A crash after commit but before S3 delete leaves entries that the next compaction retries.doc.ops = [].MINIO_ENDPOINTto run all S3 tests against a real S3-compatible endpoint. Internal-state tests auto-skip against real S3.~/.aws/credentials). Explicitaccess_key_id/secret_access_keyfields are only needed for self-hosted S3-compatible endpoints in production, which isn't a target yet.Manual Verification
S3ObjectStorage is not exercised in CI. To manually validate against MinIO:
The
MINIO_ENDPOINTtoggle switches all S3 tests from MemoryObjectStorage to the realS3ObjectStorageclient. Default credentials (minioadmin/minioadmin) are used —forcePathStyleis already set when an endpoint is present.