refactor(inkless:consume): only remote fetches to run on data executor #466
AnatolyPopov merged 11 commits into main
Conversation
Pull request overview
This PR refactors the fetch planning architecture to optimize thread pool usage by executing only remote fetches on the data executor, while cache lookups run on the calling thread. The key changes eliminate the CacheFetchJob component and extend the ObjectCache interface to accept an executor parameter for async operations.
Key Changes:
- Deleted `CacheFetchJob` and moved its logic into `FetchPlanner`, which now directly interacts with the cache and schedules remote fetches
- Modified the `ObjectCache` API to return `CompletableFuture` and accept an executor parameter for controlling where load operations run
- Migrated tests from `CacheFetchJobTest` to `FetchPlannerTest` with expanded coverage for cache hit/miss scenarios
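Based on the summary above, the extended cache API might look roughly like the sketch below. This is a hedged illustration, not the PR's actual code: the `AsyncObjectCache` interface name, the `MapBackedCache` stand-in, and the exact method shapes are assumptions inferred from the description.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class AsyncCacheSketch {
    // Hypothetical shape of the extended API: the caller chooses which
    // executor runs the load function on a cache miss.
    interface AsyncObjectCache<K, V> {
        CompletableFuture<V> computeIfAbsent(K key, Function<K, V> load, Executor loadExecutor);
    }

    // Minimal in-memory stand-in (not the PR's CaffeineCache): hits return an
    // already-completed future; misses run the load on the supplied executor.
    static class MapBackedCache<K, V> implements AsyncObjectCache<K, V> {
        private final Map<K, CompletableFuture<V>> entries = new ConcurrentHashMap<>();

        @Override
        public CompletableFuture<V> computeIfAbsent(K key, Function<K, V> load, Executor loadExecutor) {
            return entries.computeIfAbsent(key,
                k -> CompletableFuture.supplyAsync(() -> load.apply(k), loadExecutor));
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService dataExecutor = Executors.newSingleThreadExecutor();
        try {
            MapBackedCache<String, String> cache = new MapBackedCache<>();
            String miss = cache.computeIfAbsent("obj-1", k -> "loaded", dataExecutor).get();
            // Second call is a hit: the load function is not invoked again.
            String hit = cache.computeIfAbsent("obj-1", k -> "reloaded", dataExecutor).get();
            System.out.println(miss + " " + hit);
        } finally {
            dataExecutor.shutdown();
        }
    }
}
```

The key design point is that the executor is a parameter per call, so the cache itself stays agnostic about thread pools.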
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| FetchPlannerTest.java | Migrated and expanded tests from CacheFetchJobTest, adding comprehensive tests for concurrent requests, cache hits/misses, and failure scenarios |
| CacheFetchJobTest.java | Deleted; tests migrated to FetchPlannerTest |
| NullCache.java | Updated to implement the new async computeIfAbsent API that returns CompletableFuture and accepts an executor parameter |
| Reader.java | Added clarifying comments about execution stages and which thread pools handle different operations |
| FetchPlanner.java | Major refactor: removed the CacheFetchJob dependency, integrated cache operations directly, added ObjectFetchRequest and MergedBatchRequest records for internal planning |
| CacheFetchJob.java | Deleted; functionality merged into FetchPlanner |
| ObjectCache.java | Changed the computeIfAbsent signature to return CompletableFuture and accept an Executor parameter for controlling async load execution |
| MemoryCache.java | Deleted; test-only implementation no longer needed |
| CaffeineCache.java | Updated to implement the new async computeIfAbsent API using Caffeine's async cache with a custom executor |
Resolved (outdated) review threads on:
- storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java (×2)
- storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java (×2)
- storage/inkless/src/main/java/io/aiven/inkless/cache/ObjectCache.java
Force-pushed from 6f06aff to d4c4bc4.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 2 comments.
Resolved (outdated) review threads on storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java (×2).
Force-pushed from c64c4aa to 31b9aa9.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated no new comments.
Force-pushed from 31b9aa9 to f6be2a5.
Force-pushed from 6cf25a1 to e862b11.
Force-pushed from f6be2a5 to 94ed0bf.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.
Resolved (outdated) review thread on storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java.
Force-pushed from 2713196 to 3285a0e.
Force-pushed from 3285a0e to 3dd60d4.
```java
// Should have only 1 future because the cache deduplicates same-key requests
assertThat(futures).hasSize(1);
```

Reviewer: This happens not because of the cache key but because of the groupingBy on BatchInfo.objectKey: only a single FetchRequest is ever created for the same object key. That might still be fine, since the idea is to test deduplication, but the comment seems misleading.

Author: Good point, updating the comment.
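The reviewer's point can be made concrete: grouping batches by object key already collapses same-object requests before the cache is ever consulted. A minimal illustration with a hypothetical `Batch` record standing in for the PR's BatchInfo:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupingDemo {
    // Hypothetical stand-in for BatchInfo: only the object key matters here.
    record Batch(String objectKey, int offset) {}

    static Map<String, List<Batch>> plan(List<Batch> batches) {
        // One map entry per object key: two batches on the same object
        // collapse into a single fetch request before any cache lookup.
        return batches.stream().collect(Collectors.groupingBy(Batch::objectKey));
    }

    public static void main(String[] args) {
        Map<String, List<Batch>> grouped =
            plan(List.of(new Batch("obj-1", 0), new Batch("obj-1", 100)));
        // A single merged request, regardless of cache behavior.
        System.out.println(grouped.size());
    }
}
```

So a test that wants to exercise cache-level deduplication specifically would need batches on distinct object keys mapping to the same cache key.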
```java
metrics::fetchFileFinished,
metrics::cacheEntrySize
timestamp,
totalBytes
```

Reviewer: If totalBytes is going to be used for rate limiting, a sum will be calculated from these values further out, right? But here every request gets the same totalBytes value. Should it be byteRange.size() instead, or what am I missing?

Author: I'm removing the timestamp/bytes from this PR, as it mixes in concerns from a following PR. I've refactored the changes in the first fixup commit to simplify this.
```java
// Execute: Trigger fetch operations
final List<CompletableFuture<FileExtent>> futures = planner.get();

// Verify: Should have two futures
assertThat(futures).hasSize(2);

// Wait for all to complete
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();

// Verify both were fetched
verify(fetcher).fetch(eq(OBJECT_KEY_A), any(ByteRange.class));
verify(fetcher).fetch(eq(OBJECT_KEY_B), any(ByteRange.class));

// Verify correct data for each
final List<FileExtent> results = futures.stream()
    .map(f -> {
        try {
            return f.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    })
    .collect(Collectors.toList());

assertThat(results).hasSize(2);

// Verify the actual data content matches expected values
// Note: We cannot rely on ordering since the futures complete asynchronously,
// so we use containsExactlyInAnyOrder to verify both byte arrays are present.
assertThat(results)
    .extracting(FileExtent::data)
    .containsExactlyInAnyOrder(dataA, dataB);
```
Reviewer: Could this be just a one-liner plus verification?

```java
assertThat(planner.get()).map(CompletableFuture::get).map(FileExtent::data)
    .containsExactlyInAnyOrder(dataA, dataB);
verify(fetcher).fetch(eq(OBJECT_KEY_A), any(ByteRange.class));
verify(fetcher).fetch(eq(OBJECT_KEY_B), any(ByteRange.class));
```
```java
// Use the provided executor instead of Caffeine's default executor.
// This allows us to control which thread pool handles the fetch and blocks there,
// while Caffeine's internal threads remain unblocked, so cache operations can continue to be served.
return CompletableFuture.supplyAsync(() -> load.apply(k), loadExecutor);
```

Reviewer: If the future completes exceptionally, will the result be cached? AFAIU there will be no retry on the next attempt, and the future will stay cached until evicted.

Author: Good catch! It looks like this is the case in Caffeine. I'm updating the logic to handle eviction in case of failure.
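The eviction-on-failure fix described in the reply can be sketched with a plain map standing in for Caffeine's async cache. This is only an illustration of the idea, not the PR's CaffeineCache code (which invalidates via cache.synchronous().invalidate); all names here are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class EvictOnFailureDemo {
    static final Map<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();

    static CompletableFuture<String> get(String key, Function<String, String> load, Executor executor) {
        CompletableFuture<String> existing = cache.get(key);
        if (existing != null) {
            return existing;
        }
        CompletableFuture<String> created = new CompletableFuture<>();
        CompletableFuture<String> raced = cache.putIfAbsent(key, created);
        if (raced != null) {
            return raced; // another caller won the race; share its future
        }
        executor.execute(() -> {
            try {
                created.complete(load.apply(key));
            } catch (Throwable t) {
                // Without this, the failed future would stay cached until evicted
                // and every later request for the key would observe the old failure.
                cache.remove(key); // invalidate on failure so the next attempt retries
                created.completeExceptionally(t);
            }
        });
        return created;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            String first = get("k", k -> { throw new RuntimeException("boom"); }, executor)
                .exceptionally(t -> "failed").get();
            // The failed entry was removed, so this attempt reloads instead of
            // returning the cached failure.
            String second = get("k", k -> "ok", executor).get();
            System.out.println(first + " " + second);
        } finally {
            executor.shutdown();
        }
    }
}
```

Note the entry is removed before the future is failed, so any caller that observes the failure is guaranteed the entry is already gone.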
```java
import io.aiven.inkless.generated.CacheKey;
import io.aiven.inkless.generated.FileExtent;

public interface ObjectCache extends Cache<CacheKey, FileExtent>, Closeable {
```

Reviewer: Side question out of curiosity: why are we extending this Cache interface at all? Are we relying on it somewhere?

Author: Not fully sure, but it may be for historic reasons: it used the internal caching API available where this was first implemented, with Infinispan. Could be reconsidered in a follow-up PR.

Reviewer: Thanks for answering. The reason for the question is that this interface leads to some races in the CaffeineCache class, where we use a check-then-act pattern, e.g. getIfPresent, check for null, then do something. In these situations eviction can be triggered between the check and the act, which is what we faced in tiered storage. But because of these Kafka Cache semantics we have to implement it this way. So it would be interesting to see if we can get rid of this interface, eliminating the race at the same time.
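The check-then-act race mentioned above can be made concrete with a stdlib sketch (not the CaffeineCache code; the eviction here is simulated deterministically to show the interleaving):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CheckThenActDemo {
    public static void main(String[] args) {
        Map<String, String> cache = new ConcurrentHashMap<>();
        cache.put("k", "cached");

        // Check-then-act: the state observed by the check can be gone by the
        // time we act on it, because eviction may run in between.
        String checked = cache.get("k");
        cache.remove("k"); // simulates eviction firing between the check and the act
        String acted = cache.get("k"); // the "act" now sees a different state

        // An atomic compute performs lookup-and-insert under one lock,
        // so no eviction can interleave between the two steps.
        String atomic = cache.computeIfAbsent("k", key -> "reloaded");

        System.out.println(checked + " " + acted + " " + atomic);
    }
}
```

Interfaces built around getIfPresent-style lookups push callers into the first pattern; an API whose primitive is the atomic compute avoids it, which is the reviewer's motivation for dropping the inherited interface.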
```java
final FetchPlanner planner = new FetchPlanner(
    time, OBJECT_KEY_CREATOR, keyAlignmentStrategy,
    caffeineCache, fetcher, dataExecutor, coordinates, metrics
```

Reviewer: Do I understand correctly that the supposedly concurrent requests will be submitted to a single-threaded dataExecutor? What am I missing here, and how can they be concurrent then?

Author: Good point. I'm clarifying this by renaming the test and updating the docs to reflect that this is testing the async behavior, but using a single-threaded pool for deterministic results.
Reviewer: Unrelated to the PR, but a bit strange: the doc says -1 is disabled, I wonder how it becomes 180.
Refactor fetch planning by changing how fetch operations are scheduled on the data executor. Only remote fetch operations are assigned to the data executor, while the calling thread runs cache.get(), avoiding the scenario where calls to the cache are blocked because all data executor threads are in use. With this change, the CacheFetchJob component becomes redundant, as most of its logic, including the scheduling decision, already lives in the planner.

Changes:
- Delete CacheFetchJob
- Extend ObjectCache to allow passing the executor that runs the load function
- Refactor FetchPlanner to use the new approach
- Update documentation on FetchPlanner and Reader to clarify how the stages are run
- Move CacheFetchJobTest to FetchPlannerTest to keep and improve coverage
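The scheduling change in the description can be sketched as follows. This is a simplified stand-in, not the PR's FetchPlanner (which works with ObjectKey, ByteRange, and an ObjectCache); all names here are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FetchSchedulingDemo {
    static final Map<String, byte[]> cache = new ConcurrentHashMap<>();

    // Cache lookup happens synchronously on the calling thread; only a miss
    // submits work to the data executor. Cache hits therefore can never be
    // starved by the executor's threads all being busy with remote fetches.
    static CompletableFuture<byte[]> fetch(String key, Executor dataExecutor) {
        byte[] hit = cache.get(key);
        if (hit != null) {
            return CompletableFuture.completedFuture(hit); // served on the calling thread
        }
        return CompletableFuture.supplyAsync(() -> remoteFetch(key), dataExecutor)
            .thenApply(data -> { cache.put(key, data); return data; });
    }

    static byte[] remoteFetch(String key) {
        return key.getBytes(); // stand-in for the object storage read
    }

    public static void main(String[] args) throws Exception {
        ExecutorService dataExecutor = Executors.newSingleThreadExecutor();
        try {
            byte[] miss = fetch("obj", dataExecutor).get(); // goes through the data executor
            byte[] hit = fetch("obj", dataExecutor).get();  // served from cache, calling thread
            System.out.println(new String(miss) + " " + new String(hit));
        } finally {
            dataExecutor.shutdown();
        }
    }
}
```

Under the old design both branches ran on the data executor, so a pool saturated with remote fetches would also block pure cache lookups.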
Fixup commits:
- …executor: Remove the temporary merged request, as this PR does not use bytes or timestamp; leave that for the following PRs to define
- …executor: Apply suggestion on simplified test
- …executor: Improve error handling on cache async get
- …executor: Improve comment as suggested
- …executor: Further refactoring: use the same constructor as existing methods
Force-pushed from 39afa9d to f3a73f7.
```java
// While Caffeine has built-in failed future cleanup, it happens asynchronously.
// Explicit invalidation ensures immediate removal for faster retry on subsequent requests.
if (throwable != null) {
    cache.synchronous().invalidate(key);
```

Reviewer: nit: Should it be k instead of key? Using k, I think we guarantee that even if for some reason the key is mutated, we will properly invalidate the entry.
```java
// Caffeine's AsyncCache.get() provides atomic cache population per key.
// When multiple threads concurrently request the same uncached key, the mapping function
// is invoked only once, and all waiting threads receive the same CompletableFuture.
// This guarantees that the load function is called exactly once per key, preventing duplicate
```

Reviewer: Nit: "exactly once" is not entirely correct now, since we retry on failure.
AnatolyPopov left a comment: Thanks for addressing all the comments, @jeqo! LGTM!