refactor(inkless:consume): make fetch completion non-blocking #464

AnatolyPopov merged 6 commits into main
Conversation
Force-pushed dc8508c to de2254b
Pull request overview
This PR refactors the fetch completion pipeline to use non-blocking CompletableFuture composition instead of blocking Future.get() calls, preventing thread exhaustion when file fetches are slow.
Key Changes:
- Replaced Future<FileExtent> with CompletableFuture<FileExtent> throughout the fetch pipeline
- Introduced Reader.allOfFileExtents() to wait for all file fetch futures without blocking
- Changed fetch completion to receive pre-resolved file extents instead of blocking on futures
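The order-preserving composition that `Reader.allOfFileExtents()` provides can be sketched as follows. This is a minimal sketch of the likely pattern, not the actual Inkless code; `FileExtent` here is a hypothetical stand-in for the real type.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllOfSketch {
    // Hypothetical stand-in for the real io.aiven.inkless FileExtent type.
    record FileExtent(String object) {}

    // Compose all futures with allOf(), then collect results in the
    // original list order once everything has completed.
    static CompletableFuture<List<FileExtent>> allOfFileExtents(
            final List<CompletableFuture<FileExtent>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            // join() cannot block here: allOf() completes only after every
            // input future has completed.
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        final CompletableFuture<FileExtent> a = new CompletableFuture<>();
        final CompletableFuture<FileExtent> b = new CompletableFuture<>();
        final CompletableFuture<List<FileExtent>> all = allOfFileExtents(List.of(a, b));

        // Complete out of order: b first, then a.
        b.complete(new FileExtent("b"));
        a.complete(new FileExtent("a"));

        // Input order is preserved regardless of completion order.
        System.out.println(all.join()); // [FileExtent[object=a], FileExtent[object=b]]
    }
}
```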
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java | Added allOfFileExtents() method to compose futures non-blockingly; changed thenCombineAsync to thenCombine for FetchCompleter |
| storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java | Changed return type from List<Future> to List<CompletableFuture>; replaced executor.submit() with CompletableFuture.supplyAsync() |
| storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java | Changed to accept List<FileExtent> instead of List<Future>; removed blocking Future.get() calls and ExecutionException unwrapping |
| storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java | Added test to verify order preservation when futures complete out of order |
| storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java | Updated tests to work with CompletableFuture; replaced mock executor with real executor; simplified assertions |
| storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java | Updated tests to pass resolved FileExtent instances instead of Future wrappers |
Force-pushed 6be17b6 to a5e838e
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
Force-pushed a5e838e to be6b6f3
Force-pushed be6b6f3 to bf9600d
AnatolyPopov left a comment
Overall a nice improvement, but I added some comments about the tests and about exception handling with the new logic.
Also a very minor nit: the PR description is IMO partially misleading, since AFAIU the FetchCompleter will not run on the ForkJoinPool but on whichever thread happens to trigger the completion. In this case that is most probably the dataExecutor, or in some very rare cases the metadataExecutor (if all the fileExtents are cached).
From the CompletableFuture javadoc:
> Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
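The quoted behavior can be seen in a small standalone demo. This is an illustration only; the thread name `data-executor-1` is made up. A non-async callback registered before completion runs on the thread that later completes the future, not on the registering thread or the ForkJoinPool.

```java
import java.util.concurrent.CompletableFuture;

public class CompletionThreadDemo {
    static String whereDidItRun() {
        final CompletableFuture<String> fetch = new CompletableFuture<>();
        // Non-async dependent stage, registered before the future completes.
        final CompletableFuture<String> dependent =
            fetch.thenApply(v -> v + " handled on " + Thread.currentThread().getName());

        // Simulate a data-executor thread finishing the file fetch.
        final Thread dataThread = new Thread(() -> fetch.complete("fetched"), "data-executor-1");
        dataThread.start();
        try {
            dataThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return dependent.join();
    }

    public static void main(String[] args) {
        // The callback ran on the completing thread, "data-executor-1".
        System.out.println(whereDidItRun());
    }
}
```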
```java
// Create file extents with distinct identifiers
final ObjectKey objectKeyA = PlainObjectKey.create("prefix", "object-a");
final ObjectKey objectKeyB = PlainObjectKey.create("prefix", "object-b");
final ObjectKey objectKeyC = PlainObjectKey.create("prefix", "object-c");

final FileExtent extentA = FileFetchJob.createFileExtent(objectKeyA, new ByteRange(0, 10), ByteBuffer.allocate(10));
final FileExtent extentB = FileFetchJob.createFileExtent(objectKeyB, new ByteRange(0, 10), ByteBuffer.allocate(10));
final FileExtent extentC = FileFetchJob.createFileExtent(objectKeyC, new ByteRange(0, 10), ByteBuffer.allocate(10));

// Create futures that complete in reverse order: C (100ms), B (200ms), A (300ms)
final CompletableFuture<FileExtent> futureA = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(300);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return extentA;
});

final CompletableFuture<FileExtent> futureB = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(200);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return extentB;
});

final CompletableFuture<FileExtent> futureC = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return extentC;
});

// Create the ordered list: A, B, C
final List<CompletableFuture<FileExtent>> orderedFutures = List.of(futureA, futureB, futureC);

// Call allOfFileExtents and wait for result
final CompletableFuture<List<FileExtent>> resultFuture = Reader.allOfFileExtents(orderedFutures);
final List<FileExtent> result = resultFuture.get(5, TimeUnit.SECONDS);

// Verify result order is preserved as A, B, C (not C, B, A, which was the completion order)
assertThat(result)
    .hasSize(3)
    .extracting(FileExtent::object)
    .containsExactly(objectKeyA.value(), objectKeyB.value(), objectKeyC.value());
```
I believe this whole test could be simplified quite a bit, and made more deterministic, without relying on Thread.sleep.
Instead of using CompletableFuture.supplyAsync, each future could be a plain non-completed CompletableFuture, and the test could then complete them in the order it needs. Something like this:

```java
final CompletableFuture<FileExtent> futureA = new CompletableFuture<>();
final CompletableFuture<FileExtent> futureB = new CompletableFuture<>();
final CompletableFuture<FileExtent> futureC = new CompletableFuture<>();

final CompletableFuture<List<FileExtent>> resultFuture = Reader.allOfFileExtents(List.of(futureA, futureB, futureC));

// Complete in reverse order: C, B, A
futureC.complete(createExtent("c"));
futureB.complete(createExtent("b"));
futureA.complete(createExtent("a"));

// Result preserves input order [A, B, C], not completion order [C, B, A]
assertThat(resultFuture.join())
    .extracting(FileExtent::object)
    .containsExactly("prefix/a", "prefix/b", "prefix/c");
```
```java
// Create file extents
final ObjectKey objectKey = PlainObjectKey.create("prefix", "object");
final FileExtent extent = FileFetchJob.createFileExtent(objectKey, new ByteRange(0, 10), ByteBuffer.allocate(10));

// Create a future that will complete after a delay
final AtomicBoolean futureCompleted = new AtomicBoolean(false);
final CompletableFuture<FileExtent> delayedFuture = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000); // Simulate slow operation
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    futureCompleted.set(true);
    return extent;
});

final List<CompletableFuture<FileExtent>> futures = List.of(delayedFuture);

// Call allOfFileExtents - this should return immediately
final long startTime = System.currentTimeMillis();
final CompletableFuture<List<FileExtent>> resultFuture = Reader.allOfFileExtents(futures);
final long callDuration = System.currentTimeMillis() - startTime;

// Verify the call returned immediately (within 100ms)
assertThat(callDuration).isLessThan(100);

// Verify the future is not yet complete
assertThat(resultFuture).isNotCompleted();
assertThat(futureCompleted).isFalse();

// Wait for the result to actually complete
await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> {
    assertThat(resultFuture).isCompleted();
    assertThat(futureCompleted).isTrue();
});

// Verify the result is correct
assertThat(resultFuture.join())
    .hasSize(1)
    .extracting(FileExtent::object)
    .containsExactly(objectKey.value());
```
Here again I think we rely too much on Thread.sleep and call durations, when we could just complete the futures manually exactly when we need. Something like this, for example:

```java
final CompletableFuture<FileExtent> incompleteFuture = new CompletableFuture<>();
final CompletableFuture<List<FileExtent>> resultFuture = Reader.allOfFileExtents(List.of(incompleteFuture));

// Proves non-blocking: method returned but result not ready
assertThat(resultFuture).isNotCompleted();

// Complete input
final FileExtent extent = createExtent("test");
incompleteFuture.complete(extent);

// Verify completion and basic correctness
assertThat(resultFuture.join()).containsExactly(extent);
```
```java
if (throwable != null) {
    LOGGER.warn("Fetch failed", throwable);
    for (final var entry : fetchInfos.entrySet()) {
        final String topic = entry.getKey().topic();
        brokerTopicStats.allTopicsStats().failedFetchRequestRate().mark();
        brokerTopicStats.topicStats(topic).failedFetchRequestRate().mark();
    }
    // Check if the exception was caused by a fetch related exception and increment the relevant metric
    if (throwable instanceof CompletionException) {
        // Finding batches fails on the initial stage
        if (throwable.getCause() instanceof FindBatchesException) {
            fetchMetrics.findBatchesFailed();
        } else if (throwable.getCause() instanceof FetchException) {
            // but storage-related exceptions are wrapped twice as they happen within the fetch completer
            final Throwable fetchException = throwable.getCause();
            if (fetchException.getCause() instanceof FileFetchException) {
                fetchMetrics.fileFetchFailed();
            } else if (fetchException.getCause() instanceof CacheFetchException) {
                fetchMetrics.cacheFetchFailed();
            }
        }
    }
    fetchMetrics.fetchFailed();
```
I see the flow changed quite a bit, and this logic requires some effort to untangle. Could we have some tests confirming that the exception hierarchy is still the same as before and that the metrics will be recorded properly? Or do we have any already?
Also, this comment now seems misleading:
`// but storage-related exceptions are wrapped twice as they happen within the fetch completer`
and CacheFetchException seems to never be thrown anywhere in the codebase.
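For context on the unwrapping discussed above: `CompletableFuture.join()` wraps a stage's failure in `CompletionException`, so handlers must go through `getCause()` before matching on type. A self-contained sketch of that mechanism follows; `FindBatchesException` here is a hypothetical stand-in for the real exception type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ExceptionWrappingDemo {
    // Hypothetical stand-in for the domain exception in the real code.
    static class FindBatchesException extends RuntimeException {
        FindBatchesException(String message) { super(message); }
    }

    static Class<?> observedCauseType() {
        final CompletableFuture<Void> failed =
            CompletableFuture.failedFuture(new FindBatchesException("boom"));
        try {
            // join() on a failed stage throws CompletionException...
            failed.thenApply(v -> v).join();
            return null;
        } catch (CompletionException e) {
            // ...so the original failure is only visible via getCause().
            return e.getCause().getClass();
        }
    }

    public static void main(String[] args) {
        System.out.println(observedCauseType().getSimpleName()); // FindBatchesException
    }
}
```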
Replace blocking Future.get() calls in FetchCompleter with non-blocking CompletableFuture composition. This prevents thread exhaustion when file fetches are slow.

Previously, FetchCompleter.get() blocked waiting for file data futures, potentially exhausting thread pools under high load. Now, the completion stage chain uses:
- thenCompose(Reader::allOfFileExtents), which returns a CompletableFuture that completes only when ALL file fetches are done
- thenCombine() (non-async) to invoke FetchCompleter with already-resolved data

The FetchCompleter task runs on whichever thread completes the last stage (typically a dataExecutor thread that finished the last file fetch). It no longer blocks waiting for futures: it receives pre-fetched data and only performs in-memory data assembly (grouping file extents, copying byte ranges, and constructing MemoryRecords).

Changes:
- FetchPlanner returns List<CompletableFuture<FileExtent>> instead of List<Future<FileExtent>>
- FetchCompleter accepts resolved List<FileExtent> instead of futures
- Reader.allOfFileExtents() waits for all futures without blocking
- Add test verifying ordering is preserved when futures complete out of order
- Simplified exception handling (removed dead code for CacheFetchException)
- Updated documentation to accurately describe thread execution behavior
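The completion chain described in the commit message can be sketched roughly as follows. This is not the actual Inkless code: the names, record types, and executors are illustrative, and the real pipeline combines the resolved extents with batch metadata via thenCombine, which this sketch collapses into a plain thenApply.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class FetchChainSketch {
    // Hypothetical stand-ins for the real Inkless types.
    record FileExtent(String object) {}
    record Coordinates(List<String> objects) {}
    record FetchResult(List<FileExtent> extents) {}

    static CompletableFuture<List<FileExtent>> allOfFileExtents(
            final List<CompletableFuture<FileExtent>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

    static int run() {
        final ExecutorService metadataExecutor = Executors.newSingleThreadExecutor();
        final ExecutorService dataExecutor = Executors.newFixedThreadPool(2);
        try {
            // Stage 1: find which objects to fetch, on the metadata executor.
            final CompletableFuture<Coordinates> coordinates = CompletableFuture.supplyAsync(
                () -> new Coordinates(List.of("obj-1", "obj-2")), metadataExecutor);

            final CompletableFuture<FetchResult> result = coordinates
                // Stage 2: plan one file fetch per object, each on the data executor.
                .thenApply(c -> c.objects().stream()
                    .map(o -> CompletableFuture.supplyAsync(() -> new FileExtent(o), dataExecutor))
                    .collect(Collectors.toList()))
                // Stage 3: wait for all fetches without blocking any thread.
                .thenCompose(FetchChainSketch::allOfFileExtents)
                // Stage 4: non-async, so it runs on whichever thread completed the last fetch.
                .thenApply(FetchResult::new);

            return result.join().extents().size();
        } finally {
            metadataExecutor.shutdown();
            dataExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // 2
    }
}
```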
Explains how the proposed approach does not block.
Improves testing approach by separating validations on different tests, and removing the time conditions.
Improves future tests based on Anatolii's feedback
Refactors how fetch exceptions are handled, fixing some outdated handling and adding tests.
Clarifies how thread pipeline works for fetching, removing confusion on calling thread and fork-join.
Force-pushed 6cf25a1 to e862b11
@AnatolyPopov thanks! You're right. I have reassessed the fork-join and calling-thread assumptions and improved the documentation.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 2 comments.