feat(metadata:diskless): add controller metrics for diskless topics#503
viktorsomogyi merged 2 commits into main
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java:131
publishDelta now computes diskless-ness from newImage/prevImage configs and updates the new diskless metrics on LOG_DELTA updates, but there are no unit tests covering the LOG_DELTA path for diskless topics (create/delete/partition changes). Adding a test that drives onMetadataUpdate(..., fakeManifest(false)) and asserts diskless counts change correctly would help prevent regressions in the real-time update logic.
    private void publishDelta(MetadataDelta delta, MetadataImage newImage) {
        // Use newImage configs to check if topic is diskless, as the metadata cache
        // may not have the config yet when processing deltas for newly created topics
        Function<String, Boolean> isDisklessFromImage = topicName -> {
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
            Properties props = newImage.configs().configProperties(resource);
            return Boolean.parseBoolean(props.getProperty(TopicConfig.DISKLESS_ENABLE_CONFIG, "false"));
        };
        ControllerMetricsChanges changes = new ControllerMetricsChanges(isDisklessFromImage);
        if (delta.clusterDelta() != null) {
            for (Entry<Integer, Optional<BrokerRegistration>> entry :
                    delta.clusterDelta().changedBrokers().entrySet()) {
                changes.handleBrokerChange(
                    prevImage.cluster().brokers().get(entry.getKey()),
                    entry.getValue().orElse(null),
                    metrics
                );
            }
        }
        if (delta.topicsDelta() != null) {
            for (Uuid topicId : delta.topicsDelta().deletedTopicIds()) {
                TopicImage prevTopic = prevImage.topics().topicsById().get(topicId);
                if (prevTopic == null) {
                    throw new RuntimeException("Unable to find deleted topic id " + topicId +
                        " in previous topics image.");
                }
                // For deleted topics, check isDiskless from prevImage since config is already removed from newImage
                ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, prevTopic.name());
                Properties props = prevImage.configs().configProperties(resource);
                boolean isDiskless = Boolean.parseBoolean(props.getProperty(TopicConfig.DISKLESS_ENABLE_CONFIG, "false"));
                changes.handleDeletedTopic(prevTopic, isDiskless);
            }
            for (Entry<Uuid, TopicDelta> entry : delta.topicsDelta().changedTopics().entrySet()) {
                changes.handleTopicChange(prevImage.topics().getTopic(entry.getKey()), entry.getValue());
            }
        }
        changes.apply(metrics);
    }
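The image-selection rule in the excerpt above (new image for created or changed topics, previous image for deleted ones) can be illustrated in isolation. The following is a minimal sketch using only java.util.Properties, not the PR's code: the class name, the helper names, and the literal key "diskless.enable" are assumptions made for illustration, since the PR only refers to TopicConfig.DISKLESS_ENABLE_CONFIG.

```java
import java.util.Properties;

public class DisklessLookupSketch {
    // Assumed key for illustration; the PR references TopicConfig.DISKLESS_ENABLE_CONFIG.
    static final String DISKLESS_ENABLE = "diskless.enable";

    // A missing key is treated as "false", mirroring the default used in the excerpt.
    static boolean isDiskless(Properties topicProps) {
        return Boolean.parseBoolean(topicProps.getProperty(DISKLESS_ENABLE, "false"));
    }

    // Hypothetical helper: a deleted topic no longer has configs in the new image,
    // so its flag has to be read from the previous image instead.
    static boolean isDisklessForDelta(Properties prevProps, Properties newProps, boolean deleted) {
        return isDiskless(deleted ? prevProps : newProps);
    }

    public static void main(String[] args) {
        Properties diskless = new Properties();
        diskless.setProperty(DISKLESS_ENABLE, "true");
        Properties empty = new Properties();

        // Newly created diskless topic: config is only present in the new image.
        System.out.println(isDisklessForDelta(empty, diskless, false)); // true
        // Deleted diskless topic: config is only present in the previous image.
        System.out.println(isDisklessForDelta(diskless, empty, true));  // true
        // Reading the new image for a deleted topic would misclassify it as classic.
        System.out.println(isDisklessForDelta(diskless, empty, false)); // false
    }
}
```

The last call shows why the deleted-topic branch cannot reuse the new-image lookup: the flag would silently fall back to the default.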
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated no new comments.
Add partition-level diskless metrics to the controller: DisklessTopicCount, DisklessPartitionCount, and DisklessOfflinePartitionCount. These gauges provide operational visibility into diskless partition health. Diskless partitions are excluded from OfflinePartitions and PreferredReplicaImbalanceCount (the transformer handles availability), but included in GlobalPartitionCount. Diskless status is resolved directly from MetadataImage configs in both snapshot and delta paths.
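As a rough illustration of the counting rules described in this commit message, the sketch below recomputes the gauges from a toy list of topics. It is a simplified model under stated assumptions, not the controller implementation: the record and method names are hypothetical, "offline" is approximated as "has no leader", and PreferredReplicaImbalanceCount is left out.

```java
import java.util.List;

public class DisklessGaugeSketch {
    // Hypothetical toy model; "offline" is approximated here as "partition has no leader".
    record Partition(boolean hasLeader) {}
    record Topic(String name, boolean diskless, List<Partition> partitions) {}
    record Gauges(int globalPartitions, int offlinePartitions, int disklessTopics,
                  int disklessPartitions, int disklessOfflinePartitions) {
        // Classic partitions are not tracked separately; they are derived.
        int classicPartitions() {
            return globalPartitions - disklessPartitions;
        }
    }

    static Gauges count(List<Topic> topics) {
        int global = 0, offline = 0, disklessTopics = 0, disklessParts = 0, disklessOffline = 0;
        for (Topic topic : topics) {
            if (topic.diskless()) {
                disklessTopics++;
            }
            for (Partition partition : topic.partitions()) {
                global++; // every partition counts toward the global total
                if (topic.diskless()) {
                    disklessParts++;
                    if (!partition.hasLeader()) {
                        disklessOffline++; // tracked separately from the classic offline gauge
                    }
                } else if (!partition.hasLeader()) {
                    offline++; // the classic offline gauge only sees classic partitions
                }
            }
        }
        return new Gauges(global, offline, disklessTopics, disklessParts, disklessOffline);
    }

    public static void main(String[] args) {
        Topic classic = new Topic("classic", false,
            List.of(new Partition(true), new Partition(true), new Partition(false)));
        Topic diskless = new Topic("diskless", true,
            List.of(new Partition(true), new Partition(false)));
        Gauges gauges = count(List.of(classic, diskless));
        System.out.println(gauges);
        System.out.println("classic partitions = " + gauges.classicPartitions());
    }
}
```

With one classic topic of three partitions and one diskless topic of two, the model reports five global partitions, of which three are classic by subtraction; each offline gauge sees only its own kind of partition.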
viktorsomogyi left a comment
Overall LGTM, but we will probably need a follow-up once the classic <-> diskless switch is complete, to handle that scenario too.
@viktorsomogyi I have added a fixup to handle the switch scenario -- seems worth having it already to support those config updates. PTAL
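The thread does not show the fixup itself, so the following is only a guess at the bookkeeping a classic <-> diskless switch needs. When a topic's diskless flag flips, its partitions move between the diskless gauges and the classic offline gauge, while the global partition count stays the same. All names here are hypothetical.

```java
public class DisklessSwitchSketch {
    // Hypothetical counters standing in for the controller gauges.
    int offlinePartitions;          // classic partitions only
    int disklessTopics;
    int disklessPartitions;
    int disklessOfflinePartitions;

    // Moves one topic's partitions across categories when its diskless flag flips.
    // `partitions` is the topic's partition count and `leaderless` is how many of
    // those currently have no leader.
    void onDisklessFlagChanged(int partitions, int leaderless, boolean nowDiskless) {
        int sign = nowDiskless ? 1 : -1;
        disklessTopics += sign;
        disklessPartitions += sign * partitions;
        disklessOfflinePartitions += sign * leaderless;
        // Leaderless partitions leave (or re-enter) the classic offline gauge.
        offlinePartitions -= sign * leaderless;
    }

    public static void main(String[] args) {
        DisklessSwitchSketch metrics = new DisklessSwitchSketch();
        metrics.offlinePartitions = 1; // one classic partition is currently leaderless

        metrics.onDisklessFlagChanged(3, 1, true);  // classic -> diskless
        System.out.println(metrics.disklessPartitions + " diskless, "
            + metrics.offlinePartitions + " classic offline");

        metrics.onDisklessFlagChanged(3, 1, false); // diskless -> classic
        System.out.println(metrics.disklessPartitions + " diskless, "
            + metrics.offlinePartitions + " classic offline");
    }
}
```

Applying the same update with the opposite flag restores the original counters, which is a useful invariant to assert in a test of the switch path.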
…503) Add partition-level diskless metrics to the controller: DisklessTopicCount, DisklessPartitionCount, and DisklessOfflinePartitionCount. These gauges provide operational visibility into diskless partition health. Diskless partitions are excluded from OfflinePartitions and PreferredReplicaImbalanceCount (the transformer handles availability), but included in GlobalPartitionCount. Diskless status is resolved directly from MetadataImage configs in both snapshot and delta paths. (cherry picked from commit 879b72e) # Conflicts: # core/src/main/scala/kafka/server/ControllerServer.scala
Adds controller metrics to track diskless topics, building on #492 (managed replicas).
Changes
- Add DisklessTopicCount, DisklessPartitionCount, and DisklessOfflinePartitionCount gauges to the controller
- Exclude diskless partitions from OfflinePartitionsCount and PreferredReplicaImbalanceCount (transformer handles availability)
- Include diskless partitions in GlobalPartitionCount; classic partitions can be derived as GlobalPartitionCount - DisklessPartitionCount
New Metrics
- DisklessTopicCount
- DisklessPartitionCount
- DisklessOfflinePartitionCount
Test plan
Related