refactor(inkless:metrics): only add topic-type tag on all topic stats #472

Merged
juha-aiven merged 4 commits into main from jeqo/tag-bytes-inout-refactor
Jan 13, 2026
@jeqo jeqo commented Jan 7, 2026

Instead of tagging all topic-based metrics with topic-type, remove the additional tag from them and keep it only for the allTopics metrics.
Also, remove the topic-type-specific metrics, as the tagged allTopics metrics are enough.

Accessing the meter through BrokerTopicMetrics#MeterWrapper will now throw an exception if the metrics are closed. This is a behavior change, but it improves correctness and should not be considered a regression.

Note for reviewer: assertj is not available in the storage module, so the tests use JUnit only. Also, the storage module (unlike inkless) is licensed under Apache, so the Apache header is kept.

Copilot AI left a comment

Pull request overview

This PR refactors the metrics tagging strategy for broker topic statistics to simplify the implementation by applying the topicType tag (classic/diskless) only to all-topics aggregate metrics, not to individual topic-specific metrics.

Key changes:

  • Consolidated metric methods by removing separate bytesInRateDisklessTopicType() and bytesOutRateDisklessTopicType() methods in favor of parameterized versions
  • Refactored updateBytesOut() in BrokerTopicStats to accept an isDiskless parameter instead of having a separate method for diskless topics
  • Modified MeterWrapper to lazily create meters with topicType tags only for all-topics stats using a ConcurrentHashMap
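The lazy, per-topic-type meter creation described in the key changes can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: `SimpleMeter`, `TopicTypeMeters`, and the `"untagged"` map key are hypothetical stand-ins, and only the `topicType` tag name and its `classic`/`diskless` values come from the conversation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for a metrics meter; not the real Kafka/Yammer Meter.
final class SimpleMeter {
    private final Map<String, String> tags;
    private final LongAdder count = new LongAdder();

    SimpleMeter(Map<String, String> tags) {
        this.tags = tags;
    }

    void mark(long n) {
        count.add(n);
    }

    long count() {
        return count.sum();
    }

    Map<String, String> tags() {
        return tags;
    }
}

// Hypothetical wrapper: all-topics stats get one meter per topicType,
// while topic-specific stats share a single meter without the tag.
final class TopicTypeMeters {
    private static final Map<String, String> CLASSIC_TAGS = Map.of("topicType", "classic");
    private static final Map<String, String> DISKLESS_TAGS = Map.of("topicType", "diskless");

    private final boolean allTopicsStats;
    private final ConcurrentHashMap<String, SimpleMeter> meters = new ConcurrentHashMap<>();

    TopicTypeMeters(boolean allTopicsStats) {
        this.allTopicsStats = allTopicsStats;
    }

    SimpleMeter meter(boolean isDiskless) {
        if (!allTopicsStats) {
            // Topic-specific stats ignore the flag and carry no topicType tag.
            return meters.computeIfAbsent("untagged", k -> new SimpleMeter(Map.of()));
        }
        Map<String, String> tags = isDiskless ? DISKLESS_TAGS : CLASSIC_TAGS;
        // computeIfAbsent creates each tagged meter at most once, on first use.
        return meters.computeIfAbsent(tags.get("topicType"), k -> new SimpleMeter(tags));
    }
}

final class TopicTypeMetersDemo {
    public static void main(String[] args) {
        TopicTypeMeters all = new TopicTypeMeters(true);
        all.meter(true).mark(100);   // counted on the diskless meter
        all.meter(false).mark(40);   // counted on the classic meter
        System.out.println(all.meter(true).tags() + " " + all.meter(true).count());
        System.out.println(all.meter(false).tags() + " " + all.meter(false).count());
    }
}
```

With this shape, call sites pass a boolean instead of choosing between separately named methods, which matches the move from bytesInRateDisklessTopicType() to bytesInRate(true).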

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| storage/src/test/java/org/apache/kafka/storage/log/metrics/BrokerTopicStatsTest.java | Adds comprehensive tests for BrokerTopicStats to verify correct routing of metrics to classic vs diskless meters based on topic type |
| storage/src/test/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetricsTest.java | Adds tests for BrokerTopicMetrics to verify that all-topics stats have separate meters for classic/diskless while topic-specific stats ignore the isDiskless flag |
| storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicStats.java | Consolidates updateBytesOut methods into a single overloaded method accepting isDiskless parameter |
| storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java | Removes topic-type-specific constants and methods, refactors MeterWrapper to dynamically create meters with topicType tags only for all-topics stats |
| storage/inkless/src/main/java/io/aiven/inkless/produce/ActiveFile.java | Updates method calls from bytesInRateDisklessTopicType() to bytesInRate(true) for diskless topics |
| core/src/main/scala/kafka/server/KafkaApis.scala | Consolidates updateBytesOut calls to use the unified method with isDiskless parameter for both fetch and share-fetch responses |

@jeqo jeqo force-pushed the jeqo/tag-bytes-inout-refactor branch from 514101a to 69a8717 Compare January 8, 2026 07:52
@jeqo jeqo requested a review from Copilot January 8, 2026 07:53
Copilot AI left a comment

Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.

Comments suppressed due to low confidence (1)

storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java:431

  • The closed flag check is incomplete for topic-specific metrics. The topic-specific path (lines 417-431) checks and creates meters without consulting the closed flag, which could lead to a race condition similar to the one described in the comments for allTopicsStats. If close() is called concurrently with meter access on a topic-specific metric, a new meter could be created after the closed flag is set but before the lock is acquired in close(), leading to a meter leak. The closed flag should also be checked in the topic-specific path before creating new meters.
            // For topic-specific metrics, use the meter without topicType
            Meter meter = lazyMeter;
            if (meter == null) {
                meterLock.lock();
                try {
                    meter = lazyMeter;
                    if (meter == null) {
                        meter = metricsGroup.newMeter(metricType, eventType, TimeUnit.SECONDS, metricTags);
                        lazyMeter = meter;
                    }
                } finally {
                    meterLock.unlock();
                }
            }
            return meter;
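One way to close the race described in this comment is to consult the closed flag inside the locked section, before a new meter is created. The sketch below only illustrates that idea under stated assumptions: `GuardedLazyMeter` and `registeredCount()` are hypothetical, a plain `Object` stands in for the real `Meter`, and a counter stands in for registration with `metricsGroup`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: field names mirror the excerpt above, but the metrics
// registry is replaced by a counter so that a leaked meter would be observable.
final class GuardedLazyMeter {
    private final AtomicInteger registered = new AtomicInteger();
    private final ReentrantLock meterLock = new ReentrantLock();
    private volatile Object lazyMeter;   // stand-in for the real Meter
    private volatile boolean closed;

    Object meter() {
        Object meter = lazyMeter;
        if (meter == null) {
            meterLock.lock();
            try {
                // Checked under the lock: close() cannot interleave between
                // this check and the creation below.
                if (closed) {
                    throw new IllegalStateException("Cannot access meter after close");
                }
                meter = lazyMeter;
                if (meter == null) {
                    meter = new Object();
                    registered.incrementAndGet();   // "register" the meter
                    lazyMeter = meter;
                }
            } finally {
                meterLock.unlock();
            }
        }
        return meter;
    }

    void close() {
        meterLock.lock();
        try {
            closed = true;
            if (lazyMeter != null) {
                registered.decrementAndGet();       // "remove" the meter
                lazyMeter = null;
            }
        } finally {
            meterLock.unlock();
        }
    }

    int registeredCount() {
        return registered.get();
    }
}
```

Because close() and meter creation take the same lock, no meter can be registered once close() has run. A caller that obtained the meter before close() may still hold a reference to it; this sketch does not try to prevent that.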

@jeqo jeqo force-pushed the jeqo/tag-bytes-inout-refactor branch from 69a8717 to c8e04b6 Compare January 8, 2026 09:07
@jeqo jeqo requested a review from Copilot January 8, 2026 09:07
Copilot AI left a comment

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.


@jeqo jeqo force-pushed the jeqo/tag-bytes-inout-refactor branch from c8e04b6 to 5a8f3ff Compare January 8, 2026 09:23
@jeqo jeqo requested a review from Copilot January 8, 2026 09:25
Copilot AI left a comment

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.


Copilot AI left a comment

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated no new comments.


@jeqo jeqo requested a review from juha-aiven January 8, 2026 09:53
@jeqo jeqo marked this pull request as ready for review January 8, 2026 09:53
Comment on lines +459 to +465
// Check closed flag first for both all-topics and topic-specific metrics.
// Throwing IllegalStateException prevents NPE at call sites (e.g., .mark())
// and makes debugging easier by providing a clear error message.
if (closed) {
    throw new IllegalStateException(
        "Cannot access meter for metric '" + metricType + "' after BrokerTopicMetrics has been closed");
}
Is this really needed?

In the !isAllTopicsStats case, the check is already there. In the isAllTopicsStats case where meter == null, it is not, but it could easily be added there.

Let's move this?

If the check is moved, I don't see where an NPE could be thrown in any code path in this method: if the meter has been closed, IllegalStateException is raised.

Also, the error message looks the same even if it is thrown later.

// a new meter would be created and registered with metricsGroup but never removed,
// causing a metric leak. The closed flag ensures that once close() starts, no new
// meters can be created.
private volatile boolean closed = false;
I'm not a huge fan of adding this. Although adding another volatile (in addition to lazyMeter) doesn't add that much overhead, it's something, especially since it is read in the hot code path, every time meter is called.

I think we could combine these two into

private class LazyMeterWrapper {
    private Meter lazyMeter;
    private boolean closed;
}

And instead of the two fields we'd have

private volatile LazyMeterWrapper lazyMeterWrapper;

This would save one volatile read in the hot code path. WDYT?
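A minimal sketch of one possible reading of this suggestion: the meter and the closed flag live in a single immutable holder that is published through one volatile reference, so the hot path performs one volatile read. The names `SingleVolatileMeter` and `State` are hypothetical, a plain `Object` stands in for `Meter`, and making the holder immutable (replaced on change rather than mutated) is an assumption added here for safe publication, not something stated in the comment.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of combining lazyMeter and closed behind one volatile field.
final class SingleVolatileMeter {
    // Immutable snapshot: the meter and the closed flag are published together.
    private static final class State {
        final Object meter;     // stand-in for the real Meter; null until created
        final boolean closed;

        State(Object meter, boolean closed) {
            this.meter = meter;
            this.closed = closed;
        }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private volatile State state = new State(null, false);

    Object meter() {
        State s = state;        // the only volatile read on the hot path
        if (s.closed) {
            throw new IllegalStateException("Cannot access meter after close");
        }
        if (s.meter != null) {
            return s.meter;
        }
        lock.lock();
        try {
            s = state;          // re-read under the lock
            if (s.closed) {
                throw new IllegalStateException("Cannot access meter after close");
            }
            if (s.meter == null) {
                s = new State(new Object(), false);
                state = s;      // publish meter and flag in one write
            }
            return s.meter;
        } finally {
            lock.unlock();
        }
    }

    void close() {
        lock.lock();
        try {
            // A real implementation would also remove the meter from the registry here.
            state = new State(null, true);
        } finally {
            lock.unlock();
        }
    }
}
```

The trade-off is one small allocation whenever the state changes (meter creation and close), in exchange for a single volatile read per call.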

Comment on lines 498 to 518
For clarity, maybe put all this in else?

@jeqo jeqo requested a review from juha-aiven January 13, 2026 08:45
@jeqo jeqo force-pushed the jeqo/tag-bytes-inout-refactor branch from 1c061d4 to ca2dda9 Compare January 13, 2026 08:49
Copilot AI left a comment

Pull request overview

Copilot reviewed 8 out of 8 changed files in this pull request and generated no new comments.


@juha-aiven juha-aiven left a comment

LGTM

Comment on lines +1482 to +1483
brokerTopicStats.topicStats(topicIdPartition.topic).bytesInRate().mark(records.sizeInBytes)
brokerTopicStats.allTopicsStats.bytesInRate(false).mark(records.sizeInBytes)
To make this consistent, bytesInRate() in both.

Comment on lines +66 to +67
private final Map<String, String> classicTags = Map.of("topicType", "classic");
private final Map<String, String> disklessTags = Map.of("topicType", "diskless");
These could even be

private static final Map<String, String> CLASSIC_TAGS = Map.of("topicType", "classic");
private static final Map<String, String> DISKLESS_TAGS = Map.of("topicType", "diskless");

@juha-aiven juha-aiven merged commit aa045fb into main Jan 13, 2026
11 of 12 checks passed
@juha-aiven juha-aiven deleted the jeqo/tag-bytes-inout-refactor branch January 13, 2026 12:29
jeqo added a commit that referenced this pull request Jan 14, 2026
…#472)

* refactor(inkless:metrics): only add topic-type tag on all topic stats

Instead of tagging all topic-based metrics with topic-type, remove the
additional tag from them and keep it only for the allTopics metrics.
Also, remove the topic-type-specific metrics, as the tagged allTopics
metrics are enough.

Accessing the meter through BrokerTopicMetrics#MeterWrapper will now
throw an exception if the metrics are closed. This is a behavior change,
but it improves correctness and should not be considered a regression.

* fixup! refactor(inkless:metrics): only add topic-type tag on all topic stats

* fixup! refactor(inkless:metrics): only add topic-type tag on all topic stats

* chore: add storage module to fmt