mumrah commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1671034456


##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -309,6 +331,54 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
         return ApiError.NONE;
     }
 
+    void maybeTriggerPartitionUpdateOnMinIsrChange(List<ApiMessageAndVersion> 
records) {
+        List<ConfigRecord> minIsrRecords = new ArrayList<>();
+        Map<String, String> topicToMinIsrValueMap = new HashMap<>();
+        Map<String, String> configRemovedTopicMap = new HashMap<>();
+        records.forEach(record -> {
+            if (MetadataRecordType.fromId(record.message().apiKey()) == 
MetadataRecordType.CONFIG_RECORD) {
+                ConfigRecord configRecord = (ConfigRecord) record.message();
+                if 
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+                    minIsrRecords.add(configRecord);
+                    if (Type.forId(configRecord.resourceType()) == Type.TOPIC) 
{
+                        if (configRecord.value() != null) 
topicToMinIsrValueMap.put(configRecord.resourceName(), configRecord.value());
+                        else 
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
+                    }
+                }
+            }
+        });
+
+        if (minIsrRecords.isEmpty()) return;
+        if (topicToMinIsrValueMap.size() == minIsrRecords.size()) {

Review Comment:
   The size comparisons here and below are a little non-obvious (to me at 
least). Maybe we can set a boolean as we loop through the records to record 
whether we should take this branch.
   
   A separate question: does this optimization actually help performance? We 
still need the code that overlays configs from different levels, so a separate 
code path for the topic-only case adds complexity.
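
   A minimal sketch of the boolean-flag idea, not code from the PR. `Rec` is a 
hypothetical stand-in for Kafka's `ConfigRecord` (its `topicLevel` field 
replaces the resource-type check), and `MinIsrScanSketch`, `Result`, `scan`, 
and `canUseFastPath` are illustrative names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration; the PR works on Kafka's ConfigRecord.
final class MinIsrScanSketch {
    static final String MIN_ISR = "min.insync.replicas";

    /** Simplified config record. A null value means the config is being removed. */
    static final class Rec {
        final boolean topicLevel;
        final String resourceName;
        final String name;
        final String value;

        Rec(boolean topicLevel, String resourceName, String name, String value) {
            this.topicLevel = topicLevel;
            this.resourceName = resourceName;
            this.name = name;
            this.value = value;
        }
    }

    /** Outcome of one pass over the records. */
    static final class Result {
        final Map<String, String> topicToMinIsr = new HashMap<>();
        boolean sawMinIsrRecord = false;
        // Stays true only while every min ISR record is a topic-level set.
        boolean onlyTopicLevelSets = true;

        boolean canUseFastPath() {
            return sawMinIsrRecord && onlyTopicLevelSets;
        }
    }

    static Result scan(List<Rec> records) {
        Result result = new Result();
        for (Rec rec : records) {
            if (!MIN_ISR.equals(rec.name)) {
                continue; // not a min ISR change
            }
            result.sawMinIsrRecord = true;
            if (rec.topicLevel && rec.value != null) {
                result.topicToMinIsr.put(rec.resourceName, rec.value);
            } else {
                // A removal or a non-topic-level change rules out the fast path.
                result.onlyTopicLevelSets = false;
            }
        }
        return result;
    }
}
```

   One behavioral difference from the size comparison: if the same topic 
appears in two records, the map holds one entry while the record list holds 
two, so the size check falls through to the slower path, whereas the flag 
above would still allow the fast path.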



##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -309,6 +331,54 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
         return ApiError.NONE;
     }
 
+    void maybeTriggerPartitionUpdateOnMinIsrChange(List<ApiMessageAndVersion> 
records) {

Review Comment:
   The complexity here is a bit high. Can we extract a method that collects 
the min ISR `ConfigRecord`s from the `List<ApiMessageAndVersion>`?
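
   A rough sketch of what such an extracted helper could look like. The types 
here (`Message`, `ConfigRec`, `OtherRec`) are hypothetical stand-ins rather 
than Kafka's `ApiMessageAndVersion` and `ConfigRecord`, and the helper name 
`minIsrRecords` is only a suggestion.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in types; the real code would filter ApiMessageAndVersion.
final class MinIsrFilterSketch {
    static final String MIN_ISR = "min.insync.replicas";

    interface Message {}

    static final class ConfigRec implements Message {
        final String name;
        final String value;

        ConfigRec(String name, String value) {
            this.name = name;
            this.value = value;
        }
    }

    /** Any record type that is not a config record. */
    static final class OtherRec implements Message {}

    /** Returns only the config records that change min.insync.replicas, in order. */
    static List<ConfigRec> minIsrRecords(List<Message> messages) {
        return messages.stream()
            .filter(ConfigRec.class::isInstance)
            .map(ConfigRec.class::cast)
            .filter(rec -> MIN_ISR.equals(rec.name))
            .collect(Collectors.toList());
    }
}
```

   With the filtering isolated like this, the calling method only has to 
decide which partitions to update.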



##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -426,6 +497,35 @@ public void replay(ConfigRecord record) {
         }
     }
 
+    /**
+     * Apply a configuration record to the given config data. Note that, it 
will store null for the config to be removed.
+     *
+     * @param record                 The ConfigRecord.
+     * @param localConfigData        The config data is going to be updated.
+     */
+    public void replayForPendingConfig(

Review Comment:
   Can we reuse ConfigurationsImage here instead of adding another place where 
we are applying records? I think it should be reasonably straightforward to 
construct a ConfigurationsImage with the in-memory state (`localConfigData`) 
and then replay records to get a ConfigurationsDelta.



##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -309,6 +331,54 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
         return ApiError.NONE;
     }
 
+    void maybeTriggerPartitionUpdateOnMinIsrChange(List<ApiMessageAndVersion> 
records) {
+        List<ConfigRecord> minIsrRecords = new ArrayList<>();
+        Map<String, String> topicToMinIsrValueMap = new HashMap<>();
+        Map<String, String> configRemovedTopicMap = new HashMap<>();
+        records.forEach(record -> {
+            if (MetadataRecordType.fromId(record.message().apiKey()) == 
MetadataRecordType.CONFIG_RECORD) {
+                ConfigRecord configRecord = (ConfigRecord) record.message();
+                if 
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+                    minIsrRecords.add(configRecord);
+                    if (Type.forId(configRecord.resourceType()) == Type.TOPIC) 
{
+                        if (configRecord.value() != null) 
topicToMinIsrValueMap.put(configRecord.resourceName(), configRecord.value());
+                        else 
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());

Review Comment:
   style nit: Can you put the `if`/`else` bodies on their own lines instead of 
inline?



##########
metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java:
##########
@@ -244,4 +245,34 @@ private ConfigEntry toConfigEntry(ConfigDef.ConfigKey 
configKey,
             translateConfigType(configKey.type()),
             configKey.documentation);
     }
+
+    /**
+     * OrderedConfigResolver helps to find the configs in the order of the 
list config maps.
+     * One thing to notice that, when calling containsKey, if a config 
contains a null value entry,
+     * it will return false as null value means the config value should be 
ignored.
+     **/
+    public static class OrderedConfigResolver {
+        List<Map<String, ?>> configs;
+        public OrderedConfigResolver(List<Map<String, ?>> maps) {
+            configs = maps;
+        }
+
+        public OrderedConfigResolver(Map<String, ?> map) {
+            configs = new ArrayList<>();
+            configs.add(map);
+        }
+        public boolean containsKey(String key) {

Review Comment:
   style nit: add a blank line before the method signature



##########
metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java:
##########
@@ -244,4 +245,34 @@ private ConfigEntry toConfigEntry(ConfigDef.ConfigKey 
configKey,
             translateConfigType(configKey.type()),
             configKey.documentation);
     }
+
+    /**
+     * OrderedConfigResolver helps to find the configs in the order of the 
list config maps.
+     * One thing to notice that, when calling containsKey, if a config 
contains a null value entry,
+     * it will return false as null value means the config value should be 
ignored.
+     **/
+    public static class OrderedConfigResolver {
+        List<Map<String, ?>> configs;
+        public OrderedConfigResolver(List<Map<String, ?>> maps) {
+            configs = maps;
+        }
+
+        public OrderedConfigResolver(Map<String, ?> map) {
+            configs = new ArrayList<>();
+            configs.add(map);
+        }
+        public boolean containsKey(String key) {

Review Comment:
   docs: can you document this method's behavior here?



##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -309,6 +331,54 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
         return ApiError.NONE;
     }
 
+    void maybeTriggerPartitionUpdateOnMinIsrChange(List<ApiMessageAndVersion> 
records) {
+        List<ConfigRecord> minIsrRecords = new ArrayList<>();
+        Map<String, String> topicToMinIsrValueMap = new HashMap<>();
+        Map<String, String> configRemovedTopicMap = new HashMap<>();
+        records.forEach(record -> {
+            if (MetadataRecordType.fromId(record.message().apiKey()) == 
MetadataRecordType.CONFIG_RECORD) {
+                ConfigRecord configRecord = (ConfigRecord) record.message();
+                if 
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+                    minIsrRecords.add(configRecord);
+                    if (Type.forId(configRecord.resourceType()) == Type.TOPIC) 
{
+                        if (configRecord.value() != null) 
topicToMinIsrValueMap.put(configRecord.resourceName(), configRecord.value());
+                        else 
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
+                    }
+                }
+            }
+        });
+
+        if (minIsrRecords.isEmpty()) return;
+        if (topicToMinIsrValueMap.size() == minIsrRecords.size()) {
+            // If all the min isr config updates are on the topic level, we 
can trigger a simpler update just on the
+            // updated topics.
+            
records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(
+                new ArrayList<>(topicToMinIsrValueMap.keySet()),
+                topicName -> topicToMinIsrValueMap.get(topicName))
+            );
+            return;
+        }
+
+        // Because it may require multiple layer look up for the min ISR 
config value. Build a config data copy and apply
+        // the config updates to it. Use this config copy for the min ISR look 
up.
+        Map<ConfigResource, Map<String, String>> pendingConfigData = new 
HashMap<>();
+
+        for (ConfigRecord record : minIsrRecords) {
+            replayForPendingConfig(record, pendingConfigData);
+        }
+
+        ArrayList<String> topicList = new ArrayList<>();
+        // If all the updates are on the Topic level, we can avoid perform a 
full scan of the partitions.

Review Comment:
   Similar to the comment above, this size check is hard to follow. Maybe we 
can compute a boolean that says whether we need to inspect every partition or 
just a subset.
   
   In which cases do we need to scan all partitions? Only when the 
cluster-level `min.insync.replicas` is changed?



##########
metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java:
##########
@@ -244,4 +245,34 @@ private ConfigEntry toConfigEntry(ConfigDef.ConfigKey 
configKey,
             translateConfigType(configKey.type()),
             configKey.documentation);
     }
+
+    /**
+     * OrderedConfigResolver helps to find the configs in the order of the 
list config maps.
+     * One thing to notice that, when calling containsKey, if a config 
contains a null value entry,
+     * it will return false as null value means the config value should be 
ignored.
+     **/
+    public static class OrderedConfigResolver {
+        List<Map<String, ?>> configs;
+        public OrderedConfigResolver(List<Map<String, ?>> maps) {
+            configs = maps;
+        }
+
+        public OrderedConfigResolver(Map<String, ?> map) {
+            configs = new ArrayList<>();
+            configs.add(map);
+        }
+        public boolean containsKey(String key) {
+            for (Map<String, ?> config : configs) {
+                if (config.containsKey(key)) return config.get(key) != null;

Review Comment:
   If a later config map has a non-null value for the key, I think we could 
mistakenly return false here, because we return as soon as we find the first 
map that contains the key.
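
   One possible shape for a fix, assuming the intent stated in the Javadoc 
(a null entry means "ignore this level") is that lookup should fall through to 
the next map. This is a self-contained sketch, not the PR's class: 
`OrderedConfigResolverSketch` and its `get` method are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative sketch; assumes a null entry should be skipped, not treated as final.
final class OrderedConfigResolverSketch {
    private final List<Map<String, ?>> configs;

    OrderedConfigResolverSketch(List<Map<String, ?>> maps) {
        this.configs = new ArrayList<>(maps);
    }

    /**
     * Returns true if any map, checked in order, holds a non-null value for the key.
     * A null entry in an earlier map does not hide a value in a later map.
     */
    boolean containsKey(String key) {
        return get(key) != null;
    }

    /** Returns the first non-null value for the key, or null if no map has one. */
    Object get(String key) {
        for (Map<String, ?> config : configs) {
            Object value = config.get(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }
}
```

   If a null entry is instead meant to shadow lower-priority maps, the current 
early return is the right behavior and the Javadoc should say so explicitly.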
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
