junrao commented on code in PR #20334: URL: https://github.com/apache/kafka/pull/20334#discussion_r2294227533
########## core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala: ########## @@ -260,11 +260,13 @@ abstract class QuorumTestHarness extends Logging { props.setProperty(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, metadataDir.getAbsolutePath) val proto = controllerListenerSecurityProtocol.toString val securityProtocolMaps = extraControllerSecurityProtocols().map(sc => sc + ":" + sc).mkString(",") - val listeners = extraControllerSecurityProtocols().map(sc => sc + "://localhost:0").mkString(",") - val listenerNames = extraControllerSecurityProtocols().mkString(",") + val listeners = extraControllerSecurityProtocols().map(sc => sc + "://localhost:0").mkString(",").trim Review Comment: Why is `trim` needed? Ditto below. ########## clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java: ########## @@ -159,7 +159,9 @@ public class TopicConfig { "<a href=\"#compaction\">log compaction</a>, which retains the latest value for each key. " + "It is also possible to specify both policies in a comma-separated list (e.g. \"delete,compact\"). " + "In this case, old segments will be discarded per the retention time and size configuration, " + - "while retained segments will be compacted."; + "while retained segments will be compacted." + + "An empty list means infinite retention - no cleanup policies will be applied and log segments " + + "will be retained indefinitely."; Review Comment: It would be useful to mention that local retention is still enforced with remote storage enabled. Also, could we update the doc for cleanup.policy in ServerLogConfigs too? ########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java: ########## @@ -561,9 +561,8 @@ public static void validateRemoteStorageOnlyIfSystemEnabled(Map<?, ?> props, boo @SuppressWarnings("unchecked") private static void validateRemoteStorageRequiresDeleteCleanupPolicy(Map<?, ?> props) { List<String> cleanupPolicy = (List<String>) props.get(TopicConfig.CLEANUP_POLICY_CONFIG); - Set<String> policySet = cleanupPolicy.stream().map(policy -> policy.toLowerCase(Locale.getDefault())).collect(Collectors.toSet()); - if (!Set.of(TopicConfig.CLEANUP_POLICY_DELETE).equals(policySet)) { - throw new ConfigException("Remote log storage only supports topics with cleanup.policy=delete"); + if (!cleanupPolicy.isEmpty() && (cleanupPolicy.size() != 1 || !TopicConfig.CLEANUP_POLICY_DELETE.equals(cleanupPolicy.get(0)))) { + throw new ConfigException("Remote log storage only supports topics with cleanup.policy=delete or cleanup.policy is empty list."); Review Comment: cleanup.policy is empty list => cleanup.policy being an empty list ########## storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java: ########## @@ -1879,16 +1879,25 @@ private int deleteSegments(List<LogSegment> deletable, SegmentDeletionReason rea /** * If topic deletion is enabled, delete any local log segments that have either expired due to time based - * retention or because the log size is > retentionSize. Whether or not deletion is enabled, delete any local + * retention or because the log size is > retentionSize. Empty cleanup.policy with remote storage enabled + * behaves the same as deletion policy. Whether or not deletion is enabled, delete any local Review Comment: Empty cleanup.policy with remote storage enabled behaves the same as deletion policy => Empty cleanup.policy is the same as delete with infinite retention. So, we only need to delete local segments if remote storage is enabled. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org