perf(s3): zero-copy ByteBuffer optimizations for upload and fetch #524
Conversation
Add a zero-copy ByteBuffer upload path for S3 to avoid the internal byte array copy that occurs in the S3 SDK's RequestBody.fromByteBuffer() via BinaryUtils.copyAllBytesFrom().

Implementation:

- Add an upload(ObjectKey, ByteBuffer) method to the ObjectUploader interface with a default implementation using ByteBufferInputStream
- Override it in S3Storage to use ByteBufferInputStream directly, avoiding the copy in RequestBody.fromByteBuffer()
- Add a createFromByteBuffer() factory method to FileUploadJob with shared retry logic via an UploadOperation functional interface
- Add a produce.upload.zero.copy.enabled config flag (default: false)
- Wire the config through AppendHandler -> Writer -> FileCommitter

The feature is disabled by default for backward compatibility. When enabled, FileCommitter uses ByteBuffer.wrap(file.data()) instead of creating an InputStream, allowing the S3 upload to proceed without an additional byte array copy.

Benchmark results (JFR allocation profiling):

| Metric | Before | After | Reduction |
|---------------------------|----------|---------|-----------|
| Total allocation stacks | 12,365 | 8,489 | 31.3% |
| byte[] allocations | 2,190 | 1,806 | 17.5% |
| S3Storage.upload stacks | 736 | 701 | 4.8% |

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
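The copy-avoidance idea can be sketched without the AWS SDK. The class below is a minimal standalone stand-in for Kafka's ByteBufferInputStream (the real one lives in org.apache.kafka.common.utils, and the PR's actual wiring may differ): reads pull bytes straight from the buffer, so passing such a stream to RequestBody.fromInputStream() streams the data instead of making the up-front byte[] copy that RequestBody.fromByteBuffer() performs.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteBufferUpload {
    // Minimal sketch of a ByteBuffer-backed InputStream: no intermediate
    // byte[] copy of the whole payload is ever made.
    static final class ByteBufferInputStream extends InputStream {
        private final ByteBuffer buffer;

        ByteBufferInputStream(ByteBuffer buffer) {
            this.buffer = buffer;
        }

        @Override
        public int read() {
            return buffer.hasRemaining() ? buffer.get() & 0xFF : -1;
        }

        @Override
        public int read(byte[] dst, int off, int len) {
            if (!buffer.hasRemaining()) return -1;
            int n = Math.min(len, buffer.remaining());
            buffer.get(dst, off, n); // copies only into the caller's chunk
            return n;
        }
    }

    public static void main(String[] args) throws IOException {
        // Works even for read-only buffers, which forbid array() access.
        ByteBuffer data = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))
                                    .asReadOnlyBuffer();
        InputStream in = new ByteBufferInputStream(data);
        System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8)); // hello
    }
}
```

In the S3 case, such a stream would be handed to RequestBody.fromInputStream(stream, contentLength), which consumes it without the extra allocation.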
Add an ObjectFetcher.fetchToByteBuffer() method that returns a ByteBuffer directly, avoiding intermediate channel/stream copies.

Before: S3 getObjectAsBytes → ByteBuffer → Channel → 1 MiB chunks → consolidate
After: S3 getObjectAsBytes → ByteBuffer (direct return)

S3Storage overrides fetchToByteBuffer() to return the SDK's ByteBuffer directly, eliminating:

- Channel wrapping overhead
- 1 MiB chunk allocations
- The final consolidation copy

This reduces ObjectFetcher CPU from 0.98% to 0.00%.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
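As a rough illustration (not the PR's actual code; names and signatures here are simplified), a default fetchToByteBuffer() can fall back to draining the stream, while an S3-style implementation returns a buffer it already holds, mirroring how the result of getObjectAsBytes().asByteBuffer() can be handed back directly:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FetchSketch {
    interface ObjectFetcher {
        InputStream fetch(String key) throws IOException;

        // Fallback for backends without a native ByteBuffer path: one copy.
        default ByteBuffer fetchToByteBuffer(String key) throws IOException {
            try (InputStream in = fetch(key)) {
                return ByteBuffer.wrap(in.readAllBytes());
            }
        }
    }

    // "S3-like" fetcher: pretend the SDK already handed us a ByteBuffer and
    // return it as-is, with no channel wrapping or chunked consolidation.
    static final class DirectFetcher implements ObjectFetcher {
        private final ByteBuffer sdkBuffer;

        DirectFetcher(ByteBuffer sdkBuffer) {
            this.sdkBuffer = sdkBuffer;
        }

        @Override
        public InputStream fetch(String key) {
            return new ByteArrayInputStream(new byte[0]); // unused in this sketch
        }

        @Override
        public ByteBuffer fetchToByteBuffer(String key) {
            return sdkBuffer.asReadOnlyBuffer(); // direct return, zero copies
        }
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer sdk = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));
        ObjectFetcher f = new DirectFetcher(sdk);
        ByteBuffer b = f.fetchToByteBuffer("some/key");
        byte[] out = new byte[b.remaining()];
        b.get(out);
        System.out.println(new String(out, StandardCharsets.UTF_8)); // payload
    }
}
```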
When a batch fits entirely within a single FileExtent (the common case for sequential reads), use ByteBuffer.wrap().slice() to create a view of the existing byte array instead of allocating a new buffer and copying.

Before: allocate byte[batchSize] + System.arraycopy for every batch
After: ByteBuffer.wrap(data, offset, length).slice() - zero copy

Falls back to the copy path for multi-extent batches (when a batch spans multiple S3 objects). This reduces FetchCompleter CPU from 1.32% to 0.39% (a 70% reduction).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
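The single-extent fast path boils down to one line of java.nio. The sketch below shows both the view creation and the fact that the view shares the backing array (the property that makes it zero-copy, and also what makes mutation through the view visible to everyone holding the array):

```java
import java.nio.ByteBuffer;

public class SliceDemo {
    // Zero-copy view of data[offset .. offset+length), as the single-extent
    // fast path described above creates. No allocation of a new byte[] and
    // no System.arraycopy.
    static ByteBuffer sliceView(byte[] data, int offset, int length) {
        return ByteBuffer.wrap(data, offset, length).slice();
    }

    public static void main(String[] args) {
        byte[] data = {0, 1, 2, 3, 4, 5};
        ByteBuffer view = sliceView(data, 2, 3);
        System.out.println(view.remaining()); // 3
        System.out.println(view.get(0));      // 2
        data[2] = 42;                         // shared backing array:
        System.out.println(view.get(0));      // 42 -- the view sees the change
    }
}
```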
…ture flag

Make the zero-copy ByteBuffer upload path the default and only code path. This removes the configuration option and always uses ByteBuffer.wrap() with asReadOnlyBuffer() for S3 uploads, avoiding an extra byte[] copy.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
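A quick demonstration of the wrap-plus-read-only pattern: the wrap itself copies nothing, and the read-only view rejects any write attempt downstream, so the committer's data cannot be mutated by the upload path.

```java
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;

public class ReadOnlyDemo {
    public static void main(String[] args) {
        byte[] fileData = {1, 2, 3};
        // Wrap without copying, then hand out a read-only view:
        ByteBuffer upload = ByteBuffer.wrap(fileData).asReadOnlyBuffer();
        System.out.println(upload.remaining()); // 3

        try {
            upload.put(0, (byte) 9); // any write attempt fails
        } catch (ReadOnlyBufferException e) {
            System.out.println("writes rejected");
        }
    }
}
```

Note that a read-only view also reports hasArray() == false, which is why later commits in this PR have to handle buffers that do not support array().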
Force-pushed from 63f338b to bf5a84c
fetchToByteBuffer() may return a direct or read-only ByteBuffer, which doesn't support array(). Copy the data when needed.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
These tests verify that createFileExtent() correctly handles ByteBuffers that don't support .array() (direct and read-only buffers). Without the fix in the previous commit, these tests would throw UnsupportedOperationException.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
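The safe extraction pattern these tests exercise can be sketched as follows (the helper name and exact checks are illustrative, not the PR's actual code): use the backing array only when it is both available and fully spanned by the buffer; otherwise fall back to a bulk copy, which works for direct and read-only buffers too.

```java
import java.nio.ByteBuffer;

public class BufferBytes {
    // Extract the remaining bytes without assuming array() is supported:
    // direct and read-only buffers throw UnsupportedOperationException from
    // array(), so those take the copy path.
    static byte[] toByteArray(ByteBuffer buffer) {
        if (buffer.hasArray()
                && buffer.arrayOffset() == 0
                && buffer.position() == 0
                && buffer.remaining() == buffer.array().length) {
            // The buffer spans its entire backing array: safe to reuse as-is.
            return buffer.array();
        }
        byte[] copy = new byte[buffer.remaining()];
        buffer.duplicate().get(copy); // duplicate() leaves the caller's position untouched
        return copy;
    }

    public static void main(String[] args) {
        ByteBuffer readOnly = ByteBuffer.wrap(new byte[]{1, 2, 3}).asReadOnlyBuffer();
        // Calling readOnly.array() would throw; toByteArray() copies instead.
        System.out.println(toByteArray(readOnly).length); // 3
    }
}
```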
Pull request overview
This PR optimizes S3-backed storage I/O by introducing ByteBuffer-based “zero-copy” upload/fetch paths and plumbing them through the produce/consume flows to reduce allocations and byte[] copying.
Changes:
- Add ByteBuffer upload support via ObjectUploader.upload(ObjectKey, ByteBuffer) and use it from FileCommitter/FileUploadJob.
- Add fetchToByteBuffer() to ObjectFetcher (with an S3 override) and switch FileFetchJob to use it directly.
- Optimize FetchCompleter to avoid copying when a batch fits entirely within a single FileExtent.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
Summary per file:
| File | Description |
|---|---|
| storage/inkless/src/test/java/io/aiven/inkless/storage_backend/common/fixtures/BaseStorageTest.java | Adds tests for ByteBuffer upload and direct ByteBuffer fetch. |
| storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java | Updates verification/stats collection for ByteBuffer-based uploads. |
| storage/inkless/src/test/java/io/aiven/inkless/produce/FileUploadJobTest.java | Adds tests covering ByteBuffer upload retry semantics and buffer position preservation. |
| storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java | Updates tests to assert ByteBuffer upload usage and read-only buffer passing. |
| storage/inkless/src/test/java/io/aiven/inkless/consume/FileFetchJobTest.java | Updates to fetchToByteBuffer() and adds tests for direct/read-only buffers. |
| storage/inkless/src/main/java/io/aiven/inkless/storage_backend/s3/S3Storage.java | Implements ByteBuffer upload and direct ByteBuffer fetch for S3. |
| storage/inkless/src/main/java/io/aiven/inkless/storage_backend/common/ObjectUploader.java | Adds default ByteBuffer upload overload. |
| storage/inkless/src/main/java/io/aiven/inkless/storage_backend/common/ObjectFetcher.java | Adds fetchToByteBuffer() default method (fallback to channel read). |
| storage/inkless/src/main/java/io/aiven/inkless/produce/FileUploadJob.java | Refactors upload to support ByteBuffer path and shared retry logic. |
| storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java | Switches commit uploads to ByteBuffer (read-only) path. |
| storage/inkless/src/main/java/io/aiven/inkless/consume/FileFetchJob.java | Switches fetch to fetchToByteBuffer() and updates extent construction for non-array buffers. |
| storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java | Adds zero-copy slice optimization when batch fits in a single extent. |
Review comment on storage/inkless/src/main/java/io/aiven/inkless/storage_backend/s3/S3Storage.java (resolved)
Review comment on storage/inkless/src/main/java/io/aiven/inkless/consume/FileFetchJob.java (resolved)
Review comment on storage/inkless/src/main/java/io/aiven/inkless/produce/FileUploadJob.java (resolved)
Code fixes from Copilot review:

- Add a null check for the key parameter in S3Storage.fetchToByteBuffer()
- Fix FileFetchJob.createFileExtent() to handle ByteBuffer position/limit/arrayOffset correctly: only use the backing array when the buffer spans the entire array
- Update the FileUploadJob Javadoc to clarify when position/limit are captured

Test fixes:

- Add tests for a buffer with a non-zero position, sliced buffers, and heap buffers
- Update FetchPlannerTest and ReaderTest to mock fetchToByteBuffer() directly instead of fetch() + readToByteBuffer(), since FileFetchJob now calls fetchToByteBuffer()
- Update verification calls from fetch() to fetchToByteBuffer()

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
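The position-preservation behavior the new tests check can be sketched with a hypothetical retry wrapper (the PR's UploadOperation interface and real signatures may differ): each attempt works on its own duplicate() of the buffer, so an attempt that fails after consuming some bytes cannot disturb the position/limit seen by the next attempt or by the caller.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;

public class RetrySketch {
    // Illustrative retry loop: duplicate() gives each attempt an independent
    // position/limit over the same (uncopied) data.
    static void uploadWithRetry(ByteBuffer data, int attempts, Consumer<ByteBuffer> upload) {
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                upload.accept(data.duplicate()); // fresh view, shared bytes
                return;
            } catch (RuntimeException e) {
                last = e; // retry with an unconsumed view
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(new byte[]{1, 2, 3, 4});
        int[] calls = {0};
        uploadWithRetry(buf, 3, b -> {
            calls[0]++;
            b.get(); b.get(); // the attempt consumes bytes from its duplicate
            if (calls[0] < 2) throw new RuntimeException("transient failure");
        });
        System.out.println(calls[0]);       // 2 -- succeeded on the second attempt
        System.out.println(buf.position()); // 0 -- caller's buffer untouched
    }
}
```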
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.
Review comment on storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java (outdated, resolved)
Review comment on storage/inkless/src/main/java/io/aiven/inkless/storage_backend/s3/S3Storage.java (resolved)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- WriterPropertyTest: filter mock invocations to only upload(ObjectKey, ByteBuffer) calls, to avoid counting other mock interactions
- S3Storage: extract a shared doFetch() helper to eliminate duplication between the fetch() and fetchToByteBuffer() methods

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 1 comment.
Review comment on storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java (outdated, resolved)
The zero-copy slice optimization passed a ByteBuffer backed by cached FileExtent data directly to createMemoryRecords(), which mutates the buffer (setLastOffset, setMaxTimestamp). Since FileExtent data is shared across concurrent fetches, this introduced race conditions.

Since we must copy anyway to avoid corrupting shared data, the "zero-copy" optimization provides no benefit. Remove it and always use the copy path.

Also fix test ByteBuffer handling: tests need to call flip() after put() to reset the position for reading.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
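The race and its fix come down to whether the buffer shares the extent's backing array. A minimal demonstration in plain java.nio (not the PR's code) of both sides:

```java
import java.nio.ByteBuffer;

public class CopyVsSlice {
    public static void main(String[] args) {
        byte[] shared = {10, 20, 30};

        // A sliced view writes through to the shared array -- the hazard
        // described above when createMemoryRecords() mutates the buffer:
        ByteBuffer view = ByteBuffer.wrap(shared, 0, 3).slice();
        view.put(0, (byte) 99);
        System.out.println(shared[0]); // 99 -- shared data mutated

        // Copying first isolates the mutation, which is why the copy path
        // has to stay:
        shared[0] = 10;
        ByteBuffer copy = ByteBuffer.allocate(3);
        copy.put(shared, 0, 3).flip();
        copy.put(0, (byte) 99);
        System.out.println(shared[0]); // 10 -- original untouched
    }
}
```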
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated no new comments.
Summary
Optimize S3 storage operations by eliminating unnecessary byte array copies through zero-copy ByteBuffer patterns.
Changes
1. Zero-copy ByteBuffer upload support
   - Add upload(ObjectKey, ByteBuffer) to the ObjectUploader interface
   - Share retry logic via an UploadOperation functional interface
2. Direct ByteBuffer fetch from S3
   - Add fetchToByteBuffer(), returning a ByteBuffer directly from the S3 SDK
3. Remove the feature flag
   - Always use ByteBuffer.wrap(data).asReadOnlyBuffer() for uploads

Notes

The zero-copy slice optimization in FetchCompleter was removed because createMemoryRecords() mutates the buffer (setLastOffset, setMaxTimestamp), and FileExtent data is cached/shared across concurrent fetches. Since we must copy to avoid corrupting shared data, the optimization provided no benefit.

Test plan