junrao commented on code in PR #20334:
URL: https://github.com/apache/kafka/pull/20334#discussion_r2291889455


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -1887,8 +1887,10 @@ public int deleteOldSegments() throws IOException {
             return deleteLogStartOffsetBreachedSegments() +
                     deleteRetentionSizeBreachedSegments() +
                     deleteRetentionMsBreachedSegments();
-        } else {
+        } else if (config().compact) {
             return deleteLogStartOffsetBreachedSegments();
+        } else {
+            return deleteLogStartOffsetBreachedSegments() + 
deleteRetentionSizeBreachedSegments();

Review Comment:
   Well, an empty cleanup.policy is the same as delete with retention time and 
retention size set to -1, which tiered storage supports. So, empty 
cleanup.policy should be supported by tiered storage. In the KIP, we have the 
following.
   
   `If cleanup.policy is empty and remote.storage.enable is set to true, the 
local log segments will be cleaned based on the values of 
log.local.retention.bytes and log.local.retention.ms.`



##########
server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java:
##########
@@ -79,7 +80,11 @@ public AbstractKafkaConfig(ConfigDef definition, Map<?, ?> 
originals, Map<String
     }
 
     public List<String> logDirs() {
-        return 
Csv.parseCsvList(Optional.ofNullable(getString(ServerLogConfigs.LOG_DIRS_CONFIG)).orElse(getString(ServerLogConfigs.LOG_DIR_CONFIG)));
+        return Optional.ofNullable(getList(ServerLogConfigs.LOG_DIRS_CONFIG))
+                
.orElse(Arrays.stream(getString(ServerLogConfigs.LOG_DIR_CONFIG).split(",")).toList())

Review Comment:
   Should we just define LOG_DIR_CONFIG as a List then? If so, we need to 
update the KIP.



##########
server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java:
##########
@@ -155,8 +155,8 @@ public class SocketServerConfigs {
     public static final String NUM_NETWORK_THREADS_DOC = "The number of 
threads that the server uses for receiving requests from the network and 
sending responses to the network. Noted: each listener (except for controller 
listener) creates its own thread pool.";
 
     public static final ConfigDef CONFIG_DEF =  new ConfigDef()
-            .define(LISTENERS_CONFIG, STRING, LISTENERS_DEFAULT, HIGH, 
LISTENERS_DOC)
-            .define(ADVERTISED_LISTENERS_CONFIG, STRING, null, HIGH, 
ADVERTISED_LISTENERS_DOC)
+            .define(LISTENERS_CONFIG, LIST, LISTENERS_DEFAULT, 
ConfigDef.ValidList.anyNonDuplicateValues(false, false), HIGH, LISTENERS_DOC)
+            .define(ADVERTISED_LISTENERS_CONFIG, LIST, null, HIGH, 
ADVERTISED_LISTENERS_DOC)

Review Comment:
   Should we validate that it can't be empty?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -561,8 +561,7 @@ public static void 
validateRemoteStorageOnlyIfSystemEnabled(Map<?, ?> props, boo
     @SuppressWarnings("unchecked")
     private static void 
validateRemoteStorageRequiresDeleteCleanupPolicy(Map<?, ?> props) {
         List<String> cleanupPolicy = (List<String>) 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG);
-        Set<String> policySet = cleanupPolicy.stream().map(policy -> 
policy.toLowerCase(Locale.getDefault())).collect(Collectors.toSet());
-        if (!Set.of(TopicConfig.CLEANUP_POLICY_DELETE).equals(policySet)) {
+        if (cleanupPolicy.size() != 1 || 
!TopicConfig.CLEANUP_POLICY_DELETE.equals(cleanupPolicy.get(0))) {

Review Comment:
   We need to change the logic to accommodate for empty cleanupPolicy with 
remote storage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to