-
Notifications
You must be signed in to change notification settings - Fork 8
feat(metadata:diskless): implement managed replicas for diskless topics #492
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
02d3f40
15fe526
9a44b38
68480d4
aed14b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -162,6 +162,7 @@ static class Builder { | |
| private int defaultNumPartitions = 1; | ||
| private boolean defaultDisklessEnable = false; | ||
| private boolean isDisklessStorageSystemEnabled = false; | ||
| private boolean isDisklessManagedReplicasEnabled = false; | ||
| private boolean classicRemoteStorageForceEnabled = false; | ||
| private List<String> classicRemoteStorageForceExcludeTopicRegexes = List.of(); | ||
|
|
||
|
|
@@ -201,6 +202,11 @@ public Builder setDisklessStorageSystemEnabled(boolean isDisklessStorageSystemEn | |
| return this; | ||
| } | ||
|
|
||
| public Builder setDisklessManagedReplicasEnabled(boolean isDisklessManagedReplicasEnabled) { | ||
| this.isDisklessManagedReplicasEnabled = isDisklessManagedReplicasEnabled; | ||
| return this; | ||
| } | ||
|
|
||
| public Builder setClassicRemoteStorageForceEnabled(boolean classicRemoteStorageForceEnabled) { | ||
| this.classicRemoteStorageForceEnabled = classicRemoteStorageForceEnabled; | ||
| return this; | ||
|
|
@@ -253,6 +259,7 @@ ReplicationControlManager build() { | |
| defaultNumPartitions, | ||
| defaultDisklessEnable, | ||
| isDisklessStorageSystemEnabled, | ||
| isDisklessManagedReplicasEnabled, | ||
| classicRemoteStorageForceEnabled, | ||
| classicRemoteStorageForceExcludeTopicRegexes, | ||
| maxElectionsPerImbalance, | ||
|
|
@@ -332,9 +339,21 @@ static Map<String, String> translateCreationConfigs(CreatableTopicConfigCollecti | |
| */ | ||
| private final boolean defaultDisklessEnable; | ||
|
|
||
| /** | ||
| * When true, the diskless storage system is enabled, allowing diskless topics to be created. | ||
| */ | ||
| private final boolean isDisklessStorageSystemEnabled; | ||
| private final ClassicTopicRemoteStorageForcePolicy classicTopicRemoteStorageForcePolicy; | ||
|
|
||
| /** | ||
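| * When true, diskless topics use managed replicas: the replication factor is the user-provided value | ||
| * (or the default replication factor when RF=-1) and replicas are placed by the standard replica placer. | ||
| * When false, diskless topics use legacy RF=1 behavior. | ||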
| * | ||
| * <p>Phase 1 limitation: This config only affects topic creation. Add Partitions inherits | ||
| * RF from existing partitions (correct behavior - maintains consistency within the topic). | ||
| */ | ||
| private final boolean isDisklessManagedReplicasEnabled; | ||
|
|
||
| /** | ||
| * Maximum number of leader elections to perform during one partition leader balancing operation. | ||
| */ | ||
|
|
@@ -425,6 +444,7 @@ private ReplicationControlManager( | |
| int defaultNumPartitions, | ||
| boolean defaultDisklessEnable, | ||
| boolean isDisklessStorageSystemEnabled, | ||
| boolean isDisklessManagedReplicasEnabled, | ||
| boolean classicRemoteStorageForceEnabled, | ||
| List<String> classicRemoteStorageForceExcludeTopicRegexes, | ||
| int maxElectionsPerImbalance, | ||
|
|
@@ -439,6 +459,7 @@ private ReplicationControlManager( | |
| this.defaultNumPartitions = defaultNumPartitions; | ||
| this.defaultDisklessEnable = defaultDisklessEnable; | ||
| this.isDisklessStorageSystemEnabled = isDisklessStorageSystemEnabled; | ||
| this.isDisklessManagedReplicasEnabled = isDisklessManagedReplicasEnabled; | ||
| this.classicTopicRemoteStorageForcePolicy = new ClassicTopicRemoteStorageForcePolicy( | ||
| classicRemoteStorageForceEnabled, | ||
| classicRemoteStorageForceExcludeTopicRegexes | ||
|
|
@@ -803,9 +824,12 @@ private ApiError createTopic(ControllerRequestContext context, | |
| "when the diskless storage system is disabled. " + | ||
| "Please enable the diskless storage system to create diskless topics."); | ||
| } | ||
| if (Math.abs(topic.replicationFactor()) != 1) { | ||
| // Diskless RF validation: | ||
| // When managed replicas enabled: any valid RF accepted (standard Kafka validation applies later). | ||
| // When managed replicas disabled (legacy): only RF=1 or RF=-1 (resolves to 1). | ||
| if (!isDisklessManagedReplicasEnabled && Math.abs(topic.replicationFactor()) != 1) { | ||
| return new ApiError(Errors.INVALID_REPLICATION_FACTOR, | ||
| "Replication factor for diskless topics must be 1 or -1 to use the default value (1)."); | ||
| "Replication factor for diskless topics must be 1 or -1 when managed replicas are disabled."); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -820,7 +844,7 @@ private ApiError createTopic(ControllerRequestContext context, | |
| "A manual partition assignment was specified, but numPartitions " + | ||
| "was not set to -1."); | ||
| } | ||
| if (disklessEnabled) { | ||
| if (disklessEnabled && !isDisklessManagedReplicasEnabled) { | ||
| return new ApiError(INVALID_REQUEST, | ||
| "A manual partition assignment cannot be specified for diskless topics."); | ||
| } | ||
|
|
@@ -868,12 +892,17 @@ private ApiError createTopic(ControllerRequestContext context, | |
| } else { | ||
| int numPartitions = topic.numPartitions() == -1 ? | ||
| defaultNumPartitions : topic.numPartitions(); | ||
| short replicationFactor = topic.replicationFactor() == -1 ? | ||
| defaultReplicationFactor : topic.replicationFactor(); | ||
| short classicReplicationFactor = topic.replicationFactor() == -1 ? defaultReplicationFactor : topic.replicationFactor(); | ||
| // For managed diskless: use same resolution as classic (RF=-1 → defaultReplicationFactor, else user value). | ||
| // For unmanaged diskless (legacy): always RF=1. | ||
| short disklessReplicationFactor = isDisklessManagedReplicasEnabled ? classicReplicationFactor : 1; | ||
| short replicationFactor = disklessEnabled ? disklessReplicationFactor : classicReplicationFactor; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I have been thinking a lot about this. Overall, I agree that from a service provider's point of view it's a good feature to choose the replication factor automatically based on the available zones or regions. On the other hand, Kafka users don't really expect the replication factor to be chosen for them, and they may not use rack awareness at all. So this assumes that diskless partitions will be used in the cloud. Should we implement this feature in Aiven Core instead?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As an alternative, you can use manual assignment and specify the RF that you prefer.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, on third thought, we might be fine as long as the user can opt to use manual assignment. Then maybe it's enough to log that we auto-selected the replication factor and brokers.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fair question. The main goal was not to hide the RF choice but to keep things backward compatible with how diskless topics are currently created with RF=1 (the default of -1 was turned into 1), while having a mechanism to define a proper default for diskless topics. On second thought, I wonder if we can remove this "magic RF" and keep the user-defined RF. I'm looking into the implications.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I've added a couple of fixup commits to remove the calculated RF and rely on the user-provided value. This relies a bit more on how the cluster is configured, but as long as the default replication factor equals the number of racks, it achieves the same result when RF=-1. I think we can go with this approach to simplify things and avoid diverging further from classic Kafka. PTAL.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sounds good. It's true that, without being careful, you can end up with diskless topics that are not spread across all available AZs (for example, if you don't edit default.replication.factor and use RF=-1), but it's better to have maximum flexibility. |
||
| try { | ||
| TopicAssignment topicAssignment; | ||
| Predicate<Integer> brokerFilter; | ||
| if (!disklessEnabled) { | ||
| // Diskless managed-replicas uses standard rack-aware assignment | ||
| // with user-defined RF (or defaultReplicationFactor if RF=-1) | ||
| if (!disklessEnabled || isDisklessManagedReplicasEnabled) { | ||
| topicAssignment = clusterControl.replicaPlacer().place(new PlacementSpec( | ||
| 0, | ||
| numPartitions, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.