junrao commented on code in PR #20334: URL: https://github.com/apache/kafka/pull/20334#discussion_r2291889455
########## storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java: ########## @@ -1887,8 +1887,10 @@ public int deleteOldSegments() throws IOException { return deleteLogStartOffsetBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteRetentionMsBreachedSegments(); - } else { + } else if (config().compact) { return deleteLogStartOffsetBreachedSegments(); + } else { + return deleteLogStartOffsetBreachedSegments() + deleteRetentionSizeBreachedSegments(); Review Comment: Well, an empty cleanup.policy is the same as delete with retention time and retention size set to -1, which tiered storage supports. So, empty cleanup.policy should be supported by tiered storage. In the KIP, we have the following. `If cleanup.policy is empty and remote.storage.enable is set to true, the local log segments will be cleaned based on the values of log.local.retention.bytes and log.local.retention.ms.` ########## server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java: ########## @@ -79,7 +80,11 @@ public AbstractKafkaConfig(ConfigDef definition, Map<?, ?> originals, Map<String } public List<String> logDirs() { - return Csv.parseCsvList(Optional.ofNullable(getString(ServerLogConfigs.LOG_DIRS_CONFIG)).orElse(getString(ServerLogConfigs.LOG_DIR_CONFIG))); + return Optional.ofNullable(getList(ServerLogConfigs.LOG_DIRS_CONFIG)) + .orElse(Arrays.stream(getString(ServerLogConfigs.LOG_DIR_CONFIG).split(",")).toList()) Review Comment: Should we just define LOG_DIR_CONFIG as a List then? If so, we need to update the KIP. ########## server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java: ########## @@ -155,8 +155,8 @@ public class SocketServerConfigs { public static final String NUM_NETWORK_THREADS_DOC = "The number of threads that the server uses for receiving requests from the network and sending responses to the network. Noted: each listener (except for controller listener) creates its own thread pool."; public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(LISTENERS_CONFIG, STRING, LISTENERS_DEFAULT, HIGH, LISTENERS_DOC) - .define(ADVERTISED_LISTENERS_CONFIG, STRING, null, HIGH, ADVERTISED_LISTENERS_DOC) + .define(LISTENERS_CONFIG, LIST, LISTENERS_DEFAULT, ConfigDef.ValidList.anyNonDuplicateValues(false, false), HIGH, LISTENERS_DOC) + .define(ADVERTISED_LISTENERS_CONFIG, LIST, null, HIGH, ADVERTISED_LISTENERS_DOC) Review Comment: Should we validate that it can't be empty? ########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java: ########## @@ -561,8 +561,7 @@ public static void validateRemoteStorageOnlyIfSystemEnabled(Map<?, ?> props, boo @SuppressWarnings("unchecked") private static void validateRemoteStorageRequiresDeleteCleanupPolicy(Map<?, ?> props) { List<String> cleanupPolicy = (List<String>) props.get(TopicConfig.CLEANUP_POLICY_CONFIG); - Set<String> policySet = cleanupPolicy.stream().map(policy -> policy.toLowerCase(Locale.getDefault())).collect(Collectors.toSet()); - if (!Set.of(TopicConfig.CLEANUP_POLICY_DELETE).equals(policySet)) { + if (cleanupPolicy.size() != 1 || !TopicConfig.CLEANUP_POLICY_DELETE.equals(cleanupPolicy.get(0))) { Review Comment: We need to change the logic to accommodate for empty cleanupPolicy with remote storage. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org