Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
}
LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
kafkaConfig.disklessAllowFromClassicEnabled)
kafkaConfig.disklessAllowFromClassicEnabled,
kafkaConfig.disklessStorageSystemEnabled)
case BROKER => validateBrokerName(resource.name())
case CLIENT_METRICS =>
val properties = new Properties()
Expand Down
21 changes: 21 additions & 0 deletions core/src/test/scala/unit/kafka/log/LogConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test
import java.util
import java.util.Properties
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.storage.internals.log.{LogConfig, ThrottledReplicaListValidator}
import org.junit.jupiter.params.ParameterizedTest
Expand Down Expand Up @@ -471,6 +472,26 @@ class LogConfigTest {
kafkaConfig)
}

@ParameterizedTest(name = "testDisklessExplicitConfigRejectedWhenSystemDisabled with value: {0}")
@ValueSource(booleans = Array(true, false))
def testDisklessExplicitConfigRejectedWhenSystemDisabled(disklessEnableValue: Boolean): Unit = {
val kafkaProps = TestUtils.createDummyBrokerConfig()
kafkaProps.put(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, "false")
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)

val ex = assertThrows(classOf[InvalidConfigurationException],
() => LogConfig.validate(
util.Map.of[String, String](),
topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> disklessEnableValue.toString),
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled,
kafkaConfig.disklessAllowFromClassicEnabled,
kafkaConfig.disklessStorageSystemEnabled
))
assertEquals("It is invalid to set diskless.enable if diskless storage system is not enabled.",
ex.getMessage)
}

@Test
def testDisklessAndRemoteStorageAtUpdate(): Unit = {
val kafkaConfig = KafkaConfig.fromProps(TestUtils.createDummyBrokerConfig())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,8 @@ public static void validateBrokerLogConfigValues(Map<?, ?> props,

private static void validateDiskless(Map<String, String> existingConfigs,
Map<String, Object> requestedConfigs,
Map<?, ?> newConfigs) {
Map<?, ?> newConfigs,
boolean isDisklessStorageSystemEnabled) {
final boolean isCreation = existingConfigs.isEmpty();
final boolean isDisklessExplicitlySet = requestedConfigs.containsKey(TopicConfig.DISKLESS_ENABLE_CONFIG);
final boolean isRemoteStorageExplicitlySet = requestedConfigs.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
Expand All @@ -506,13 +507,29 @@ private static void validateDiskless(Map<String, String> existingConfigs,
final boolean wasDisklessEnabled = Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.DISKLESS_ENABLE_CONFIG, "false"));
final boolean requestedDisklessEnabled = (Boolean) newConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG);

if (isDisklessExplicitlySet && !isDisklessStorageSystemEnabled) {
throw new InvalidConfigurationException("It is invalid to set diskless.enable if diskless storage system is not enabled.");
}

final boolean isDisklessEnabled;
if (isDisklessExplicitlySet) {
isDisklessEnabled = requestedDisklessEnabled;
} else {
isDisklessEnabled = wasDisklessEnabled;
}

validateDisklessTransition(isCreation, isDisklessExplicitlySet, isDisklessEnabled, wasDisklessEnabled);

// Only one between diskless.enable and remote.storage.enable can be set, no matter the value.
final boolean hasExplicitDiskless = isDisklessExplicitlySet || wasDisklessExplicitlySet;
final boolean hasExplicitRemoteStorage = isRemoteStorageExplicitlySet || wasRemoteStorageExplicitlySet;
validateDisklessAndRemoteStorageMutualExclusion(isDisklessExplicitlySet, isRemoteStorageExplicitlySet, hasExplicitDiskless, hasExplicitRemoteStorage);
}

private static void validateDisklessTransition(boolean isCreation,
boolean isDisklessExplicitlySet,
boolean isDisklessEnabled,
boolean wasDisklessEnabled) {
// Diskless can be enabled only at creation
if (!isCreation && isDisklessExplicitlySet && isDisklessEnabled && !wasDisklessEnabled) {
throw new InvalidConfigurationException("It is invalid to enable diskless on an already existing topic.");
Expand All @@ -522,10 +539,12 @@ private static void validateDiskless(Map<String, String> existingConfigs,
if (!isCreation && isDisklessExplicitlySet && !isDisklessEnabled && wasDisklessEnabled) {
throw new InvalidConfigurationException("It is invalid to disable diskless.");
}
}

// Only one between diskless.enable and remote.storage.enable can be set, no matter the value.
final boolean hasExplicitDiskless = isDisklessExplicitlySet || wasDisklessExplicitlySet;
final boolean hasExplicitRemoteStorage = isRemoteStorageExplicitlySet || wasRemoteStorageExplicitlySet;
Comment on lines -526 to -528
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extracted to specific methods because validateDiskless was exceeding the limit on cyclomatic complexity.

private static void validateDisklessAndRemoteStorageMutualExclusion(boolean isDisklessExplicitlySet,
boolean isRemoteStorageExplicitlySet,
boolean hasExplicitDiskless,
boolean hasExplicitRemoteStorage) {
if ((isDisklessExplicitlySet && hasExplicitRemoteStorage) ||
(isRemoteStorageExplicitlySet && hasExplicitDiskless)) {
throw new InvalidConfigurationException("It is not valid to set a value for both diskless.enable and remote.storage.enable.");
Expand All @@ -545,9 +564,10 @@ private static void validateDiskless(Map<String, String> existingConfigs,
private static void validateTopicLogConfigValues(Map<String, String> existingConfigs,
Map<String, Object> requestedConfigs,
Map<?, ?> newConfigs,
boolean isRemoteLogStorageSystemEnabled) {
boolean isRemoteLogStorageSystemEnabled,
boolean isDisklessStorageSystemEnabled) {
validateValues(newConfigs);
validateDiskless(existingConfigs, requestedConfigs, newConfigs);
validateDiskless(existingConfigs, requestedConfigs, newConfigs, isDisklessStorageSystemEnabled);

boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
if (isRemoteLogStorageEnabled) {
Expand Down Expand Up @@ -663,6 +683,16 @@ public static void validate(Map<String, String> existingConfigs,
Map<?, ?> configuredProps,
boolean isRemoteLogStorageSystemEnabled,
boolean isDisklessAllowFromClassicEnabled) {
validate(existingConfigs, props, configuredProps, isRemoteLogStorageSystemEnabled,
isDisklessAllowFromClassicEnabled, true);
}

public static void validate(Map<String, String> existingConfigs,
Properties props,
Map<?, ?> configuredProps,
boolean isRemoteLogStorageSystemEnabled,
boolean isDisklessAllowFromClassicEnabled,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: dead argument. Can be fixed in a follow-up.

boolean isDisklessStorageSystemEnabled) {
validateNames(props);
if (configuredProps == null || configuredProps.isEmpty()) {
Map<?, ?> valueMaps = CONFIG.parse(props);
Expand All @@ -671,7 +701,8 @@ public static void validate(Map<String, String> existingConfigs,
Map<Object, Object> combinedConfigs = new HashMap<>(configuredProps);
combinedConfigs.putAll(props);
Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
validateTopicLogConfigValues(existingConfigs, Utils.castToStringObjectMap(props), valueMaps, isRemoteLogStorageSystemEnabled);
validateTopicLogConfigValues(existingConfigs, Utils.castToStringObjectMap(props), valueMaps,
isRemoteLogStorageSystemEnabled, isDisklessStorageSystemEnabled);
}
}

Expand Down