perf(produce): add async commit pipeline to reduce FileCommitWaitTime#525
Pull request overview
This PR introduces an asynchronous (callback-based) commit pipeline for diskless produce to avoid blocking the single commit thread on upload completion, improving FileCommitWaitTime while preserving commit submission order.
Changes:
- Add async commit pipeline to `FileCommitter` with ordered commit chaining, plus improved shutdown behavior (`awaitTermination`).
- Refactor commit/cache-store jobs to be driven by upload completion callbacks instead of blocking on `Future.get()`.
- Add feature flag `inkless.produce.async.commit.pipeline.enabled` with documentation, and update/extend unit + integration tests.
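The ordered commit chaining described above can be illustrated with a small, self-contained sketch. It is not the PR's `FileCommitter` code: the class name `OrderedCommitSketch`, the string object keys, and the manually completed upload futures are assumptions made only for illustration. It shows that when each commit waits for its own upload and for the previous commit, commits run in submission order even if uploads finish in a different order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch, not the PR's implementation.
class OrderedCommitSketch {
    // Returns the order in which "commits" ran when uploads finish in reverse order.
    static List<String> run() {
        ExecutorService commitExecutor = Executors.newSingleThreadExecutor();
        List<String> commitOrder = Collections.synchronizedList(new ArrayList<>());

        // Uploads are completed by hand so the completion order is deterministic.
        CompletableFuture<String> uploadA = new CompletableFuture<>();
        CompletableFuture<String> uploadB = new CompletableFuture<>();

        CompletableFuture<?> previousCommit = CompletableFuture.completedFuture(null);
        for (CompletableFuture<String> upload : List.of(uploadA, uploadB)) {
            // A commit starts only once its own upload AND the previous commit are done.
            previousCommit = upload
                .thenCombine(previousCommit, (objectKey, ignored) -> objectKey)
                .thenAcceptAsync(commitOrder::add, commitExecutor);
        }

        uploadB.complete("file-B"); // the second file finishes uploading first
        uploadA.complete("file-A");

        previousCommit.join();      // wait for the tail of the chain
        commitExecutor.shutdown();
        return new ArrayList<>(commitOrder);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

No thread blocks on an upload in this sketch; the single commit thread only runs the commit step once both of its prerequisites have completed.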
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java | Implements async commit chaining (ordered commits), callback-based cache store, and graceful shutdown. |
| storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitJob.java | Refactors commit job to run via upload-completion callback + adds markReadyToCommit() for metrics. |
| storage/inkless/src/main/java/io/aiven/inkless/produce/CacheStoreJob.java | Refactors cache store to run via upload-completion callback (no blocking on upload future). |
| storage/inkless/src/main/java/io/aiven/inkless/produce/UploadCompletionHandler.java | New handler interface for CompletableFuture.handleAsync() upload completion processing. |
| storage/inkless/src/main/java/io/aiven/inkless/produce/UploadCompletionConsumer.java | New consumer interface for CompletableFuture.whenCompleteAsync() upload completion processing. |
| storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java | Threads async pipeline flag through to FileCommitter. |
| storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java | Wires config flag into Writer construction. |
| storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java | Adds produce.async.commit.pipeline.enabled config definition + accessor. |
| docs/inkless/configs.rst | Documents new async commit pipeline config option. |
| storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java | Updates tests for async behavior and adds new async ordering/error-isolation/shutdown tests. |
| storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitJobTest.java | Updates commit job tests to use callback API and adds markReadyToCommit() metric test. |
| storage/inkless/src/test/java/io/aiven/inkless/produce/CacheStoreJobTest.java | New tests validating callback-driven cache store behavior. |
| storage/inkless/src/test/java/io/aiven/inkless/produce/WriterIntegrationTest.java | Enables async pipeline in integration test to validate end-to-end behavior. |
| storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java | Updates property test wiring for new FileCommitter ctor signature and shutdown behavior. |
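
The graceful shutdown that the table attributes to `FileCommitter` could look roughly like the following. This is a sketch under assumptions: the class name `GracefulCloseSketch`, the 5-second timeouts, and the fallback to `shutdownNow()` are illustrative choices and are not taken from the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a close() that drains pending work before stopping the executor.
class GracefulCloseSketch {
    static boolean closeAfterPendingWork() {
        ExecutorService commitExecutor = Executors.newSingleThreadExecutor();
        AtomicBoolean committed = new AtomicBoolean(false);
        CompletableFuture<Void> pendingCommit =
            CompletableFuture.runAsync(() -> committed.set(true), commitExecutor);

        try {
            // Step 1: wait (bounded) for the tail of the commit chain.
            pendingCommit.get(5, TimeUnit.SECONDS);
        } catch (ExecutionException | TimeoutException e) {
            // A failed or slow commit should not prevent shutdown.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // Step 2: stop accepting work, then wait for the worker thread to exit.
        commitExecutor.shutdown();
        try {
            if (!commitExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                commitExecutor.shutdownNow(); // assumed fallback if the wait times out
            }
        } catch (InterruptedException e) {
            commitExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return committed.get() && commitExecutor.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println(closeAfterPendingWork());
    }
}
```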
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.
// Tracks the previous commit to ensure commits happen in submission order, not upload completion order.
// Each new commit waits for both its upload AND the previous commit to complete before starting.
// Also used during shutdown to ensure all pending work completes before executors are shut down.
private CompletableFuture<?> previousCommitFuture = CompletableFuture.completedFuture(null);
// Tracks the last cache store future for shutdown coordination.
private CompletableFuture<?> lastCacheStoreFuture = CompletableFuture.completedFuture(null);
I wonder whether, instead of keeping these as separate futures, they could be pipelined. That could simplify things a bit.
Done. I've chained the cache store future with the commit future using thenCombine:
previousCommitFuture = commitFuture.thenCombine(cacheStoreFuture, (commitResult, cacheResult) -> commitResult);
This eliminates the separate lastCacheStoreFuture field and simplifies close() to wait on just the single combined future instead of CompletableFuture.allOf(pendingCommit, pendingCacheStore).
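
To make the effect of that `thenCombine` call concrete, here is a minimal sketch. The class name `CombinedFutureSketch` and the manually completed futures are assumptions for illustration only. It shows that the combined future carries the commit result but is done only once both the commit and the cache store have finished.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: one combined future stands in for commit + cache store.
class CombinedFutureSketch {
    // Returns {done after commit only, done after both}.
    static boolean[] run() {
        CompletableFuture<String> commitFuture = new CompletableFuture<>();
        CompletableFuture<Void> cacheStoreFuture = new CompletableFuture<>();

        // Keeps the commit result, but completes only when both inputs have completed.
        CompletableFuture<String> combined =
            commitFuture.thenCombine(cacheStoreFuture, (commitResult, cacheResult) -> commitResult);

        commitFuture.complete("committed");
        boolean doneAfterCommitOnly = combined.isDone();

        cacheStoreFuture.complete(null);
        boolean doneAfterBoth = combined.isDone();

        return new boolean[] {doneAfterCommitOnly, doneAfterBoth};
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println(result[0] + " " + result[1]);
    }
}
```

One consequence worth keeping in mind: `thenCombine` completes exceptionally if either input does, so with this shape a cache store failure also fails the combined future that the next commit and `close()` wait on.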
None of the interfaces introduced are actually used as types or reused. Both seem unnecessary.
jeqo left a comment
Changes look good, but I think this is mixing a bunch of refactoring and feature work. We can clean this up by moving the refactoring to a prep PR that can be merged quickly, and do the feature review separately.
// Cache store runs after upload completes (in parallel with commit).
// It's tracked in the chain to provide backpressure if cache store is slow.
final CompletableFuture<?> cacheStoreFuture = uploadFuture
    .whenCompleteAsync(cacheStoreJob::accept, executorServiceCacheStore);
We can use thenAcceptAsync instead and only call the cache job on success.
Also, there's no need to keep a pointer to the cacheStoreFuture AFAIC. We can keep it fire-and-forget as before.
Can we consider moving this CacheStoreJob refactoring to a separate PR? Its changes are not crucial to this change.
We can make it implement Consumer<ObjectKey> and rename storeToCache to accept.
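
A minimal sketch of what this suggestion amounts to, under stated assumptions: `CountingCacheJob` is a hypothetical stand-in for `CacheStoreJob`, `String` replaces `ObjectKey`, and `Runnable::run` is used as an inline executor only to keep the example deterministic. With `thenAcceptAsync`, the consumer is invoked only when the upload succeeds, so the job needs no failure branch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch: a cache store job as a plain Consumer, run only on upload success.
class CacheStoreOnSuccessSketch {
    // Stand-in for CacheStoreJob; String replaces ObjectKey.
    static class CountingCacheJob implements Consumer<String> {
        final AtomicInteger calls = new AtomicInteger();

        @Override
        public void accept(String objectKey) {
            calls.incrementAndGet(); // a real job would store the uploaded file in the cache here
        }
    }

    // Returns how many times the job ran for a successful or a failed upload.
    static int callsAfter(boolean uploadSucceeds) {
        CountingCacheJob job = new CountingCacheJob();
        CompletableFuture<String> upload = new CompletableFuture<>();

        // Fire and forget: the returned future is intentionally not stored.
        // Runnable::run executes the callback inline on the completing thread.
        upload.thenAcceptAsync(job, Runnable::run);

        if (uploadSucceeds) {
            upload.complete("object-key");
        } else {
            upload.completeExceptionally(new RuntimeException("upload failed"));
        }
        return job.calls.get();
    }

    public static void main(String[] args) {
        System.out.println(callsAfter(true) + " " + callsAfter(false));
    }
}
```

By contrast, `whenCompleteAsync` invokes its callback on both success and failure, which is why a job wired that way has to inspect the throwable itself.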
// not the time waiting for upload completion or previous commits in the chain.
commitJob.markReadyToCommit();
})
.handleAsync(commitJob, executorServiceCommit)
Similarly, using thenApplyAsync can simplify the commitJob.
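
As a rough illustration of that simplification: with `handleAsync` the job is a `BiFunction` that receives both the result and the throwable, whereas with `thenApplyAsync` it is a `Function` that only sees successful uploads. In the sketch below, `CommitJob` is a hypothetical stand-in for `FileCommitJob`, `String` replaces `ObjectKey`, the returned list of strings replaces `List<CommitBatchResponse>`, and the `exceptionally` fallback is an assumption about where failures might be handled.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch: a commit job that only implements the success path.
class CommitJobAsFunctionSketch {
    static class CommitJob implements Function<String, List<String>> {
        @Override
        public List<String> apply(String objectKey) {
            return List.of("committed:" + objectKey);
        }
    }

    static List<String> commit(CompletableFuture<String> upload) {
        return upload
            .thenApplyAsync(new CommitJob(), Runnable::run) // skipped entirely if the upload failed
            .exceptionally(error -> List.of("failed"))      // failure handling lives outside the job
            .join();
    }

    public static void main(String[] args) {
        System.out.println(commit(CompletableFuture.completedFuture("key")));
        System.out.println(commit(CompletableFuture.failedFuture(new RuntimeException("boom"))));
    }
}
```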
…itTime

Replace blocking commit pattern with pipelined CompletableFuture chain:
- Upload starts immediately when file is closed
- Commits are chained via previousCommitFuture to preserve submission order
- Each commit waits for its upload AND previous commit before starting
- Cache store is fire-and-forget (thenAcceptAsync on upload success)

Key changes:
- FileCommitJob now implements Function<ObjectKey, List<CommitBatchResponse>> instead of BiFunction, only handling the success path
- CacheStoreJob now implements Consumer<ObjectKey>, invoked only on successful uploads via thenAcceptAsync
- Upload failures are detected via CompletionException unwrapping in handleCommitResult()
- markReadyToCommit() resets wait time measurement to exclude upload and previous commit wait times

This reduces FileCommitWaitTime by removing thread blocking while waiting for uploads to complete. The commit thread now only runs the actual commit operation.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
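
The CompletionException unwrapping mentioned in the commit message relies on standard `CompletableFuture` behavior: when a source future fails, dependent stages complete with a `CompletionException` whose cause is the original exception. The sketch below is not the PR's `handleCommitResult()`; the `unwrap` helper and the class name are hypothetical, and the `IOException` is just an example failure.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical sketch of recovering the original upload failure in a downstream callback.
class UnwrapSketch {
    static Throwable unwrap(Throwable error) {
        if (error instanceof CompletionException && error.getCause() != null) {
            return error.getCause();
        }
        return error;
    }

    // Returns {exception type seen by the callback, exception type after unwrapping}.
    static String[] run() {
        CompletableFuture<String> upload = new CompletableFuture<>();
        CompletableFuture<String[]> seen = upload
            .thenApply(objectKey -> "committed:" + objectKey) // never runs: the upload fails
            .handle((result, error) -> new String[] {
                error.getClass().getSimpleName(),
                unwrap(error).getClass().getSimpleName()
            });

        upload.completeExceptionally(new IOException("upload failed"));
        return seen.join();
    }

    public static void main(String[] args) {
        String[] types = run();
        System.out.println(types[0] + " -> " + types[1]);
    }
}
```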
Summary
Replace blocking upload wait with CompletableFuture callbacks. The commit thread no longer blocks on `uploadFuture.get()`; instead, commits chain via `thenCombine()` to preserve submission order while allowing parallel uploads.

Key changes:
- `produce.async.commit.pipeline.enabled` config
- […] (`handle()` to convert failures for ordering purposes only)
- `awaitTermination()` in `close()`
- `markReadyToCommit()` so FileCommitWaitTime measures only the executor queue wait, not the cumulative chain wait

Tests:
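
The effect of `markReadyToCommit()` on the measured wait time can be shown with a toy timer. Everything here is an assumption for illustration: `TimedJob` is not the PR's `FileCommitJob`, and the injected millisecond clock exists only to make the numbers deterministic. The idea is that resetting the start timestamp once the upload and the previous commit are done leaves only the executor queue wait in the measurement.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical sketch of resetting a wait-time measurement when a job becomes ready.
class WaitTimeSketch {
    static class TimedJob {
        private final LongSupplier clockMs;
        private long waitStartMs;

        TimedJob(LongSupplier clockMs) {
            this.clockMs = clockMs;
            this.waitStartMs = clockMs.getAsLong(); // measurement starts at creation by default
        }

        // Called once the upload and the previous commit have completed.
        void markReadyToCommit() {
            waitStartMs = clockMs.getAsLong();
        }

        // Called when the executor actually starts the commit; returns the measured wait.
        long startCommit() {
            return clockMs.getAsLong() - waitStartMs;
        }
    }

    // Returns {wait measured without reset, wait measured with reset}.
    static long[] run() {
        AtomicLong nowMs = new AtomicLong(0);
        TimedJob withoutReset = new TimedJob(nowMs::get);
        TimedJob withReset = new TimedJob(nowMs::get);

        nowMs.set(100);                 // upload and previous commit finish at t=100
        withReset.markReadyToCommit();

        nowMs.set(105);                 // the commit thread picks the job up at t=105
        return new long[] {withoutReset.startCommit(), withReset.startCommit()};
    }

    public static void main(String[] args) {
        long[] waits = run();
        System.out.println(waits[0] + " ms vs " + waits[1] + " ms");
    }
}
```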
Benchmark results
Test plan