
refactor(inkless:consume): make fetch completion non-blocking#464

Merged
AnatolyPopov merged 6 commits into main from jeqo/refactor-fetch-completer-schedule
Dec 29, 2025



@jeqo jeqo commented Dec 11, 2025

Replace blocking Future.get() calls in FetchCompleter with non-blocking CompletableFuture composition. This prevents thread exhaustion when file fetches are slow.

Previously, FetchCompleter.get() blocked waiting for file data futures, potentially exhausting thread pools under high load.

Now, the completion stage chain uses:

  • thenCompose(Reader::allOfFileExtents), which returns a CompletableFuture that completes only when ALL file fetches are done
  • thenCombine() (non-async) to invoke FetchCompleter with already-resolved data

The FetchCompleter task runs on whichever thread completes the last stage (typically a dataExecutor thread that finished the last file fetch). It no longer blocks waiting for futures: it receives pre-fetched data and only performs in-memory data assembly (grouping file extents, copying byte ranges, and constructing MemoryRecords).
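A minimal, self-contained sketch of this chain, not the actual Inkless code: Extent, allOfInOrder, assemble and the batch-metadata string are hypothetical stand-ins for FileExtent, Reader.allOfFileExtents(), FetchCompleter and the find-batches result, and the real signatures may differ.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the non-blocking fetch pipeline described above.
final class FetchPipelineSketch {

    // Stand-in for FileExtent: an object key plus its bytes.
    record Extent(String object, byte[] data) {}

    // Stand-in for Reader.allOfFileExtents(): completes once every input future has
    // completed and preserves the input order, regardless of completion order.
    // If any input fails, the returned future fails too.
    static <T> CompletableFuture<List<T>> allOfInOrder(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
            // join() cannot block here: allOf() only completes after all inputs have.
            .thenApply(ignored -> futures.stream().map(CompletableFuture::join).toList());
    }

    // Stand-in for FetchCompleter: pure in-memory assembly over already-resolved data.
    static String assemble(List<Extent> extents, String batchMetadata) {
        final StringBuilder sb = new StringBuilder(batchMetadata).append(':');
        for (Extent e : extents) {
            sb.append(e.object()).append('=').append(e.data().length).append(';');
        }
        return sb.toString();
    }

    static CompletableFuture<String> fetch(
            CompletableFuture<List<CompletableFuture<Extent>>> plannedFetches,
            CompletableFuture<String> batchMetadata) {
        return plannedFetches
            // Wait for ALL file fetches without parking a thread.
            .thenCompose(FetchPipelineSketch::allOfInOrder)
            // Non-async: runs on whichever thread completes the last input stage.
            .thenCombine(batchMetadata, FetchPipelineSketch::assemble);
    }
}
```

Because both stages are non-async, no thread ever waits in get(); assemble runs once, on the thread that delivers the last missing input.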

Changes:

  • FetchPlanner returns List<CompletableFuture<FileExtent>> instead of List<Future<FileExtent>>
  • FetchCompleter accepts a resolved List<FileExtent> instead of futures
  • Reader.allOfFileExtents() waits for all futures without blocking
  • Add test verifying ordering is preserved when futures complete out of order
  • Simplified exception handling (removed dead code for CacheFetchException)
  • Updated documentation to accurately describe thread execution behavior

@jeqo jeqo force-pushed the jeqo/refactor-fetch-completer-schedule branch from dc8508c to de2254b December 11, 2025 16:27
@jeqo jeqo requested a review from Copilot December 11, 2025 22:27

Copilot AI left a comment


Pull request overview

This PR refactors the fetch completion pipeline to use non-blocking CompletableFuture composition instead of blocking Future.get() calls, preventing thread exhaustion when file fetches are slow.

Key Changes:

  • Replaced Future<FileExtent> with CompletableFuture<FileExtent> throughout the fetch pipeline
  • Introduced Reader.allOfFileExtents() to wait for all file fetch futures without blocking
  • Changed fetch completion to receive pre-resolved file extents instead of blocking on futures

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.

Summary per file:

| File | Description |
| --- | --- |
| storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java | Added allOfFileExtents() method to compose futures without blocking; changed thenCombineAsync to thenCombine for FetchCompleter |
| storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java | Changed return type from List<Future> to List<CompletableFuture>; replaced executor.submit() with CompletableFuture.supplyAsync() |
| storage/inkless/src/main/java/io/aiven/inkless/consume/FetchCompleter.java | Changed to accept List<FileExtent> instead of List<Future>; removed blocking Future.get() calls and ExecutionException unwrapping |
| storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java | Added test to verify order preservation when futures complete out of order |
| storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java | Updated tests to work with CompletableFuture; replaced mock executor with real executor; simplified assertions |
| storage/inkless/src/test/java/io/aiven/inkless/consume/FetchCompleterTest.java | Updated tests to pass resolved FileExtent instances instead of Future wrappers |
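The FetchPlanner change swaps executor.submit(), whose plain Future can only be consumed by blocking on get(), for CompletableFuture.supplyAsync(supplier, executor), whose result can be composed. A minimal sketch, assuming a hypothetical fetchRange function in place of the real file fetch and a caller-supplied executor standing in for dataExecutor:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

// Hypothetical sketch of a planner that returns composable futures.
final class PlannerSketch {
    // Before: executor.submit(() -> fetchRange.apply(key)) returned a Future<T>,
    // which callers could only consume by blocking on get().
    // After: supplyAsync runs the same work on the given executor but returns a
    // CompletableFuture<T> that downstream stages can compose without blocking.
    static <T> List<CompletableFuture<T>> plan(List<String> keys,
                                               Function<String, T> fetchRange,
                                               Executor dataExecutor) {
        return keys.stream()
            .map(key -> CompletableFuture.supplyAsync(() -> fetchRange.apply(key), dataExecutor))
            .toList();
    }
}
```

One side effect: an exception thrown by the supplier reaches downstream stages and join() wrapped in a CompletionException, rather than in the ExecutionException that Future.get() used to produce, which is why the exception unwrapping in FetchCompleter changes too.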


@jeqo jeqo force-pushed the jeqo/refactor-fetch-completer-schedule branch 2 times, most recently from 6be17b6 to a5e838e December 12, 2025 09:47
@jeqo jeqo requested a review from Copilot December 12, 2025 09:47

Copilot AI left a comment


Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.



@jeqo jeqo force-pushed the jeqo/refactor-fetch-completer-schedule branch from a5e838e to be6b6f3 December 12, 2025 09:58
@jeqo jeqo marked this pull request as ready for review December 15, 2025 20:19
@jeqo jeqo requested a review from AnatolyPopov December 15, 2025 20:19
@jeqo jeqo force-pushed the jeqo/refactor-fetch-completer-schedule branch from be6b6f3 to bf9600d December 23, 2025 09:24

@AnatolyPopov AnatolyPopov left a comment


Overall a nice improvement, but I added some comments about the tests and about exception handling with the new logic.

Also, a very minor nit: the PR description is IMO partially misleading, since AFAIU the FetchCompleter will not run on the ForkJoinPool but on whichever thread happens to trigger the completion. In this case that is most probably dataExecutor, or in some very rare cases metadataExecutor (if all the fileExtents are cached).
From the CompletableFuture javadoc:

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
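The quoted rule can be observed directly. In this sketch the thread name "data-executor-1" is an arbitrary stand-in for a dataExecutor thread: a dependent stage attached with non-async thenAccept to a not-yet-completed future runs on the thread that completes it. As the javadoc says, this is a "may", not a guarantee: if the future is already complete when the stage is attached, the attaching thread runs it instead, and other callers of completion methods can also end up running it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo of where a non-async dependent action executes.
final class CompletingThreadSketch {
    // Returns the name of the thread that ran the dependent action.
    static String threadRunningDependent() {
        final CompletableFuture<String> source = new CompletableFuture<>();
        final AtomicReference<String> ranOn = new AtomicReference<>();

        // Attached while source is still incomplete, so nothing runs yet.
        final CompletableFuture<Void> dependent =
            source.thenAccept(value -> ranOn.set(Thread.currentThread().getName()));

        // Complete the source from a separate, named thread.
        final Thread completer = new Thread(() -> source.complete("extent"), "data-executor-1");
        completer.start();
        try {
            // This thread only waits on the completer and calls no completion method
            // on source, so the pending dependent is run by the completing thread.
            completer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        dependent.join();
        return ranOn.get();
    }
}
```

This is consistent with the point above: when file fetches are still in flight, the completing dataExecutor thread runs the non-async stages; when everything is already complete, the thread that attaches or completes the remaining stage runs them.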

Comment on lines +109 to +157
        // Create file extents with distinct identifiers
        final ObjectKey objectKeyA = PlainObjectKey.create("prefix", "object-a");
        final ObjectKey objectKeyB = PlainObjectKey.create("prefix", "object-b");
        final ObjectKey objectKeyC = PlainObjectKey.create("prefix", "object-c");

        final FileExtent extentA = FileFetchJob.createFileExtent(objectKeyA, new ByteRange(0, 10), ByteBuffer.allocate(10));
        final FileExtent extentB = FileFetchJob.createFileExtent(objectKeyB, new ByteRange(0, 10), ByteBuffer.allocate(10));
        final FileExtent extentC = FileFetchJob.createFileExtent(objectKeyC, new ByteRange(0, 10), ByteBuffer.allocate(10));

        // Create futures that complete in reverse order: C (100ms), B (200ms), A (300ms)
        final CompletableFuture<FileExtent> futureA = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return extentA;
        });

        final CompletableFuture<FileExtent> futureB = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return extentB;
        });

        final CompletableFuture<FileExtent> futureC = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return extentC;
        });

        // Create the ordered list: A, B, C
        final List<CompletableFuture<FileExtent>> orderedFutures = List.of(futureA, futureB, futureC);

        // Call allOfFileExtents and wait for result
        final CompletableFuture<List<FileExtent>> resultFuture = Reader.allOfFileExtents(orderedFutures);
        final List<FileExtent> result = resultFuture.get(5, TimeUnit.SECONDS);

        // Verify result order is preserved as A, B, C (not C, B, A which was the completion order)
        assertThat(result)
            .hasSize(3)
            .extracting(FileExtent::object)
            .containsExactly(objectKeyA.value(), objectKeyB.value(), objectKeyC.value());

I believe this whole test could be simplified quite a bit and made more deterministic by not relying on Thread.sleep.

Instead of using CompletableFuture.supplyAsync, the test could use plain, not-yet-completed CompletableFutures and then complete them in exactly the order we need. Something like this:

        final CompletableFuture<FileExtent> futureA = new CompletableFuture<>();
        final CompletableFuture<FileExtent> futureB = new CompletableFuture<>();
        final CompletableFuture<FileExtent> futureC = new CompletableFuture<>();

        final CompletableFuture<List<FileExtent>> resultFuture = Reader.allOfFileExtents(List.of(futureA, futureB, futureC));

        // Complete in reverse order: C, B, A
        futureC.complete(createExtent("c"));
        futureB.complete(createExtent("b"));
        futureA.complete(createExtent("a"));

        // Result preserves input order [A, B, C], not completion order [C, B, A]
        assertThat(resultFuture.join())
            .extracting(FileExtent::object)
            .containsExactly("prefix/a", "prefix/b", "prefix/c");

Comment on lines +165 to +205
        // Create file extents
        final ObjectKey objectKey = PlainObjectKey.create("prefix", "object");
        final FileExtent extent = FileFetchJob.createFileExtent(objectKey, new ByteRange(0, 10), ByteBuffer.allocate(10));

        // Create a future that will complete after a delay
        final AtomicBoolean futureCompleted = new AtomicBoolean(false);
        final CompletableFuture<FileExtent> delayedFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000); // Simulate slow operation
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            futureCompleted.set(true);
            return extent;
        });

        final List<CompletableFuture<FileExtent>> futures = List.of(delayedFuture);

        // Call allOfFileExtents - this should return immediately
        final long startTime = System.currentTimeMillis();
        final CompletableFuture<List<FileExtent>> resultFuture = Reader.allOfFileExtents(futures);
        final long callDuration = System.currentTimeMillis() - startTime;

        // Verify the call returned immediately (within 100ms)
        assertThat(callDuration).isLessThan(100);

        // Verify the future is not yet complete
        assertThat(resultFuture).isNotCompleted();
        assertThat(futureCompleted).isFalse();

        // Wait for the result to actually complete
        await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> {
            assertThat(resultFuture).isCompleted();
            assertThat(futureCompleted).isTrue();
        });

        // Verify the result is correct
        assertThat(resultFuture.join())
            .hasSize(1)
            .extracting(FileExtent::object)
            .containsExactly(objectKey.value());

Here again, I think we rely too much on Thread.sleep and call durations; we could just complete the futures manually, exactly when we need to. Something like this, for example:

        final CompletableFuture<FileExtent> incompleteFuture = new CompletableFuture<>();

        final CompletableFuture<List<FileExtent>> resultFuture = Reader.allOfFileExtents(List.of(incompleteFuture));

        // Proves non-blocking: method returned but result not ready
        assertThat(resultFuture).isNotCompleted();

        // Complete input
        final FileExtent extent = createExtent("test");
        incompleteFuture.complete(extent);

        // Verify completion and basic correctness
        assertThat(resultFuture.join()).containsExactly(extent);

Comment on lines 191 to 213
        if (throwable != null) {
            LOGGER.warn("Fetch failed", throwable);
            for (final var entry : fetchInfos.entrySet()) {
                final String topic = entry.getKey().topic();
                brokerTopicStats.allTopicsStats().failedFetchRequestRate().mark();
                brokerTopicStats.topicStats(topic).failedFetchRequestRate().mark();
            }
            // Check if the exception was caused by a fetch related exception and increment the relevant metric
            if (throwable instanceof CompletionException) {
                // Finding batches fails on the initial stage
                if (throwable.getCause() instanceof FindBatchesException) {
                    fetchMetrics.findBatchesFailed();
                } else if (throwable.getCause() instanceof FetchException) {
                    // but storage-related exceptions are wrapped twice as they happen within the fetch completer
                    final Throwable fetchException = throwable.getCause();
                    if (fetchException.getCause() instanceof FileFetchException) {
                        fetchMetrics.fileFetchFailed();
                    } else if (fetchException.getCause() instanceof CacheFetchException) {
                        fetchMetrics.cacheFetchFailed();
                    }
                }
            }
            fetchMetrics.fetchFailed();

I see the flow changed quite a bit, and this logic takes some effort to untangle. Could we have some tests confirming that the exception hierarchy is still the same as before and that the metrics are properly recorded? Or do we already have some?

This comment also seems misleading now:

// but storage-related exceptions are wrapped twice as they happen within the fetch completer

and CacheFetchException never seems to be thrown anywhere in the codebase.
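One way to make the metric classification independent of how many times an exception gets wrapped is to walk the cause chain instead of checking fixed nesting depths. Depth does vary: a non-async stage downstream of a failed future reports the failure wrapped in a CompletionException, while a handler attached directly to a future failed via completeExceptionally() sees the raw exception. A minimal sketch; the exception classes are local stand-ins for FindBatchesException and FileFetchException (their real hierarchy is not shown in this thread), and findCause/classify are hypothetical helpers, not something the PR is known to add:

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical sketch: classify fetch failures regardless of wrapping depth.
final class CauseSketch {
    // Local stand-ins for the Inkless exception types.
    static final class FindBatchesException extends RuntimeException {
        FindBatchesException(String msg) { super(msg); }
    }
    static final class FileFetchException extends RuntimeException {
        FileFetchException(String msg) { super(msg); }
    }

    // Returns the first throwable of the given type in the cause chain, if any.
    // The identity set guards against (rare) cyclic cause chains.
    static <E extends Throwable> Optional<E> findCause(Throwable t, Class<E> type) {
        final Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            if (type.isInstance(cur)) {
                return Optional.of(type.cast(cur));
            }
        }
        return Optional.empty();
    }

    // Mirrors the branches in the snippet above, returning the metric to bump.
    static String classify(Throwable t) {
        if (findCause(t, FindBatchesException.class).isPresent()) return "findBatchesFailed";
        if (findCause(t, FileFetchException.class).isPresent()) return "fileFetchFailed";
        return "other";
    }
}
```

Tests for this can build failures at different depths (direct, one CompletionException, nested wrappers) and assert that each lands on the same metric.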

jeqo added 6 commits December 29, 2025 14:37
Replace blocking Future.get() calls in FetchCompleter with non-blocking
CompletableFuture composition. This prevents thread exhaustion when file
fetches are slow.

Previously, FetchCompleter.get() blocked waiting for file data futures,
potentially exhausting thread pools under high load.

Now, the completion stage chain uses:
- thenCompose(Reader::allOfFileExtents) which returns a CompletableFuture
  that completes only when ALL file fetches are done
- thenCombine() (non-async) to invoke FetchCompleter with already-resolved data

The FetchCompleter task runs on whichever thread completes the last stage
(typically a dataExecutor thread that finished the last file fetch). It no
longer blocks waiting for futures - it receives pre-fetched data and only
performs in-memory data assembly (grouping file extents, copying byte ranges,
and constructing MemoryRecords).

Changes:
- FetchPlanner returns List<CompletableFuture<FileExtent>> instead of
  List<Future<FileExtent>>
- FetchCompleter accepts resolved List<FileExtent> instead of futures
- Reader.allOfFileExtents() waits for all futures non-blockingly
- Add test verifying ordering is preserved when futures complete out of order
- Simplified exception handling (removed dead code for CacheFetchException)
- Updated documentation to accurately describe thread execution behavior

Explains how the proposed approach does not block.

Improves testing approach by separating validations on different tests,
and removing the time conditions.

Improves future tests based on Anatolii's feedback

Refactors how fetch exceptions are handled, fixing some outdated
handling and adding tests.

Clarifies how thread pipeline works for fetching, removing confusion on
calling thread and fork-join.
@jeqo jeqo force-pushed the jeqo/refactor-fetch-completer-schedule branch from 6cf25a1 to e862b11 December 29, 2025 12:38

jeqo commented Dec 29, 2025

@AnatolyPopov Thanks! You're right. I have reassessed the fork-join and calling-thread assumptions and improved the documentation.
I've also applied your suggestions in the last 3 fixup commits, PTAL.


@AnatolyPopov AnatolyPopov left a comment


LGTM

@AnatolyPopov AnatolyPopov merged commit 3218448 into main Dec 29, 2025
7 of 8 checks passed
@AnatolyPopov AnatolyPopov deleted the jeqo/refactor-fetch-completer-schedule branch December 29, 2025 15:27

Copilot AI left a comment


Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 2 comments.



jeqo added a commit that referenced this pull request Dec 31, 2025
* refactor(inkless:consume): make fetch completion non-blocking

Replace blocking Future.get() calls in FetchCompleter with non-blocking
CompletableFuture composition. This prevents thread exhaustion when file
fetches are slow.

Previously, FetchCompleter.get() blocked waiting for file data futures,
potentially exhausting thread pools under high load.

Now, the completion stage chain uses:
- thenCompose(Reader::allOfFileExtents) which returns a CompletableFuture
  that completes only when ALL file fetches are done
- thenCombine() (non-async) to invoke FetchCompleter with already-resolved data

The FetchCompleter task runs on whichever thread completes the last stage
(typically a dataExecutor thread that finished the last file fetch). It no
longer blocks waiting for futures - it receives pre-fetched data and only
performs in-memory data assembly (grouping file extents, copying byte ranges,
and constructing MemoryRecords).

Changes:
- FetchPlanner returns List<CompletableFuture<FileExtent>> instead of
  List<Future<FileExtent>>
- FetchCompleter accepts resolved List<FileExtent> instead of futures
- Reader.allOfFileExtents() waits for all futures non-blockingly
- Add test verifying ordering is preserved when futures complete out of order
- Simplified exception handling (removed dead code for CacheFetchException)
- Updated documentation to accurately describe thread execution behavior

* fixup! refactor(inkless:consume): make fetch completion non-blocking

Explains how the proposed approach does not block.

* fixup! refactor(inkless:consume): make fetch completion non-blocking

Improves testing approach by separating validations on different tests,
and removing the time conditions.

* fixup! refactor(inkless:consume): make fetch completion non-blocking

Improves future tests based on Anatolii's feedback

* fixup! refactor(inkless:consume): make fetch completion non-blocking

Refactors how fetch exceptions are handled, fixing some outdated
handling and adding tests.

* fixup! refactor(inkless:consume): make fetch completion non-blocking

Clarifies how thread pipeline works for fetching, removing confusion on
calling thread and fork-join.
jeqo added a commit that referenced this pull request Jan 5, 2026
jeqo added a commit that referenced this pull request Jan 9, 2026


3 participants