Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

52 changes: 0 additions & 52 deletions core/src/test/java/kafka/server/InklessConfigsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import io.aiven.inkless.test_utils.S3TestContainer;

import static org.apache.kafka.common.config.TopicConfig.DISKLESS_ENABLE_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

Expand Down Expand Up @@ -195,57 +194,6 @@ public void classicTopicWithDisklessDefaultTrueConfigs() throws Exception {
cluster.close();
}

/**
 * Creates a topic with diskless.enable=false and remote.storage.enable=true, then checks
 * that diskless can be switched on for it and cannot be switched back off afterwards.
 *
 * NOTE(review): the meaning of the three init(false, true, true) flags is not visible
 * here; presumably they turn on remote storage and classic-to-diskless migration --
 * confirm against init's signature.
 */
@Test
public void disklessMigrationEnabled() throws Exception {
    var cluster = init(false, true, true);
    // Close the cluster in finally so a failed assertion or admin call does not leak it.
    try {
        Map<String, Object> clientConfigs = new HashMap<>();
        clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());

        try (Admin admin = AdminClient.create(clientConfigs)) {
            // When creating a new topic with diskless.enable=false AND remote.storage.enable=true
            final String tieredTopic = "tieredTopic";
            createTopic(admin, tieredTopic, Map.of(
                DISKLESS_ENABLE_CONFIG, "false",
                REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"
            ));
            // Then both values are reflected in the topic config
            var tieredTopicConfig = getTopicConfig(admin, tieredTopic);
            assertEquals("false", tieredTopicConfig.get(DISKLESS_ENABLE_CONFIG));
            // Use the same constant as at creation rather than repeating the key as a literal.
            assertEquals("true", tieredTopicConfig.get(REMOTE_LOG_STORAGE_ENABLE_CONFIG));

            // When migration is enabled AND remote storage is enabled, it SHOULD be possible to turn on diskless
            alterTopicConfig(admin, tieredTopic, Map.of(DISKLESS_ENABLE_CONFIG, "true"));
            // Verify the config was updated
            var updatedTopicConfig = getTopicConfig(admin, tieredTopic);
            assertEquals("true", updatedTopicConfig.get(DISKLESS_ENABLE_CONFIG));

            // But it should still NOT be possible to turn off diskless after enabling it
            assertThrows(ExecutionException.class,
                () -> alterTopicConfig(admin, tieredTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false")));
        }
    } finally {
        cluster.close();
    }
}

/**
 * Creates a topic with diskless.enable=false and no remote storage setting, then checks
 * that an attempt to switch diskless on is rejected.
 *
 * NOTE(review): the meaning of the three init(false, true, true) flags is not visible
 * here; presumably migration is enabled at the broker level -- confirm against init.
 */
@Test
public void disklessMigrationRequiresRemoteStorage() throws Exception {
    var cluster = init(false, true, true);
    // Close the cluster in finally so a failed assertion or admin call does not leak it.
    try {
        Map<String, Object> clientConfigs = new HashMap<>();
        clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());

        try (Admin admin = AdminClient.create(clientConfigs)) {
            // When creating a new topic with diskless.enable=false WITHOUT remote storage
            final String classicTopic = "classicTopic";
            createTopic(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false"));
            // Then diskless.enable is set to false in the topic config
            var classicTopicConfig = getTopicConfig(admin, classicTopic);
            assertEquals("false", classicTopicConfig.get(DISKLESS_ENABLE_CONFIG));

            // Even with migration enabled, it should NOT be possible to turn on diskless
            // because remote storage is not enabled on this topic
            assertThrows(ExecutionException.class,
                () -> alterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "true")));
        }
    } finally {
        cluster.close();
    }
}

public void createTopic(Admin admin, String topic, Map<String, String> configs) throws Exception {
admin.createTopics(Collections.singletonList(
new NewTopic(topic, 1, (short) 1)
Expand Down
267 changes: 107 additions & 160 deletions core/src/test/scala/unit/kafka/log/LogConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -430,175 +430,122 @@ class LogConfigTest {
}


/**
 * A brand-new topic (no existing configs) may be created with diskless.enable set to
 * either value; validation runs with the remote-storage-system flag off and must not throw.
 */
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testDisklessConfigsAtCreation(enable: Boolean): Unit = {
  val brokerConfig = KafkaConfig.fromProps(TestUtils.createDummyBrokerConfig())
  val noExistingConfigs: util.Map[String, String] = util.Map.of()

  // The request carries only diskless.enable, with the parameterized value.
  val requested = new Properties()
  requested.put(TopicConfig.DISKLESS_ENABLE_CONFIG, enable.toString)

  LogConfig.validate(noExistingConfigs, requested, brokerConfig.extractLogConfigMap, false)
}

@Test
def testDisklessConfigsAtUpdate(): Unit = {
val kafkaProps = TestUtils.createDummyBrokerConfig()
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)

val disklessAlreadyEnabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")
val disklessAlreadyDisabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")

val setDisklessTrue = new Properties()
setDisklessTrue.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")
val setDisklessFalse = new Properties()
setDisklessFalse.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")

// Given diskless.enable=true:
// Should be possible to set diskless.enable to true
LogConfig.validate(disklessAlreadyEnabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false)
// Should NOT be possible to set diskless.enable to false
assertThrows(
classOf[InvalidConfigurationException],
() => LogConfig.validate(disklessAlreadyEnabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false)
)

// Given diskless.enable=false:
// Should NOT be possible to set diskless.enable to true
assertThrows(
classOf[InvalidConfigurationException],
() => LogConfig.validate(disklessAlreadyDisabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false)
)
// Should be possible to set diskless.enable to false
LogConfig.validate(disklessAlreadyDisabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false)

// Given existing topic without diskless.enable set (e.g., created before diskless was introduced):
// Should NOT be possible to set diskless.enable to true
val topicWithoutDisklessConfig = util.Map.of(TopicConfig.RETENTION_MS_CONFIG, "1000")
assertThrows(
classOf[InvalidConfigurationException],
() => LogConfig.validate(topicWithoutDisklessConfig, setDisklessTrue, kafkaConfig.extractLogConfigMap, false)
)
/**
 * Creation-time rules for diskless.enable and remote.storage.enable: each flag may be set
 * to true on its own, but a request carrying both flags is rejected for every combination
 * of values with the same error message.
 */
def testDisklessAndRemoteStorageAtCreation(): Unit = {
  val brokerConfig = KafkaConfig.fromProps(TestUtils.createDummyBrokerConfig())
  val emptyTopicConfigs: util.Map[String, String] = util.Map.of()
  val expectedError = "remote.storage.enable cannot be set if diskless.enable is set to true."

  // Allowed: either flag alone set to true (diskless first, then remote storage).
  Seq(TopicConfig.DISKLESS_ENABLE_CONFIG, TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG).foreach { key =>
    assertValid(emptyTopicConfigs, topicProps(key -> "true"), brokerConfig)
  }

  // Not allowed: both flags together, in the order
  // (false,false), (false,true), (true,false), (true,true).
  for {
    disklessValue <- Seq("false", "true")
    remoteValue <- Seq("false", "true")
  } {
    val bothFlags = topicProps(
      TopicConfig.DISKLESS_ENABLE_CONFIG -> disklessValue,
      TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> remoteValue)
    assertInvalid(emptyTopicConfigs, bothFlags, expectedError, brokerConfig)
  }
}

@Test
def testDisklessMigrationEnabled(): Unit = {
val kafkaProps = TestUtils.createDummyBrokerConfig()
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
val isRemoteStorageSystemEnabled = true
// Given that migrating from classic to diskless is enabled
val isDisklessAllowFromClassicEnabled = true

// 1. Should be possible to switch from diskless.enable=false to diskless.enable=true for a tiered topic
val tieredTopicWithDisklessDisabled = util.Map.of(
TopicConfig.DISKLESS_ENABLE_CONFIG, "false",
TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"
)
val migrateToDiskless = new Properties()
migrateToDiskless.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")
migrateToDiskless.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
LogConfig.validate(tieredTopicWithDisklessDisabled, migrateToDiskless, kafkaConfig.extractLogConfigMap, isRemoteStorageSystemEnabled, isDisklessAllowFromClassicEnabled)

// 1.1 Should be possible to switch from no diskless config to diskless.enable=true for a tiered topic
val tieredTopicWithoutDisklessConfig = util.Map.of(
TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"
)
LogConfig.validate(tieredTopicWithoutDisklessConfig, migrateToDiskless, kafkaConfig.extractLogConfigMap, isRemoteStorageSystemEnabled, isDisklessAllowFromClassicEnabled)

// 2. Should still NOT be possible to switch from diskless.enable=true to diskless.enable=false (even with migration enabled)
val disklessTopic = util.Map.of(
TopicConfig.DISKLESS_ENABLE_CONFIG, "true"
)
val setDisklessFalse = new Properties()
setDisklessFalse.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")
assertThrows(
classOf[InvalidConfigurationException],
() => LogConfig.validate(disklessTopic, setDisklessFalse, kafkaConfig.extractLogConfigMap, isRemoteStorageSystemEnabled, isDisklessAllowFromClassicEnabled)
)

// 3. After migration (diskless=true with remote.storage=true), should still be able to alter other configs
val migratedTopic = util.Map.of(
TopicConfig.DISKLESS_ENABLE_CONFIG, "true",
TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"
)
val keepMigratedState = new Properties()
keepMigratedState.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")
keepMigratedState.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
LogConfig.validate(migratedTopic, keepMigratedState, kafkaConfig.extractLogConfigMap, isRemoteStorageSystemEnabled, isDisklessAllowFromClassicEnabled)
/**
 * Update-time rules for diskless.enable and remote.storage.enable. Each of four single-flag
 * updates is validated against five existing-topic states, checking either that validation
 * passes or that it fails with the exact expected message.
 */
def testDisklessAndRemoteStorageAtUpdate(): Unit = {
  val brokerConfig = KafkaConfig.fromProps(TestUtils.createDummyBrokerConfig())

  // Expected validation messages.
  val bothFlagsError = "remote.storage.enable cannot be set if diskless.enable is set to true."
  val enableOnExistingError = "It is invalid to enable diskless on an already existing topic."
  val disableDisklessError = "It is invalid to disable diskless."
  val disableRemoteError = "It is invalid to disable remote storage without deleting remote data. If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`."

  // Existing topic states.
  val neitherFlag = util.Map.of(TopicConfig.RETENTION_MS_CONFIG, "1000")
  val disklessOff = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")
  val disklessOn = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")
  val remoteOff = util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")
  val remoteOn = util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")

  // Case 1: set diskless.enable=true -- only a topic that is already diskless accepts it.
  val enableDiskless = topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "true")
  Seq(neitherFlag, disklessOff).foreach { existing =>
    assertInvalid(existing, enableDiskless, enableOnExistingError, brokerConfig)
  }
  assertValid(disklessOn, enableDiskless, brokerConfig)
  Seq(remoteOff, remoteOn).foreach { existing =>
    assertInvalid(existing, enableDiskless, enableOnExistingError, brokerConfig)
  }

  // Case 2: set diskless.enable=false
  val disableDiskless = topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "false")
  Seq(neitherFlag, disklessOff).foreach { existing =>
    assertValid(existing, disableDiskless, brokerConfig)
  }
  assertInvalid(disklessOn, disableDiskless, disableDisklessError, brokerConfig)
  Seq(remoteOff, remoteOn).foreach { existing =>
    assertInvalid(existing, disableDiskless, bothFlagsError, brokerConfig)
  }

  // Case 3: set remote.storage.enable=true
  val enableRemote = topicProps(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "true")
  assertValid(neitherFlag, enableRemote, brokerConfig)
  Seq(disklessOff, disklessOn).foreach { existing =>
    assertInvalid(existing, enableRemote, bothFlagsError, brokerConfig)
  }
  Seq(remoteOff, remoteOn).foreach { existing =>
    assertValid(existing, enableRemote, brokerConfig)
  }

  // Case 4: set remote.storage.enable=false
  val disableRemote = topicProps(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "false")
  assertValid(neitherFlag, disableRemote, brokerConfig)
  Seq(disklessOff, disklessOn).foreach { existing =>
    assertInvalid(existing, disableRemote, bothFlagsError, brokerConfig)
  }
  assertValid(remoteOff, disableRemote, brokerConfig)
  assertInvalid(remoteOn, disableRemote, disableRemoteError, brokerConfig)
}

@Test
def testDisklessMigrationRequiresBothMigrationAndRemoteStorage(): Unit = {
val kafkaProps = TestUtils.createDummyBrokerConfig()
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)

val disklessAlreadyDisabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")

val setDisklessTrue = new Properties()
setDisklessTrue.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")

// Case 1: Migration enabled but remote storage NOT enabled
assertThrows(
classOf[InvalidConfigurationException],
() => LogConfig.validate(
disklessAlreadyDisabled,
setDisklessTrue,
kafkaConfig.extractLogConfigMap,
false, // isRemoteLogStorageSystemEnabled
true) // isDisklessAllowFromClassicEnabled
)

// Case 2: Remote storage enabled but migration NOT enabled
assertThrows(
classOf[InvalidConfigurationException],
() => LogConfig.validate(
disklessAlreadyDisabled,
setDisklessTrue,
kafkaConfig.extractLogConfigMap,
true, // isRemoteLogStorageSystemEnabled
false) // isDisklessAllowFromClassicEnabled
)

// Case 3: Neither migration nor remote storage enabled
assertThrows(
classOf[InvalidConfigurationException],
() => LogConfig.validate(
disklessAlreadyDisabled,
setDisklessTrue,
kafkaConfig.extractLogConfigMap,
false, // isRemoteLogStorageSystemEnabled
false) // isDisklessAllowFromClassicEnabled
)
/**
 * Builds a [[Properties]] object from the given key/value pairs.
 *
 * @param entries pairs to insert, in order; a later pair with the same key overwrites an earlier one
 * @return a new Properties instance holding every pair
 */
private def topicProps(entries: (String, String)*): Properties =
  entries.foldLeft(new Properties()) { case (acc, (key, value)) =>
    acc.put(key, value)
    acc
  }

@Test
def testInvalidDisklessAndRemoteStorageEnable(): Unit = {
val kafkaProps = TestUtils.createDummyBrokerConfig()
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)

val logProps = new Properties
logProps.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
/**
 * Asserts that applying `props` on top of `existingConfigs` passes [[LogConfig.validate]].
 * Validation uses the broker's extracted log config map and passes `true` as the
 * remote-storage-system flag; any exception from validation propagates and fails the test.
 *
 * @param existingConfigs the topic's current configs (empty for a topic being created)
 * @param props           the configs being requested
 * @param kafkaConfig     broker config supplying the log config map
 */
private def assertValid(existingConfigs: util.Map[String, String], props: Properties, kafkaConfig: KafkaConfig): Unit =
  LogConfig.validate(existingConfigs, props, kafkaConfig.extractLogConfigMap, true)

// Add diskless to existing TS topic (diskless not previously set) - treated same as diskless=false
val t1 = assertThrows(
classOf[InvalidConfigurationException],
() => LogConfig.validate(util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), logProps, kafkaConfig.extractLogConfigMap, true))
assertEquals("To migrate a classic topic to diskless, both diskless.enable and remote.storage.enable must be set to true, and the broker config diskless.allow.from.classic.enable must also be enabled.", t1.getMessage)

// Add remote storage
val t2 = assertThrows(
classOf[InvalidConfigurationException],
() => LogConfig.validate(util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true"), logProps, kafkaConfig.extractLogConfigMap, true))
assertEquals("It is invalid to enable remote storage on an existing diskless topic.", t2.getMessage)

// Create a diskless topic with remote storage enabled is invalid
val t3 = assertThrows(
classOf[InvalidConfigurationException],
() => LogConfig.validate(util.Map.of, logProps, kafkaConfig.extractLogConfigMap, true))
assertEquals("It is invalid to create a diskless topic with remote storage enabled.", t3.getMessage)
/**
 * Asserts that applying `props` on top of `existingConfigs` is rejected with an
 * [[InvalidConfigurationException]] whose message equals `expectedMessage` exactly.
 *
 * @param existingConfigs the topic's current configs (empty for a topic being created)
 * @param props           the configs being requested
 * @param expectedMessage the exact exception message expected
 * @param kafkaConfig     broker config supplying the log config map
 */
private def assertInvalid(existingConfigs: util.Map[String, String], props: Properties, expectedMessage: String, kafkaConfig: KafkaConfig): Unit = {
  // Same validation call as assertValid, evaluated inside the lambda so the failure is captured.
  val rejection = assertThrows(
    classOf[InvalidConfigurationException],
    () => assertValid(existingConfigs, props, kafkaConfig))
  assertEquals(expectedMessage, rejection.getMessage)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,14 @@ private ApiError incrementalAlterConfigResource(
String opValue = opTypeAndNewValue.getValue();
switch (opType) {
case SET:
if (!newlyCreatedResource &&
configResource.type().equals(Type.TOPIC) &&
Objects.equals(key, TopicConfig.DISKLESS_ENABLE_CONFIG) &&
Boolean.parseBoolean(opValue) &&
!Boolean.parseBoolean(currentValue)) {
return ApiError.fromThrowable(
new InvalidConfigurationException("It is invalid to enable diskless on an already existing topic."));
}
newValue = opValue;
break;
case DELETE:
Expand Down
Loading