Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@
files="MemoryRecordsBuilder.java"/>
<suppress checks="ParameterNumber"
files="ClientUtils.java"/>
<!-- ReplicationControlManager is right at this limit; suppressed so that it can take the additional parameters needed for diskless logic -->
<suppress checks="ParameterNumber"
files="ReplicationControlManager.java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaNetworkChannelTest|ClientTelemetryReporterTest).java"/>
Expand Down Expand Up @@ -318,6 +321,9 @@
files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager|KRaftMigrationDriver|ClusterControlManager|MetaPropertiesEnsemble).java"/>
<suppress checks="NPathComplexity"
files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager|KRaftMigrationDriver|ScramControlManager|ClusterControlManager|MetadataDelta|MetaPropertiesEnsemble).java"/>
<!-- ReplicationControlManager has multiple long methods; suppression needed for diskless managed replicas logic -->
<suppress checks="MethodLength"
files="ReplicationControlManager.java"/>
<suppress checks="BooleanExpressionComplexity"
files="(MetadataImage).java"/>
<suppress checks="ImportControl"
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ class ControllerServer(
setDefaultNumPartitions(config.numPartitions.intValue()).
setDefaultDisklessEnable(config.logDisklessEnable).
setDisklessStorageSystemEnabled(config.disklessStorageSystemEnabled).
setDisklessManagedReplicasEnabled(config.disklessManagedReplicasEnabled).
setClassicRemoteStorageForceEnabled(config.classicRemoteStorageForceEnabled).
setClassicRemoteStorageForceExcludeTopicRegexes(config.classicRemoteStorageForceExcludeTopicRegexes).
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
/** Diskless Configuration */
val disklessStorageSystemEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG)
val disklessAllowFromClassicEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG)
val disklessManagedReplicasEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG)
val classicRemoteStorageForceEnabled: Boolean = getBoolean(ServerConfigs.CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_CONFIG)
val classicRemoteStorageForceExcludeTopicRegexes: java.util.List[String] =
getList(ServerConfigs.CLASSIC_REMOTE_STORAGE_FORCE_EXCLUDE_TOPIC_REGEXES_CONFIG)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ public static class Builder {

private boolean defaultDisklessEnable = false;
private boolean disklessStorageSystemEnabled = false;
private boolean disklessManagedReplicasEnabled = false;
private boolean classicRemoteStorageForceEnabled = false;
private List<String> classicRemoteStorageForceExcludeTopicRegexes = List.of();

Expand Down Expand Up @@ -295,6 +296,11 @@ public Builder setDisklessStorageSystemEnabled(boolean disklessStorageSystemEnab
return this;
}

/**
 * Configures whether diskless topics should be created with managed replicas.
 * The flag defaults to {@code false} and is forwarded unchanged to the
 * ReplicationControlManager built by this controller.
 *
 * @param enabled {@code true} to enable managed replicas for diskless topics
 * @return this builder, for call chaining
 */
public Builder setDisklessManagedReplicasEnabled(boolean enabled) {
    disklessManagedReplicasEnabled = enabled;
    return this;
}

public Builder setClassicRemoteStorageForceEnabled(boolean classicRemoteStorageForceEnabled) {
this.classicRemoteStorageForceEnabled = classicRemoteStorageForceEnabled;
return this;
Expand Down Expand Up @@ -454,6 +460,7 @@ public QuorumController build() throws Exception {
defaultNumPartitions,
defaultDisklessEnable,
disklessStorageSystemEnabled,
disklessManagedReplicasEnabled,
classicRemoteStorageForceEnabled,
classicRemoteStorageForceExcludeTopicRegexes,
replicaPlacer,
Expand Down Expand Up @@ -1497,6 +1504,7 @@ private QuorumController(
int defaultNumPartitions,
boolean defaultDisklessEnable,
boolean disklessStorageSystemEnabled,
boolean disklessManagedReplicasEnabled,
boolean classicRemoteStorageForceEnabled,
List<String> classicRemoteStorageForceExcludeTopicRegexes,
ReplicaPlacer replicaPlacer,
Expand Down Expand Up @@ -1583,6 +1591,7 @@ private QuorumController(
setDefaultNumPartitions(defaultNumPartitions).
setDefaultDisklessEnable(defaultDisklessEnable).
setDisklessStorageSystemEnabled(disklessStorageSystemEnabled).
setDisklessManagedReplicasEnabled(disklessManagedReplicasEnabled).
setClassicRemoteStorageForceEnabled(classicRemoteStorageForceEnabled).
setClassicRemoteStorageForceExcludeTopicRegexes(classicRemoteStorageForceExcludeTopicRegexes).
setMaxElectionsPerImbalance(ReplicationControlManager.MAX_ELECTIONS_PER_IMBALANCE).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ static class Builder {
private int defaultNumPartitions = 1;
private boolean defaultDisklessEnable = false;
private boolean isDisklessStorageSystemEnabled = false;
private boolean isDisklessManagedReplicasEnabled = false;
private boolean classicRemoteStorageForceEnabled = false;
private List<String> classicRemoteStorageForceExcludeTopicRegexes = List.of();

Expand Down Expand Up @@ -201,6 +202,11 @@ public Builder setDisklessStorageSystemEnabled(boolean isDisklessStorageSystemEn
return this;
}

/**
 * Configures whether diskless topics should be created with managed replicas.
 * The flag defaults to {@code false}, which keeps the legacy replication factor of 1
 * for diskless topics.
 *
 * @param enabled {@code true} to enable managed replicas for diskless topics
 * @return this builder, for call chaining
 */
public Builder setDisklessManagedReplicasEnabled(boolean enabled) {
    isDisklessManagedReplicasEnabled = enabled;
    return this;
}

public Builder setClassicRemoteStorageForceEnabled(boolean classicRemoteStorageForceEnabled) {
this.classicRemoteStorageForceEnabled = classicRemoteStorageForceEnabled;
return this;
Expand Down Expand Up @@ -253,6 +259,7 @@ ReplicationControlManager build() {
defaultNumPartitions,
defaultDisklessEnable,
isDisklessStorageSystemEnabled,
isDisklessManagedReplicasEnabled,
classicRemoteStorageForceEnabled,
classicRemoteStorageForceExcludeTopicRegexes,
maxElectionsPerImbalance,
Expand Down Expand Up @@ -332,9 +339,21 @@ static Map<String, String> translateCreationConfigs(CreatableTopicConfigCollecti
*/
private final boolean defaultDisklessEnable;

/**
* When true, the diskless storage system is enabled, allowing diskless topics to be created.
*/
private final boolean isDisklessStorageSystemEnabled;
private final ClassicTopicRemoteStorageForcePolicy classicTopicRemoteStorageForcePolicy;

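/**
* When true, diskless topics use managed replicas: the replication factor is resolved the same way
* as for classic topics (the user-provided value, or the default replication factor when -1 is
* requested), and replicas are placed with the standard rack-aware replica placer. Manual partition
* assignments are also accepted for diskless topics in this mode. To get one replica per rack when
* -1 is requested, the default replication factor must be set to the number of racks.
* When false, diskless topics use the legacy RF=1 behavior.
*
* <p>Phase 1 limitation: This config only affects topic creation. Add Partitions inherits
* RF from existing partitions (correct behavior - maintains consistency within the topic).
*/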
private final boolean isDisklessManagedReplicasEnabled;

/**
* Maximum number of leader elections to perform during one partition leader balancing operation.
*/
Expand Down Expand Up @@ -425,6 +444,7 @@ private ReplicationControlManager(
int defaultNumPartitions,
boolean defaultDisklessEnable,
boolean isDisklessStorageSystemEnabled,
boolean isDisklessManagedReplicasEnabled,
boolean classicRemoteStorageForceEnabled,
List<String> classicRemoteStorageForceExcludeTopicRegexes,
int maxElectionsPerImbalance,
Expand All @@ -439,6 +459,7 @@ private ReplicationControlManager(
this.defaultNumPartitions = defaultNumPartitions;
this.defaultDisklessEnable = defaultDisklessEnable;
this.isDisklessStorageSystemEnabled = isDisklessStorageSystemEnabled;
this.isDisklessManagedReplicasEnabled = isDisklessManagedReplicasEnabled;
this.classicTopicRemoteStorageForcePolicy = new ClassicTopicRemoteStorageForcePolicy(
classicRemoteStorageForceEnabled,
classicRemoteStorageForceExcludeTopicRegexes
Expand Down Expand Up @@ -803,9 +824,12 @@ private ApiError createTopic(ControllerRequestContext context,
"when the diskless storage system is disabled. " +
"Please enable the diskless storage system to create diskless topics.");
}
if (Math.abs(topic.replicationFactor()) != 1) {
// Diskless RF validation:
// When managed replicas enabled: any valid RF accepted (standard Kafka validation applies later).
// When managed replicas disabled (legacy): only RF=1 or RF=-1 (resolves to 1).
if (!isDisklessManagedReplicasEnabled && Math.abs(topic.replicationFactor()) != 1) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
"Replication factor for diskless topics must be 1 or -1 to use the default value (1).");
"Replication factor for diskless topics must be 1 or -1 when managed replicas are disabled.");
}
}

Expand All @@ -820,7 +844,7 @@ private ApiError createTopic(ControllerRequestContext context,
"A manual partition assignment was specified, but numPartitions " +
"was not set to -1.");
}
if (disklessEnabled) {
if (disklessEnabled && !isDisklessManagedReplicasEnabled) {
return new ApiError(INVALID_REQUEST,
"A manual partition assignment cannot be specified for diskless topics.");
}
Expand Down Expand Up @@ -868,12 +892,17 @@ private ApiError createTopic(ControllerRequestContext context,
} else {
int numPartitions = topic.numPartitions() == -1 ?
defaultNumPartitions : topic.numPartitions();
short replicationFactor = topic.replicationFactor() == -1 ?
defaultReplicationFactor : topic.replicationFactor();
short classicReplicationFactor = topic.replicationFactor() == -1 ? defaultReplicationFactor : topic.replicationFactor();
// For managed diskless: use same resolution as classic (RF=-1 → defaultReplicationFactor, else user value).
// For unmanaged diskless (legacy): always RF=1.
short disklessReplicationFactor = isDisklessManagedReplicasEnabled ? classicReplicationFactor : 1;
short replicationFactor = disklessEnabled ? disklessReplicationFactor : classicReplicationFactor;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have been thinking a lot about this. Overall, I agree that from a service provider's point of view it's a good feature to choose the replication factor automatically based on the available zones or regions. On the other hand, Kafka users don't really expect the replication factor to be chosen for them, and they may not use rack awareness at all. So this assumes that diskless partitions will be used in the cloud. Should we implement this feature in Aiven Core instead?
If this is an Aiven-specific feature, it might be OK, but in that case we could also consider moving it to Aiven Core to avoid modifying core Kafka.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an alternative, you can use manual assignment and specify the RF you prefer.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, on third thought, we might be fine as long as the user can opt to use manual assignment. In that case, it may be enough to log that we auto-selected the replication factor and brokers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair question. The main goal was not to hide the replication factor choice from users, but to stay backward-compatible with how diskless topics are currently created with RF=1 (the default of -1 was resolved to 1), and to have a mechanism for defining a proper default for diskless topics.

On second thought, I wonder if we can remove this "magic RF" and just use the user-defined RF. I'm looking into the implications.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a couple of fixup commits that remove the calculated RF and rely on the user-provided value instead. This depends a bit more on how the cluster is configured, but as long as the default replication factor equals the number of racks, it achieves the same result when RF=-1.

I think we can go with this approach to simplify things and avoid diverging further from classic kafka. PTAL.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. It's true that, without being careful, you can end up with diskless topics that are not spread across all available AZs (for example, if you don't change default.replication.factor and use RF=-1), but it's better to have maximum flexibility.

try {
TopicAssignment topicAssignment;
Predicate<Integer> brokerFilter;
if (!disklessEnabled) {
// Diskless managed-replicas uses standard rack-aware assignment
// with user-defined RF (or defaultReplicationFactor if RF=-1)
if (!disklessEnabled || isDisklessManagedReplicasEnabled) {
topicAssignment = clusterControl.replicaPlacer().place(new PlacementSpec(
0,
numPartitions,
Expand Down
Loading