perf(produce): add async commit pipeline to reduce FileCommitWaitTime #525

Open
Mwea wants to merge 1 commit into main from tchary/async-commit-pipeline

Contributor

@Mwea Mwea commented Mar 3, 2026

Summary

Replace blocking upload wait with CompletableFuture callbacks. The commit thread no longer blocks on uploadFuture.get(); instead, commits chain via thenCombine() to preserve submission order while uploads proceed in parallel.
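For illustration, here is a minimal sketch of the ordering idea, not the PR's actual code. The class name, the `submit` method, and the `String` object keys are hypothetical; only `thenCombine` and the notion of a "previous commit" future come from the description above.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: commits run in submission order even when uploads finish out of order.
final class OrderedCommitChainSketch {
    // Tail of the chain; stands in for the PR's previousCommitFuture (assumed shape).
    private CompletableFuture<Void> previousCommit = CompletableFuture.completedFuture(null);
    private final ExecutorService commitExecutor = Executors.newSingleThreadExecutor();
    private final List<String> committed = new CopyOnWriteArrayList<>();

    // A commit starts only after its own upload AND the previous commit have completed.
    synchronized CompletableFuture<Void> submit(CompletableFuture<String> upload) {
        CompletableFuture<Void> commit = upload
            .thenCombine(previousCommit, (objectKey, ignored) -> objectKey)
            .thenAcceptAsync(committed::add, commitExecutor);
        previousCommit = commit;
        return commit;
    }

    static List<String> demo() {
        OrderedCommitChainSketch sketch = new OrderedCommitChainSketch();
        CompletableFuture<String> uploadA = new CompletableFuture<>();
        CompletableFuture<String> uploadB = new CompletableFuture<>();
        sketch.submit(uploadA);
        CompletableFuture<Void> last = sketch.submit(uploadB);
        // Uploads complete in reverse order; no thread blocks while waiting for them.
        uploadB.complete("file-b");
        uploadA.complete("file-a");
        last.orTimeout(5, TimeUnit.SECONDS).join();
        sketch.commitExecutor.shutdown();
        return List.copyOf(sketch.committed);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [file-a, file-b]
    }
}
```

Note that this sketch has no error isolation: a failed commit would fail every later link in the chain.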

Key changes:

  • FileCommitter now supports async (callback-based) and sync (blocking) modes, controlled by produce.async.commit.pipeline.enabled config
  • Async mode chains commits via CompletableFuture to preserve order without blocking the single commit thread
  • Add error isolation: failed commits don't block subsequent commits (using handle() to convert failures for ordering purposes only)
  • Add proper shutdown with awaitTermination() in close()
  • Add markReadyToCommit() so FileCommitWaitTime measures only the executor queue wait, not the cumulative chain wait
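As a rough sketch of the error-isolation bullet above (assumed shape, hypothetical names): the caller-facing future keeps its failure, while the link used only for ordering is passed through `handle()` so the next commit is not poisoned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: a failed commit does not block the commits submitted after it.
final class ErrorIsolationSketch {
    // Ordering-only tail of the chain; it never completes exceptionally.
    private CompletableFuture<Void> previousCommit = CompletableFuture.completedFuture(null);

    synchronized CompletableFuture<String> submit(CompletableFuture<String> upload) {
        CompletableFuture<String> commit = upload
            .thenCombine(previousCommit, (objectKey, ignored) -> "committed:" + objectKey);
        // The caller still observes the failure on `commit`;
        // handle() converts it to a normal completion for ordering purposes only.
        previousCommit = commit.handle((result, error) -> null);
        return commit;
    }

    static List<String> demo() {
        ErrorIsolationSketch sketch = new ErrorIsolationSketch();
        CompletableFuture<String> first =
            sketch.submit(CompletableFuture.failedFuture(new RuntimeException("upload failed")));
        CompletableFuture<String> second =
            sketch.submit(CompletableFuture.completedFuture("file-2"));
        List<String> out = new ArrayList<>();
        out.add(first.isCompletedExceptionally() ? "first:failed" : "first:ok");
        out.add(second.isCompletedExceptionally() ? "second:failed" : "second:" + second.join());
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [first:failed, second:committed:file-2]
    }
}
```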

Tests:

  • Add CacheStoreJobTest for async callback behavior
  • Add tests for error isolation in both async and sync modes
  • Add tests for awaitTermination shutdown behavior
  • Update FileCommitterTest to use real executors with Awaitility
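The shutdown behavior these tests target could look roughly like the following. This is a generic `ExecutorService` shutdown sketch with a hypothetical helper name and timeout, not the PR's `close()` implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: let queued work drain before giving up on the executor.
final class GracefulShutdownSketch {
    // Returns true if all submitted tasks finished within the timeout.
    static boolean closeExecutor(ExecutorService executor, long timeoutMs) {
        executor.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            if (executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                return true;
            }
            executor.shutdownNow(); // timed out: interrupt whatever is still running
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    static int demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 3; i++) {
            executor.execute(done::incrementAndGet);
        }
        boolean clean = closeExecutor(executor, 5_000);
        return clean ? done.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 3
    }
}
```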

Benchmark results

| Metric | Before (sync) | After (async) | Improvement |
| --- | --- | --- | --- |
| FileCommitWaitTime p99 | 227ms | ~8ms | 96.5% |

Test plan

  • Unit tests pass for FileCommitterTest
  • Unit tests pass for CacheStoreJobTest
  • Integration test passes for WriterIntegrationTest
  • Manual testing with feature flag enabled in dev-env environment

@Mwea Mwea force-pushed the tchary/async-commit-pipeline branch 2 times, most recently from db78c51 to e1a8ebf Compare March 3, 2026 14:23
Mwea added a commit that referenced this pull request Mar 3, 2026
@Mwea Mwea marked this pull request as ready for review March 5, 2026 15:21
@Mwea Mwea requested review from Copilot and jeqo and removed request for jeqo March 5, 2026 15:21
Contributor

Copilot AI left a comment

Pull request overview

This PR introduces an asynchronous (callback-based) commit pipeline for diskless produce to avoid blocking the single commit thread on upload completion, improving FileCommitWaitTime while preserving commit submission order.

Changes:

  • Add async commit pipeline to FileCommitter with ordered commit chaining, plus improved shutdown behavior (awaitTermination).
  • Refactor commit/cache-store jobs to be driven by upload completion callbacks instead of blocking on Future.get().
  • Add feature flag inkless.produce.async.commit.pipeline.enabled with documentation, and update/extend unit + integration tests.

Reviewed changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java | Implements async commit chaining (ordered commits), callback-based cache store, and graceful shutdown. |
| storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitJob.java | Refactors commit job to run via upload-completion callback + adds markReadyToCommit() for metrics. |
| storage/inkless/src/main/java/io/aiven/inkless/produce/CacheStoreJob.java | Refactors cache store to run via upload-completion callback (no blocking on upload future). |
| storage/inkless/src/main/java/io/aiven/inkless/produce/UploadCompletionHandler.java | New handler interface for CompletableFuture.handleAsync() upload completion processing. |
| storage/inkless/src/main/java/io/aiven/inkless/produce/UploadCompletionConsumer.java | New consumer interface for CompletableFuture.whenCompleteAsync() upload completion processing. |
| storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java | Threads async pipeline flag through to FileCommitter. |
| storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java | Wires config flag into Writer construction. |
| storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java | Adds produce.async.commit.pipeline.enabled config definition + accessor. |
| docs/inkless/configs.rst | Documents new async commit pipeline config option. |
| storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java | Updates tests for async behavior and adds new async ordering/error-isolation/shutdown tests. |
| storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitJobTest.java | Updates commit job tests to use callback API and adds markReadyToCommit() metric test. |
| storage/inkless/src/test/java/io/aiven/inkless/produce/CacheStoreJobTest.java | New tests validating callback-driven cache store behavior. |
| storage/inkless/src/test/java/io/aiven/inkless/produce/WriterIntegrationTest.java | Enables async pipeline in integration test to validate end-to-end behavior. |
| storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java | Updates property test wiring for new FileCommitter ctor signature and shutdown behavior. |

Contributor

Copilot AI left a comment

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.



Comment on lines +85 to +90
// Tracks the previous commit to ensure commits happen in submission order, not upload completion order.
// Each new commit waits for both its upload AND the previous commit to complete before starting.
// Also used during shutdown to ensure all pending work completes before executors are shut down.
private CompletableFuture<?> previousCommitFuture = CompletableFuture.completedFuture(null);
// Tracks the last cache store future for shutdown coordination.
private CompletableFuture<?> lastCacheStoreFuture = CompletableFuture.completedFuture(null);
Contributor

Wonder if instead of having these as separate futures, couldn't these be pipelined? This could simplify things a bit

Contributor Author

Done. I've chained the cache store future with the commit future using thenCombine:

previousCommitFuture = commitFuture.thenCombine(cacheStoreFuture, (commitResult, cacheResult) -> commitResult);

This eliminates the separate lastCacheStoreFuture field and simplifies close() to wait on just the single combined future instead of CompletableFuture.allOf(pendingCommit, pendingCacheStore).
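To make the effect of that `thenCombine` concrete, a small standalone sketch (hypothetical class and values, not the FileCommitter code): the combined future carries the commit result but completes only once the cache store has also finished, so a single `join()` in `close()` covers both.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: one combined tail future instead of two tracked separately.
final class CombinedTailSketch {
    static String demo() {
        CompletableFuture<String> commitFuture = new CompletableFuture<>();
        CompletableFuture<Void> cacheStoreFuture = new CompletableFuture<>();

        // Keeps the commit result, but waits for the cache store as well.
        CompletableFuture<String> tail =
            commitFuture.thenCombine(cacheStoreFuture, (commitResult, cacheResult) -> commitResult);

        commitFuture.complete("commit-1");
        boolean doneBeforeCacheStore = tail.isDone(); // still pending: cache store not finished
        cacheStoreFuture.complete(null);
        return doneBeforeCacheStore + ":" + tail.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints false:commit-1
    }
}
```

One consequence worth keeping in mind: `thenCombine` completes exceptionally if either input does, so in this shape a cache store failure would also surface on the combined future.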

@Mwea Mwea force-pushed the tchary/async-commit-pipeline branch from e2c8d2c to 0e7afc7 Compare March 10, 2026 10:59
Contributor

None of the interfaces introduced are actually used by type or reused. Both seem unnecessary.

@Mwea Mwea force-pushed the tchary/async-commit-pipeline branch 2 times, most recently from b731289 to 01f0578 Compare March 10, 2026 11:21
Contributor

@jeqo jeqo left a comment

Changes look good, but I think this is mixing a bunch of refactoring and feature work. We can clean this up by moving the refactoring to a prep PR that can be merged quickly, and do the feature review separately.

Comment on lines +260 to +263
// Cache store runs after upload completes (in parallel with commit).
// It's tracked in the chain to provide backpressure if cache store is slow.
final CompletableFuture<?> cacheStoreFuture = uploadFuture
.whenCompleteAsync(cacheStoreJob::accept, executorServiceCacheStore);
Contributor

We can use thenAcceptAsync instead and only call cache job on success.
Also there's no need to keep a pointer to the cacheStoreFuture AFAIC. We can keep it as fire and forget as before.
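A quick standalone comparison of the two callbacks on a failed upload, as a sketch with hypothetical names: `whenCompleteAsync` still invokes its callback (with a null key), while `thenAcceptAsync` skips it entirely.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: which callback runs when the upload future has failed.
final class SuccessOnlyCallbackSketch {
    static List<String> demo() {
        List<String> calls = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> failedUpload =
            CompletableFuture.failedFuture(new RuntimeException("upload failed"));

        // Invoked on success AND failure; on failure the key is null.
        CompletableFuture<String> viaWhenComplete = failedUpload
            .whenCompleteAsync((key, error) -> calls.add("whenComplete key=" + key), executor);
        // Invoked only on success; a failed upload bypasses the callback.
        CompletableFuture<Void> viaThenAccept = failedUpload
            .thenAcceptAsync(key -> calls.add("thenAccept key=" + key), executor);

        // Wait for both stages to settle without rethrowing their failures.
        viaWhenComplete.handle((result, error) -> null).join();
        viaThenAccept.handle((result, error) -> null).join();
        executor.shutdown();
        return List.copyOf(calls);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [whenComplete key=null]
    }
}
```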

Contributor

Can we consider moving this CacheStoreJob refactoring to a separate PR? Its changes are not crucial to this change.
We can make it implement Consumer<ObjectKey> and rename storeToCache to accept.

// not the time waiting for upload completion or previous commits in the chain.
commitJob.markReadyToCommit();
})
.handleAsync(commitJob, executorServiceCommit)
Contributor

Similarly, using thenApplyAsync can simplify the commitJob.
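As a sketch of what that simplification might look like (the `CommitJob` class and its `String` types are hypothetical stand-ins, not the real FileCommitJob): with `thenApplyAsync` the job is a plain `Function` that only sees successful uploads, and failures skip it.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch: a success-path-only commit job driven by thenApplyAsync.
final class SuccessPathCommitJobSketch {
    // Simplified stand-in for a commit job; it never receives a Throwable.
    static final class CommitJob implements Function<String, List<String>> {
        final AtomicInteger invocations = new AtomicInteger();

        @Override
        public List<String> apply(String objectKey) {
            invocations.incrementAndGet();
            return List.of("committed:" + objectKey);
        }
    }

    static String demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CommitJob job = new CommitJob();

        List<String> ok = CompletableFuture.completedFuture("file-1")
            .thenApplyAsync(job, executor)
            .join();
        CompletableFuture<List<String>> failed = CompletableFuture
            .<String>failedFuture(new RuntimeException("upload failed"))
            .thenApplyAsync(job, executor);
        failed.handle((result, error) -> null).join(); // wait without rethrowing
        executor.shutdown();

        return ok + ":" + failed.isCompletedExceptionally() + ":" + job.invocations.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [committed:file-1]:true:1
    }
}
```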

@Mwea Mwea force-pushed the tchary/async-commit-pipeline branch 2 times, most recently from ffa7d1b to cb3dd06 Compare March 13, 2026 08:05
…itTime

Replace blocking commit pattern with pipelined CompletableFuture chain:

- Upload starts immediately when file is closed
- Commits are chained via previousCommitFuture to preserve submission order
- Each commit waits for its upload AND previous commit before starting
- Cache store is fire-and-forget (thenAcceptAsync on upload success)

Key changes:
- FileCommitJob now implements Function<ObjectKey, List<CommitBatchResponse>>
  instead of BiFunction, only handling the success path
- CacheStoreJob now implements Consumer<ObjectKey>, invoked only on
  successful uploads via thenAcceptAsync
- Upload failures are detected via CompletionException unwrapping in
  handleCommitResult()
- markReadyToCommit() resets wait time measurement to exclude upload
  and previous commit wait times

This reduces FileCommitWaitTime by removing thread blocking while
waiting for uploads to complete. The commit thread now only runs
the actual commit operation.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
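The CompletionException unwrapping mentioned in the commit message can be illustrated with a standalone sketch. The `unwrap` helper below is hypothetical and is not the PR's handleCommitResult(); it only shows why unwrapping is needed once the job handles the success path alone.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch: recovering the original upload error downstream of thenApply.
final class UnwrapFailureSketch {
    // Strips CompletionException/ExecutionException wrappers to reach the root cause.
    static Throwable unwrap(Throwable error) {
        Throwable current = error;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    static String demo() {
        CompletableFuture<String> upload =
            CompletableFuture.failedFuture(new IllegalStateException("upload failed"));
        return upload
            .thenApply(key -> "committed:" + key) // skipped because the upload failed
            .handle((result, error) -> error == null
                ? result
                : error.getClass().getSimpleName() + "->" + unwrap(error).getMessage())
            .join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints CompletionException->upload failed
    }
}
```

Dependent stages see the upstream failure wrapped in a CompletionException, which is why a result handler downstream of thenApplyAsync needs to look at the cause.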
@Mwea Mwea force-pushed the tchary/async-commit-pipeline branch from cb3dd06 to 7bbcc95 Compare March 18, 2026 17:36