
refactor(inkless:consume): only remote fetches to run on data executor#466

Merged
AnatolyPopov merged 11 commits into main from jeqo/only-fetch-on-work-thread on Dec 30, 2025

Conversation

@jeqo
Contributor

@jeqo jeqo commented Dec 23, 2025

Refactor fetch planning by changing how fetch operations are scheduled on the data executor. The planner now assigns the data executor only to remote fetch operations and lets the calling thread run cache.get(), avoiding the scenario where cache calls are blocked because all data executor threads are in use.

By doing this, the CacheFetchJob component became redundant, as most of its logic already lives in the planning, including the scheduling decision.

Changes:

  • Delete CacheFetchJob
  • Extend ObjectCache to allow passing the executor to run the load function
  • Refactor the FetchPlanner to use the new approach
  • Update documentation on FetchPlanner and Reader to clarify how stages are run
  • Move CacheFetchJobTest to FetchPlannerTest to keep and improve coverage
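The scheduling split described above can be sketched roughly as follows. This is a minimal stdlib illustration, not the actual FetchPlanner API: lookupLocally, fetchRemote, and plan are hypothetical stand-ins.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: cache lookups stay on the calling thread; only cache misses
// occupy a data-executor thread for the remote fetch.
public class FetchSchedulingSketch {

    // Hypothetical stand-in for cache.get(): returns null on a miss.
    static String lookupLocally(String key) {
        return key.equals("cached") ? "hit:" + key : null;
    }

    // Hypothetical stand-in for a remote object-storage fetch.
    static String fetchRemote(String key) {
        return "remote:" + key;
    }

    static CompletableFuture<String> plan(String key, Executor dataExecutor) {
        final String cached = lookupLocally(key); // runs on the calling thread
        if (cached != null) {
            return CompletableFuture.completedFuture(cached);
        }
        // Only the miss path is scheduled on the data executor.
        return CompletableFuture.supplyAsync(() -> fetchRemote(key), dataExecutor);
    }

    public static void main(String[] args) throws Exception {
        final ExecutorService dataExecutor = Executors.newFixedThreadPool(2);
        System.out.println(plan("cached", dataExecutor).get());
        System.out.println(plan("other", dataExecutor).get());
        dataExecutor.shutdown();
    }
}
```

With this split, a burst of remote fetches can saturate the data executor without stalling callers whose data is already cached.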

Contributor

Copilot AI left a comment


Pull request overview

This PR refactors the fetch planning architecture to optimize thread pool usage by executing only remote fetches on the data executor, while cache lookups run on the calling thread. The key changes eliminate the CacheFetchJob component and extend the ObjectCache interface to accept an executor parameter for async operations.

Key Changes:

  • Deleted CacheFetchJob and moved its logic into FetchPlanner, which now directly interacts with the cache and schedules remote fetches
  • Modified ObjectCache API to return CompletableFuture and accept an executor parameter for controlling where load operations run
  • Migrated tests from CacheFetchJobTest to FetchPlannerTest with expanded coverage for cache hit/miss scenarios

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.

Summary per file:

  • FetchPlannerTest.java: Migrated and expanded tests from CacheFetchJobTest, adding comprehensive tests for concurrent requests, cache hits/misses, and failure scenarios
  • CacheFetchJobTest.java: Deleted; tests migrated to FetchPlannerTest
  • NullCache.java: Updated to implement the new async computeIfAbsent API that returns CompletableFuture and accepts an executor parameter
  • Reader.java: Added clarifying comments about execution stages and which thread pools handle different operations
  • FetchPlanner.java: Major refactor: removed the CacheFetchJob dependency, integrated cache operations directly, added ObjectFetchRequest and MergedBatchRequest records for internal planning
  • CacheFetchJob.java: Deleted; functionality merged into FetchPlanner
  • ObjectCache.java: Changed the computeIfAbsent signature to return CompletableFuture and accept an Executor parameter for controlling async load execution
  • MemoryCache.java: Deleted; test-only implementation no longer needed
  • CaffeineCache.java: Updated to implement the new async computeIfAbsent API using Caffeine's async cache with a custom executor


@jeqo jeqo force-pushed the jeqo/only-fetch-on-work-thread branch from 6f06aff to d4c4bc4 Compare December 23, 2025 11:07
@jeqo jeqo requested a review from Copilot December 23, 2025 11:07
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 2 comments.



@jeqo jeqo force-pushed the jeqo/only-fetch-on-work-thread branch 2 times, most recently from c64c4aa to 31b9aa9 Compare December 23, 2025 13:48
@jeqo jeqo requested a review from Copilot December 23, 2025 13:50
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated no new comments.



@jeqo jeqo force-pushed the jeqo/only-fetch-on-work-thread branch from 31b9aa9 to f6be2a5 Compare December 23, 2025 15:16
@jeqo jeqo force-pushed the jeqo/refactor-fetch-completer-schedule branch from 6cf25a1 to e862b11 Compare December 29, 2025 12:38
@jeqo jeqo force-pushed the jeqo/only-fetch-on-work-thread branch from f6be2a5 to 94ed0bf Compare December 29, 2025 13:30
@jeqo jeqo requested a review from Copilot December 29, 2025 14:53
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.



@jeqo jeqo force-pushed the jeqo/only-fetch-on-work-thread branch from 2713196 to 3285a0e Compare December 29, 2025 15:10
Base automatically changed from jeqo/refactor-fetch-completer-schedule to main December 29, 2025 15:27
@jeqo jeqo force-pushed the jeqo/only-fetch-on-work-thread branch from 3285a0e to 3dd60d4 Compare December 29, 2025 15:29
@jeqo jeqo marked this pull request as ready for review December 29, 2025 15:30
@jeqo jeqo requested a review from AnatolyPopov December 29, 2025 15:30
Comment on lines +555 to +556
// Should have only 1 future because the cache deduplicates same-key requests
assertThat(futures).hasSize(1);
Contributor


This will happen not because of the cache key but because of groupingBy with BatchInfo.objectKey. Because of that, only a single FetchRequest will ever be created. That might still be fine, since the idea is to test deduplication, but the comment seems misleading.

Contributor Author


Good point, updating comment.
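For context, the grouping behavior discussed above can be sketched as follows. BatchInfo here is a simplified stand-in for the planner's type, used only to show that groupingBy yields one merged request per object key.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch: grouping batches by BatchInfo.objectKey yields a single
// merged request per object key, regardless of how many batches share that key.
public class GroupingSketch {

    // Simplified stand-in for the planner's batch metadata.
    record BatchInfo(String objectKey, long offset) {}

    static Map<String, List<BatchInfo>> groupByObjectKey(List<BatchInfo> batches) {
        return batches.stream().collect(Collectors.groupingBy(BatchInfo::objectKey));
    }

    public static void main(String[] args) {
        final List<BatchInfo> batches = List.of(
                new BatchInfo("obj-a", 0),
                new BatchInfo("obj-a", 1024),
                new BatchInfo("obj-b", 0));
        // Two distinct object keys -> two merged requests, not three.
        System.out.println(groupByObjectKey(batches).size());
    }
}
```

So when all test batches share one object key, a single request falls out of the grouping step itself, before any cache deduplication comes into play.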

metrics::fetchFileFinished,
metrics::cacheEntrySize
timestamp,
totalBytes
Contributor


If totalBytes is going to be used for rate limiting, a sum will be calculated from these values further down the line, right? But here every request gets the same totalBytes value; should it be byteRange.size() instead? Or could you clarify what I am missing?

Contributor Author


I'm removing the ts/bytes from this PR, as it mixes in concerns from a follow-up PR. I've refactored the changes in the first fixup commit to simplify this.

Comment on lines +284 to +315
// Execute: Trigger fetch operations
final List<CompletableFuture<FileExtent>> futures = planner.get();

// Verify: Should have two futures
assertThat(futures).hasSize(2);

// Wait for all to complete
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();

// Verify both were fetched
verify(fetcher).fetch(eq(OBJECT_KEY_A), any(ByteRange.class));
verify(fetcher).fetch(eq(OBJECT_KEY_B), any(ByteRange.class));

// Verify correct data for each
final List<FileExtent> results = futures.stream()
.map(f -> {
try {
return f.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());

assertThat(results).hasSize(2);

// Verify the actual data content matches expected values
// Note: We cannot rely on ordering since the futures complete asynchronously,
// so we use containsExactlyInAnyOrder to verify both byte arrays are present.
assertThat(results)
.extracting(FileExtent::data)
.containsExactlyInAnyOrder(dataA, dataB);
Contributor


Could this be just a one-liner plus verification?

Suggested change
assertThat(planner.get()).map(CompletableFuture::get).map(FileExtent::data)
.containsExactlyInAnyOrder(dataA, dataB);
verify(fetcher).fetch(eq(OBJECT_KEY_A), any(ByteRange.class));
verify(fetcher).fetch(eq(OBJECT_KEY_B), any(ByteRange.class));

// Use the provided executor instead of Caffeine's default executor.
// This allows us to control which thread pool handles the fetch and blocks there,
// while Caffeine's internal threads remain unblocked, so cache operations can continue to be served.
return CompletableFuture.supplyAsync(() -> load.apply(k), loadExecutor);
Contributor


If the future completes exceptionally, will the result be cached? AFAIU there will be no retry on the next attempt, and the failed future will stay cached until evicted.

Contributor Author


Good catch! It looks like this is the case in Caffeine. I'm updating the logic to handle eviction in case of failure.
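The fix the author describes, invalidating a failed future so the next request retries instead of observing a cached failure, can be sketched with a plain ConcurrentHashMap stand-in. This is not the actual CaffeineCache implementation; the names computeIfAbsent and loadExecutor mirror the PR's ObjectCache changes, but the map-based cache is illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Sketch of the "invalidate on failure" pattern: a failed load is removed from
// the cache immediately, so the next request for the same key retries the load.
public class AsyncCacheSketch {
    private final ConcurrentHashMap<String, CompletableFuture<String>> cache =
            new ConcurrentHashMap<>();

    public CompletableFuture<String> computeIfAbsent(String key,
                                                     Function<String, String> load,
                                                     Executor loadExecutor) {
        final CompletableFuture<String> future = cache.computeIfAbsent(key,
                k -> CompletableFuture.supplyAsync(() -> load.apply(k), loadExecutor));
        // Evict failed loads so subsequent requests retry instead of seeing a
        // cached exceptional future until eviction.
        return future.whenComplete((value, throwable) -> {
            if (throwable != null) {
                cache.remove(key, future);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        final AsyncCacheSketch cache = new AsyncCacheSketch();
        final ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            cache.computeIfAbsent("a", k -> { throw new RuntimeException("boom"); }, pool).get();
        } catch (ExecutionException expected) {
            // The failed entry was invalidated by whenComplete.
        }
        // Retry succeeds because the failed future is no longer cached.
        System.out.println(cache.computeIfAbsent("a", k -> "loaded", pool).get());
        pool.shutdown();
    }
}
```

Note that the PR's actual change (per the later diff excerpt) uses Caffeine's synchronous view to invalidate the key; the remove-on-failure shape is the same.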

import io.aiven.inkless.generated.CacheKey;
import io.aiven.inkless.generated.FileExtent;

public interface ObjectCache extends Cache<CacheKey, FileExtent>, Closeable {
Contributor


Side question out of curiosity: why are we extending this Cache interface at all? Are we relying on it somewhere?

Contributor Author


Not fully sure, but there may be historic reasons: the internal caching API was adopted when the cache was first implemented with Infinispan. Could be reconsidered in a follow-up PR.

Contributor


Thanks for answering. The reason for the question is that this leads to some races in the CaffeineCache class, where we use a check-then-act pattern, e.g. getIfPresent, check for null, then do something. In these situations eviction can be triggered between the check and the act, which is what we ran into in tiered storage. But because of these Kafka Cache semantics we have to implement it this way. So it would be interesting to see if we can get rid of this interface, eliminating the race at the same time.
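A minimal sketch of the check-then-act race mentioned above, contrasted with an atomic computeIfAbsent. This is illustrative only, not the CaffeineCache code:

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch: a separate get + null check + put leaves a window in which eviction
// or a concurrent writer can interleave; a single computeIfAbsent runs the
// mapping function atomically for that key, closing the window.
public class CheckThenActSketch {
    private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();

    // Racy: another thread may remove or insert between the check and the put.
    String racyGet(String key) {
        String value = cache.get(key);       // check
        if (value == null) {                 // <-- eviction/insert can land here
            value = "loaded:" + key;
            cache.put(key, value);           // act
        }
        return value;
    }

    // Atomic: check and load happen as one operation for the key.
    String atomicGet(String key) {
        return cache.computeIfAbsent(key, k -> "loaded:" + k);
    }

    public static void main(String[] args) {
        final CheckThenActSketch cache = new CheckThenActSketch();
        System.out.println(cache.atomicGet("x"));
    }
}
```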


final FetchPlanner planner = new FetchPlanner(
time, OBJECT_KEY_CREATOR, keyAlignmentStrategy,
caffeineCache, fetcher, dataExecutor, coordinates, metrics
Contributor


Do I understand correctly that the supposedly concurrent requests will be submitted to a single-threaded dataExecutor? What am I missing here, and how can they be concurrent then?

Contributor Author


Good point. I'm clarifying this by renaming the test and updating the docs to reflect that it tests the async behavior, while using a single-threaded pool for deterministic results.

Contributor


Unrelated to the PR, but a bit strange: the doc says -1 means disabled, so I wonder how it becomes 180.

jeqo added 6 commits December 30, 2025 12:20
Refactor fetch planning by changing how fetch operations are scheduled
on the data executor. It only assigns the data executor for remote fetch
operations and let the calling thread to run the cache.get(), avoiding
the scenario where calls to cache are blocked by all data executor
threads being used.

By doing this, the CacheFetchJob component becomes irrelevant as most of
the logic is already on the planning, even the scheduling decision.

Changes:
- Delete CacheFetchJob
- Extend ObjectCache to allow passing the executor to run the load
function
- Refactor the FetchPlanner to use the new approach
- Update documentation on FetchPlanner and Reader to clarify how stages
are ran
- Move CacheFetchJobTest to FetchPlannerTest to keep and improve
coverage
…executor

Remove the temporal merged as this PR does not use bytes or timestamp.
Let that for the following PRs to define
…executor

Apply suggestion on simplified test
…executor

Improve error handling on cache async get
…executor

Further refactoring: Use same constructor as existing methods
@jeqo jeqo force-pushed the jeqo/only-fetch-on-work-thread branch from 39afa9d to f3a73f7 Compare December 30, 2025 10:20
@jeqo jeqo requested a review from AnatolyPopov December 30, 2025 10:20
// While Caffeine has built-in failed future cleanup, it happens asynchronously.
// Explicit invalidation ensures immediate removal for faster retry on subsequent requests.
if (throwable != null) {
cache.synchronous().invalidate(key);
Contributor


nit: Should it be k instead of key? Using k, I think we guarantee that even if the key is somehow mutated, we still invalidate the right entry.

// Caffeine's AsyncCache.get() provides atomic cache population per key.
// When multiple threads concurrently request the same uncached key, the mapping function
// is invoked only once, and all waiting threads receive the same CompletableFuture.
// This guarantees that the load function is called exactly once per key, preventing duplicate
Contributor


Nit: "exactly once" is not entirely correct now, since we retry on failure.

@jeqo jeqo requested a review from AnatolyPopov December 30, 2025 15:06
Contributor

@AnatolyPopov AnatolyPopov left a comment


Thanks for addressing all the comments, @jeqo! LGTM!

@AnatolyPopov AnatolyPopov merged commit 05ff0ac into main Dec 30, 2025
4 checks passed
@AnatolyPopov AnatolyPopov deleted the jeqo/only-fetch-on-work-thread branch December 30, 2025 23:13
jeqo added a commit that referenced this pull request Dec 31, 2025
#466)

* refactor(inkless:consume): only remote fetches to run on data executor

Refactor fetch planning by changing how fetch operations are scheduled
on the data executor. It only assigns the data executor for remote fetch
operations and let the calling thread to run the cache.get(), avoiding
the scenario where calls to cache are blocked by all data executor
threads being used.

By doing this, the CacheFetchJob component becomes irrelevant as most of
the logic is already on the planning, even the scheduling decision.

Changes:
- Delete CacheFetchJob
- Extend ObjectCache to allow passing the executor to run the load
function
- Refactor the FetchPlanner to use the new approach
- Update documentation on FetchPlanner and Reader to clarify how stages
are ran
- Move CacheFetchJobTest to FetchPlannerTest to keep and improve
coverage

* fixup! refactor(inkless:consume): only remote fetches to run on data executor

Remove the temporal merged as this PR does not use bytes or timestamp.
Let that for the following PRs to define

* fixup! refactor(inkless:consume): only remote fetches to run on data executor

Apply suggestion on simplified test

* fixup! refactor(inkless:consume): only remote fetches to run on data executor

Improve error handling on cache async get

* fixup! refactor(inkless:consume): only remote fetches to run on data executor

Improve comment as suggested

* fixup! refactor(inkless:consume): only remote fetches to run on data executor

Further refactoring: Use same constructor as existing methods

* fixup! refactor(inkless:consume): only remote fetches to run on data executor

* fixup! refactor(inkless:consume): only remote fetches to run on data executor

* fixup! refactor(inkless:consume): only remote fetches to run on data executor

* fixup! refactor(inkless:consume): only remote fetches to run on data executor

* fixup! refactor(inkless:consume): only remote fetches to run on data executor
jeqo added a commit that referenced this pull request Jan 5, 2026
#466)

* refactor(inkless:consume): only remote fetches to run on data executor

jeqo added a commit that referenced this pull request Jan 9, 2026
#466)

* refactor(inkless:consume): only remote fetches to run on data executor
