refactor(inkless:metrics): only add topic-type tag on all topic stats #472
juha-aiven merged 4 commits into main
Pull request overview
This PR refactors the metrics tagging strategy for broker topic statistics to simplify the implementation by applying the topicType tag (classic/diskless) only to all-topics aggregate metrics, not to individual topic-specific metrics.
Key changes:
- Consolidated metric methods by removing the separate `bytesInRateDisklessTopicType()` and `bytesOutRateDisklessTopicType()` methods in favor of parameterized versions
- Refactored `updateBytesOut()` in BrokerTopicStats to accept an `isDiskless` parameter instead of having a separate method for diskless topics
- Modified MeterWrapper to lazily create meters with topicType tags only for all-topics stats, using a ConcurrentHashMap
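A minimal sketch of the tagging strategy in the Key changes list, under simplifying assumptions: `SimpleMeter` and `MeterWrapperSketch` are hypothetical stand-ins, not the PR's MeterWrapper or the metrics library's Meter, and the topic-specific meter is created eagerly here for brevity. All-topics stats lazily create one meter per topic type via `ConcurrentHashMap.computeIfAbsent`; topic-specific stats return a single untagged meter regardless of `isDiskless`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a library meter: it only records tags and a running count.
final class SimpleMeter {
    private final Map<String, String> tags;
    private final AtomicLong count = new AtomicLong();

    SimpleMeter(Map<String, String> tags) {
        this.tags = Map.copyOf(tags);
    }

    void mark(long n) {
        count.addAndGet(n);
    }

    long count() {
        return count.get();
    }

    Map<String, String> tags() {
        return tags;
    }
}

// Hypothetical sketch of the strategy described above, not the PR's MeterWrapper.
final class MeterWrapperSketch {
    private final boolean isAllTopicsStats;
    private final Map<String, String> baseTags;
    // Tagged meters keyed by topic type; only used for all-topics stats.
    private final ConcurrentHashMap<String, SimpleMeter> metersByTopicType = new ConcurrentHashMap<>();
    // Single untagged meter for topic-specific stats (null for all-topics stats).
    private final SimpleMeter topicMeter;

    MeterWrapperSketch(boolean isAllTopicsStats, Map<String, String> baseTags) {
        this.isAllTopicsStats = isAllTopicsStats;
        this.baseTags = Map.copyOf(baseTags);
        this.topicMeter = isAllTopicsStats ? null : new SimpleMeter(baseTags);
    }

    SimpleMeter meter(boolean isDiskless) {
        if (!isAllTopicsStats) {
            // Topic-specific stats ignore isDiskless and carry no topicType tag.
            return topicMeter;
        }
        String topicType = isDiskless ? "diskless" : "classic";
        // computeIfAbsent creates each tagged meter at most once, even under contention.
        return metersByTopicType.computeIfAbsent(topicType, type -> {
            Map<String, String> tags = new HashMap<>(baseTags);
            tags.put("topicType", type);
            return new SimpleMeter(tags);
        });
    }
}
```

With this shape, `meter(true)` and `meter(false)` on all-topics stats return distinct meters tagged `topicType=diskless` and `topicType=classic`, while a topic-specific wrapper returns the same untagged meter for both.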
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| storage/src/test/java/org/apache/kafka/storage/log/metrics/BrokerTopicStatsTest.java | Adds comprehensive tests for BrokerTopicStats to verify correct routing of metrics to classic vs diskless meters based on topic type |
| storage/src/test/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetricsTest.java | Adds tests for BrokerTopicMetrics to verify that all-topics stats have separate meters for classic/diskless while topic-specific stats ignore the isDiskless flag |
| storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicStats.java | Consolidates updateBytesOut methods into a single overloaded method accepting isDiskless parameter |
| storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java | Removes topic-type-specific constants and methods, refactors MeterWrapper to dynamically create meters with topicType tags only for all-topics stats |
| storage/inkless/src/main/java/io/aiven/inkless/produce/ActiveFile.java | Updates method calls from bytesInRateDisklessTopicType() to bytesInRate(true) for diskless topics |
| core/src/main/scala/kafka/server/KafkaApis.scala | Consolidates updateBytesOut calls to use the unified method with isDiskless parameter for both fetch and share-fetch responses |
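The routing behind the consolidated `updateBytesOut` can be sketched as follows. This is hypothetical: `BytesOutStatsSketch` uses `LongAdder` counters instead of meters and omits any other parameters the real method takes, since the PR's exact signature is not shown here. The point it illustrates is that `isDiskless` only selects the all-topics series; the per-topic series has no topic-type dimension.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch: counters stand in for meters.
final class BytesOutStatsSketch {
    // Per-topic byte counts: no topicType dimension.
    private final ConcurrentHashMap<String, LongAdder> perTopic = new ConcurrentHashMap<>();
    // All-topics byte counts keyed by topicType ("classic" or "diskless").
    private final ConcurrentHashMap<String, LongAdder> allTopicsByType = new ConcurrentHashMap<>();

    // Single entry point for both topic types; isDiskless only picks the all-topics series.
    void updateBytesOut(String topic, boolean isDiskless, long value) {
        perTopic.computeIfAbsent(topic, t -> new LongAdder()).add(value);
        String topicType = isDiskless ? "diskless" : "classic";
        allTopicsByType.computeIfAbsent(topicType, t -> new LongAdder()).add(value);
    }

    long topicBytesOut(String topic) {
        LongAdder adder = perTopic.get(topic);
        return adder == null ? 0L : adder.sum();
    }

    long allTopicsBytesOut(boolean isDiskless) {
        LongAdder adder = allTopicsByType.get(isDiskless ? "diskless" : "classic");
        return adder == null ? 0L : adder.sum();
    }
}
```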
storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java
Outdated
Force-pushed from 514101a to 69a8717.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (1)
storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java:431
- The closed flag check is incomplete for topic-specific metrics. The topic-specific path (lines 417-431) checks and creates meters without consulting the closed flag, which could lead to a race condition similar to the one described in the comments for allTopicsStats. If close() is called concurrently with meter access on a topic-specific metric, a new meter could be created after the closed flag is set but before the lock is acquired in close(), leading to a meter leak. The closed flag should also be checked in the topic-specific path before creating new meters.
```java
// For topic-specific metrics, use the meter without topicType
Meter meter = lazyMeter;
if (meter == null) {
    meterLock.lock();
    try {
        meter = lazyMeter;
        if (meter == null) {
            meter = metricsGroup.newMeter(metricType, eventType, TimeUnit.SECONDS, metricTags);
            lazyMeter = meter;
        }
    } finally {
        meterLock.unlock();
    }
}
return meter;
```
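A minimal sketch of the fix Copilot suggests for the topic-specific path, assuming hypothetical `FakeRegistry` and `LazyMeterSketch` types in place of `metricsGroup` and the real wrapper. The closed flag is re-checked while holding the same lock that `close()` takes, so once `close()` has run no new meter can be registered and leaked. Because `closed` is only touched under the lock, it does not need to be volatile here, and the lock-free fast path never reads it.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical registry stand-in that counts live (registered, not yet removed) meters.
final class FakeRegistry {
    final AtomicInteger live = new AtomicInteger();

    Object newMeter() {
        live.incrementAndGet();
        return new Object();
    }

    void removeMeter(Object meter) {
        live.decrementAndGet();
    }
}

final class LazyMeterSketch {
    private final FakeRegistry registry;
    private final ReentrantLock meterLock = new ReentrantLock();
    private volatile Object lazyMeter;
    private boolean closed; // guarded by meterLock

    LazyMeterSketch(FakeRegistry registry) {
        this.registry = registry;
    }

    Object meter() {
        Object meter = lazyMeter; // fast path: one volatile read
        if (meter == null) {
            meterLock.lock();
            try {
                // close() holds the same lock, so this check cannot race with it.
                if (closed) {
                    throw new IllegalStateException("meter accessed after close");
                }
                meter = lazyMeter;
                if (meter == null) {
                    meter = registry.newMeter();
                    lazyMeter = meter;
                }
            } finally {
                meterLock.unlock();
            }
        }
        return meter;
    }

    void close() {
        meterLock.lock();
        try {
            closed = true;
            Object meter = lazyMeter;
            if (meter != null) {
                registry.removeMeter(meter);
                lazyMeter = null;
            }
        } finally {
            meterLock.unlock();
        }
    }
}
```

After `close()` clears `lazyMeter`, any later `meter()` call takes the locked path and throws `IllegalStateException` instead of registering a fresh, never-removed meter.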
storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java
Outdated
storage/src/test/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetricsTest.java
Outdated
storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java
storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java
Force-pushed from 69a8717 to c8e04b6.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.
storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java
storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java
Instead of tagging all topic-based metrics with topic-type, remove the additional tag from there and keep it only for the allTopics metrics. Also, remove the topic-type-specific metrics, as the tagged allTopics metrics are enough. Accessing the meter through BrokerTopicMetrics#MeterWrapper will now throw an exception if the metrics are closed. This is a behavior change, but it improves correctness and should not be considered a regression.
Force-pushed from c8e04b6 to 5a8f3ff.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated no new comments.
```java
// Check closed flag first for both all-topics and topic-specific metrics.
// Throwing IllegalStateException prevents NPE at call sites (e.g., .mark())
// and makes debugging easier by providing a clear error message.
if (closed) {
    throw new IllegalStateException(
        "Cannot access meter for metric '" + metricType + "' after BrokerTopicMetrics has been closed");
}
```
Is this really needed?
In the case of !isAllTopicsStats, the check is already there. In the case of isAllTopicsStats with meter == null, it's not, but it could easily be added there.
Let's move this?
I don't see where an NPE could be thrown in any code path in this method if the check is moved. If the meter has been closed, IllegalStateException is raised.
Also, the error message would be the same even if thrown later.
```java
// a new meter would be created and registered with metricsGroup but never removed,
// causing a metric leak. The closed flag ensures that once close() starts, no new
// meters can be created.
private volatile boolean closed = false;
```
I'm not a huge fan of adding this. Although another volatile (in addition to lazyMeter) doesn't add that much overhead, it's something, especially since it is read in the hot code path, every time meter is accessed.
I think we could combine these two into

```java
private class LazyMeterWrapper {
    private Meter lazyMeter;
    private boolean closed;
}
```

and instead of the two fields we'd have

```java
private volatile LazyMeterWrapper lazyMeterWrapper;
```

This would save one volatile read in the hot code path. WDYT?
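The reviewer's idea of folding `lazyMeter` and `closed` into one volatile holder could look roughly like this. It is a hypothetical sketch (`SingleVolatileLazyMeter` is not from the PR), with one assumed variation: the holder fields are `final`, so each published snapshot is immutable and every state change replaces the whole holder under a lock. The hot path then performs a single volatile read of `state`.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical sketch: one volatile field holding an immutable (meter, closed) snapshot.
final class SingleVolatileLazyMeter<M> {
    private static final class State<T> {
        final T meter;
        final boolean closed;

        State(T meter, boolean closed) {
            this.meter = meter;
            this.closed = closed;
        }
    }

    private final Supplier<M> factory;
    private final ReentrantLock lock = new ReentrantLock();
    private volatile State<M> state = new State<>(null, false);

    SingleVolatileLazyMeter(Supplier<M> factory) {
        this.factory = factory;
    }

    M meter() {
        State<M> current = state; // the only volatile read on the hot path
        if (current.meter != null) {
            return current.meter;
        }
        lock.lock();
        try {
            current = state;
            if (current.closed) {
                throw new IllegalStateException("meter accessed after close");
            }
            if (current.meter == null) {
                current = new State<>(factory.get(), false);
                state = current;
            }
            return current.meter;
        } finally {
            lock.unlock();
        }
    }

    // Marks the wrapper closed and returns the previously live meter (or null) for removal.
    M close() {
        lock.lock();
        try {
            M previous = state.meter;
            state = new State<>(null, true);
            return previous;
        } finally {
            lock.unlock();
        }
    }
}
```

Because a closed state never carries a meter, the `closed` check only runs on the slow path, which also fits the earlier suggestion to move the check out of the hot path.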
For clarity, maybe put all this in else?
Force-pushed from 1c061d4 to ca2dda9.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated no new comments.
```scala
brokerTopicStats.topicStats(topicIdPartition.topic).bytesInRate().mark(records.sizeInBytes)
brokerTopicStats.allTopicsStats.bytesInRate(false).mark(records.sizeInBytes)
```
To make this consistent, use bytesInRate() in both.
```java
private final Map<String, String> classicTags = Map.of("topicType", "classic");
private final Map<String, String> disklessTags = Map.of("topicType", "diskless");
```
These could even be

```java
private static final Map<String, String> CLASSIC_TAGS = Map.of("topicType", "classic");
private static final Map<String, String> DISKLESS_TAGS = Map.of("topicType", "diskless");
```

…#472)

* refactor(inkless:metrics): only add topic-type tag on all topic stats
  Instead of tagging all topic-based metrics with topic-type, remove the additional tag from there and keep it only for the allTopics metrics. Also, remove the topic-type-specific metrics, as the tagged allTopics metrics are enough. Accessing the meter through BrokerTopicMetrics#MeterWrapper will now throw an exception if the metrics are closed. This is a behavior change, but it improves correctness and should not be considered a regression.
* fixup! refactor(inkless:metrics): only add topic-type tag on all topic stats
* fixup! refactor(inkless:metrics): only add topic-type tag on all topic stats
* chore: add storage module to fmt
Note for reviewer: AssertJ is not available in the storage module, so the tests use JUnit only. Also, the storage module (unlike inkless) is licensed under Apache, so the Apache license header is kept.