jsancio commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r464779999



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -155,6 +157,10 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
         this.time = time;
         this.heartbeat = new Heartbeat(rebalanceConfig, time);
         this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
+        this.dynamicConfig = new DynamicConsumerConfig(client, this, rebalanceConfig, time, logContext);
+        if (!rebalanceConfig.enableDynamicConfig()) {
+            dynamicConfig.disable();
+        } 

Review comment:
       Two consecutive calls to `DynamicConsumerConfig`. How about:
   ```java
   this.dynamicConfig = new DynamicConsumerConfig(
  client, this, rebalanceConfig, time, logContext, rebalanceConfig.enableDynamicConfig()
   ); 
   ```

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -356,6 +362,11 @@ boolean ensureActiveGroup(final Timer timer) {
         }
 
         startHeartbeatThreadIfNeeded();
+        if (!dynamicConfig.shouldDisable()) {
+            // This will only return a future and block for it if this is before the first JoinGroupRequest being sent
+            RequestFuture<ClientResponse> configsFuture = dynamicConfig.maybeFetchInitialConfigs();
+            dynamicConfig.maybeWaitForInitialConfigs(configsFuture);

Review comment:
       There are 3 consecutive calls to methods on `dynamicConfig`. We can move all of this code into `DynamicConsumerConfig`.
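       For example, the call site could shrink to a single call (a rough sketch; `maybeFetchInitialConfigsBeforeJoin` is a made-up name, not part of this PR):
   ```java
   startHeartbeatThreadIfNeeded();
   // Hypothetical single entry point: internally checks shouldDisable(), sends the
   // DescribeConfigsRequest, and blocks for the response only before the first join.
   dynamicConfig.maybeFetchInitialConfigsBeforeJoin();
   ```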

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -504,6 +515,7 @@ public void onSuccess(ByteBuffer value) {
                             log.info("Successfully joined group with 
generation {}", generation.generationId);
                             state = MemberState.STABLE;
                             rejoinNeeded = false;
+                            rebalanceConfig.coordinatorUpdated();

Review comment:
       I think we have a race between `sendJoinGroupRequest` completing with some session timeout X and getting another value Y for the session timeout in the `DescribeConfigsResponse`.

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -431,6 +433,26 @@ class AdminManager(val config: KafkaConfig,
                (name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
                  .setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
                  .setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
+          case ConfigResource.Type.CLIENT =>
+            val clientId = resource.resourceName
+            val defaultProps = adminZkClient.fetchEntityConfig(ConfigType.Client, ConfigEntityName.Default)
+            val clientProps = adminZkClient.fetchEntityConfig(ConfigType.Client, if (clientId.isEmpty) ConfigEntityName.Default else clientId)
+            val overlayedProps = new Properties()
+            overlayedProps.putAll(defaultProps)
+            overlayedProps.putAll(clientProps)
+            val configMap = overlayedProps.stringPropertyNames.asScala
+              .filter(ClientConfigs.isClientConfig).map{key => (key -> overlayedProps.getProperty(key))}.toMap
+
+            // Resort to default dynamic client config if configs are not specified for the client-id
+            if (clientId.nonEmpty) {
+              createResponseConfig(configMap,
+                createClientConfigEntry(clientId, clientProps, defaultProps, perClientIdConfig = true,
+                  includeSynonyms, includeDocumentation))
+            } else {
+              createResponseConfig(configMap,
+                createClientConfigEntry(clientId, clientProps, defaultProps, perClientIdConfig = false,
+                  includeSynonyms, includeDocumentation))
+            }

Review comment:
       ```scala
   createResponseConfig(
     configMap,
     createClientConfigEntry(
       clientId,
       clientProps,
       defaultProps, 
       perClientIdConfig = clientId.nonEmpty, 
       includeSynonyms,
       includeDocumentation
     )
   )
   ```

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update 
for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is received */
+    Object lock;
+
+    /* Logger to use */
+    private Logger log;
+
+    /* The resource name to use when constructing a DescribeConfigsRequest */
+    private final String clientId;
+
+    /* Dynamic Configs received from the previous DescribeConfigsResponse */
+    private Map<String, String> previousDynamicConfigs;
+
+    /* Indicates if we have received the initial dynamic configurations */
+    private boolean initialConfigsFetched;
+
+    public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, 
GroupRebalanceConfig config, Time time, LogContext logContext) {
+        super(time);
+        this.rebalanceConfig = config;
+        this.log = logContext.logger(DynamicConsumerConfig.class);
+        this.client = client;
+        this.lock = lock;
+        this.clientId = rebalanceConfig.clientId;
+        this.previousDynamicConfigs = new HashMap<>();
+        this.initialConfigsFetched = false;
+    }
+    
+    /**
+     * Send a {@link DescribeConfigsRequest} to a node specifically for 
dynamic client configurations
+     *
+     * @return {@link RequestFuture} 
+     */ 
+    public RequestFuture<ClientResponse> maybeFetchInitialConfigs() {
+        if (!initialConfigsFetched) {
+            Node node = null;
+            while (node == null) {
+                node = client.leastLoadedNode();

Review comment:
       Is this a livelock? How will this return non-null? What thread is 
calling this method and what thread is responsible for polling the client until 
there is a node?
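       A sketch of one way to avoid spinning (assuming a `Timer` and a `requestTimeoutMs` are available here, and that blocking on `ConsumerNetworkClient.awaitMetadataUpdate` is acceptable on the calling thread):
   ```java
   Timer timer = time.timer(requestTimeoutMs);
   Node node = client.leastLoadedNode();
   while (node == null && timer.notExpired()) {
       // Drive the network client so metadata can actually arrive instead of busy-waiting.
       client.awaitMetadataUpdate(timer);
       node = client.leastLoadedNode();
   }
   if (node == null)
       return null; // give up for now; the caller can retry on a later attempt
   ```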

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update 
for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is received */
+    Object lock;

Review comment:
       This makes the concurrency requirement very hard to read and validate. This could be any object since it is passed through the constructor.
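       One option (just a sketch, and it would change the synchronization contract with `AbstractCoordinator`, so it may not be what the PR intends) is for the class to own and document its monitor:
   ```java
   /* Guards previousDynamicConfigs and the dynamic values applied to rebalanceConfig */
   private final Object lock = new Object();
   ```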

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update 
for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is received */
+    Object lock;
+
+    /* Logger to use */
+    private Logger log;
+
+    /* The resource name to use when constructing a DescribeConfigsRequest */
+    private final String clientId;
+
+    /* Dynamic Configs received from the previous DescribeConfigsResponse */
+    private Map<String, String> previousDynamicConfigs;
+
+    /* Indicates if we have received the initial dynamic configurations */
+    private boolean initialConfigsFetched;
+
+    public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, 
GroupRebalanceConfig config, Time time, LogContext logContext) {
+        super(time);
+        this.rebalanceConfig = config;
+        this.log = logContext.logger(DynamicConsumerConfig.class);
+        this.client = client;
+        this.lock = lock;
+        this.clientId = rebalanceConfig.clientId;
+        this.previousDynamicConfigs = new HashMap<>();
+        this.initialConfigsFetched = false;
+    }
+    
+    /**
+     * Send a {@link DescribeConfigsRequest} to a node specifically for 
dynamic client configurations
+     *
+     * @return {@link RequestFuture} 
+     */ 
+    public RequestFuture<ClientResponse> maybeFetchInitialConfigs() {
+        if (!initialConfigsFetched) {
+            Node node = null;
+            while (node == null) {
+                node = client.leastLoadedNode();
+            }
+            log.info("Trying to fetch initial dynamic configs before join 
group request");
+            RequestFuture<ClientResponse> configsFuture = client.send(node, 
newRequestBuilder(this.clientId));
+            return configsFuture;
+        }
+        return null;
+    }
+
+    /**
+     * Block for a {@link DescribeConfigsResponse} and process it. Used to 
fetch the initial dynamic configurations synchronously before sending the 
initial
+     * {@link org.apache.kafka.common.requests.JoinGroupRequest}. Since this 
join RPC sends the group member's session timeout
+     * to the group coordinator, we should check if a dynamic configuration 
for session timeout is set before joining.
+     * If we do not do this initial fetch synchronously, then we could 
possibly trigger an unnecessary group rebalance operation by 
+     * sending a second join request after the dynamic configs are received 
asynchronously.
+     *
+     * @param responseFuture - future to block on
+     * @return true if responseFuture was blocked on and a response was 
received
+     */
+    public boolean maybeWaitForInitialConfigs(RequestFuture<ClientResponse> 
responseFuture) {

Review comment:
       This method and the method above can be merged to simplify the API. Every call to `maybeFetchInitialConfigs` is followed by a call to `maybeWaitForInitialConfigs`.
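       Something along these lines (a sketch; `fetchInitialConfigsIfNeeded` is a made-up name):
   ```java
   public void fetchInitialConfigsIfNeeded() {
       if (initialConfigsFetched)
           return;
       Node node = client.leastLoadedNode();
       if (node == null)
           return;
       log.info("Trying to fetch initial dynamic configs before join group request");
       RequestFuture<ClientResponse> future = client.send(node, newRequestBuilder(this.clientId));
       client.poll(future);
       if (future.isDone()) {
           handleSuccessfulResponse((DescribeConfigsResponse) future.value().responseBody());
           initialConfigsFetched = true;
       }
   }
   ```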

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##########
@@ -148,6 +148,10 @@ public Sender(LogContext logContext,
         this.apiVersions = apiVersions;
         this.transactionManager = transactionManager;
         this.inFlightBatches = new HashMap<>();
+        this.dynamicConfig = new DynamicProducerConfig(client, config, time, logContext, requestTimeoutMs);
+        if (!config.getBoolean(CommonClientConfigs.ENABLE_DYNAMIC_CONFIG_CONFIG)) {
+            this.dynamicConfig.disable();

Review comment:
       Two consecutive calls to `DynamicProducerConfig`. We are already passing 
`config` in the constructor.
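       For example (a sketch; the parameter types and field assignments of `DynamicProducerConfig` are assumed from the call site):
   ```java
   public DynamicProducerConfig(KafkaClient client, ProducerConfig config, Time time,
                                LogContext logContext, int requestTimeoutMs) {
       super(time);
       this.client = client;
       this.requestTimeoutMs = requestTimeoutMs;
       this.log = logContext.logger(DynamicProducerConfig.class);
       if (!config.getBoolean(CommonClientConfigs.ENABLE_DYNAMIC_CONFIG_CONFIG)) {
           disable();
       }
   }
   ```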
   

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update 
for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is received */
+    Object lock;
+
+    /* Logger to use */
+    private Logger log;
+
+    /* The resource name to use when constructing a DescribeConfigsRequest */
+    private final String clientId;
+
+    /* Dynamic Configs received from the previous DescribeConfigsResponse */
+    private Map<String, String> previousDynamicConfigs;
+
+    /* Indicates if we have received the initial dynamic configurations */
+    private boolean initialConfigsFetched;
+
+    public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, 
GroupRebalanceConfig config, Time time, LogContext logContext) {
+        super(time);
+        this.rebalanceConfig = config;
+        this.log = logContext.logger(DynamicConsumerConfig.class);
+        this.client = client;
+        this.lock = lock;
+        this.clientId = rebalanceConfig.clientId;
+        this.previousDynamicConfigs = new HashMap<>();
+        this.initialConfigsFetched = false;
+    }
+    
+    /**
+     * Send a {@link DescribeConfigsRequest} to a node specifically for 
dynamic client configurations
+     *
+     * @return {@link RequestFuture} 
+     */ 
+    public RequestFuture<ClientResponse> maybeFetchInitialConfigs() {
+        if (!initialConfigsFetched) {
+            Node node = null;
+            while (node == null) {
+                node = client.leastLoadedNode();
+            }
+            log.info("Trying to fetch initial dynamic configs before join 
group request");
+            RequestFuture<ClientResponse> configsFuture = client.send(node, 
newRequestBuilder(this.clientId));
+            return configsFuture;
+        }
+        return null;
+    }
+
+    /**
+     * Block for a {@link DescribeConfigsResponse} and process it. Used to 
fetch the initial dynamic configurations synchronously before sending the 
initial
+     * {@link org.apache.kafka.common.requests.JoinGroupRequest}. Since this 
join RPC sends the group member's session timeout
+     * to the group coordinator, we should check if a dynamic configuration 
for session timeout is set before joining.
+     * If we do not do this initial fetch synchronously, then we could 
possibly trigger an unnecessary group rebalance operation by 
+     * sending a second join request after the dynamic configs are received 
asynchronously.
+     *
+     * @param responseFuture - future to block on
+     * @return true if responseFuture was blocked on and a response was 
received
+     */
+    public boolean maybeWaitForInitialConfigs(RequestFuture<ClientResponse> 
responseFuture) {
+        if (responseFuture != null) {
+            client.poll(responseFuture);
+            if (responseFuture.isDone()) {
+                DescribeConfigsResponse configsResponse = 
(DescribeConfigsResponse) responseFuture.value().responseBody();
+                handleSuccessfulResponse(configsResponse);
+                this.initialConfigsFetched = true;
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Maybe send a {@link DescribeConfigsRequest} to a node specifically for 
dynamic client configurations and 
+     * don't block waiting for a response. This will be used by the 
HeartbeatThread to periodically fetch dynamic configurations
+     *
+     * @param node Node to send request to
+     * @param now  Current time in milliseconds
+     */ 
+    @Override
+    public boolean maybeFetchConfigs(long now) {
+        if (shouldUpdateConfigs(now)) {
+            Node node = client.leastLoadedNode();
+            // Order matters, if the node is null we should not set 
updateInProgress to true.
+            // This is lazily evaluated so it is ok as long as order is kept
+            if (node != null && client.ready(node, now)) {
+                updateInProgress();
+                log.info("Sending periodic describe configs request for 
dynamic config update");
+                RequestFuture<ClientResponse> configsFuture = 
client.send(node, newRequestBuilder(this.clientId));
+                configsFuture.addListener(new 
RequestFutureListener<ClientResponse>() {
+                    @Override
+                    public void onSuccess(ClientResponse resp) {
+                        synchronized (lock) {
+                            DescribeConfigsResponse configsResponse = 
(DescribeConfigsResponse) resp.responseBody();
+                            handleSuccessfulResponse(configsResponse);
+                            update();
+                        }
+                    }
+                    @Override
+                    public void onFailure(RuntimeException e) {
+                        synchronized (lock) {
+                            retry();
+                        }
+                    }
+                });
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Handle the {@link DescribeConfigsResponse} by processing the dynamic 
configs and resetting the RPC timer,
+     * or by disabling this feature if the broker is incompatible.
+     * @param resp {@link DescribeConfigsResponse}
+     */
+    private void handleSuccessfulResponse(DescribeConfigsResponse 
configsResponse) {
+        Map<String, String> dynamicConfigs = 
createResultMapAndHandleErrors(configsResponse, log);
+        log.info("DescribeConfigsResponse received");
+
+        // We only want to process them if they have changed since the last 
time they were fetched.
+        if (!dynamicConfigs.equals(previousDynamicConfigs)) {
+            previousDynamicConfigs = dynamicConfigs;
+            try {
+                rebalanceConfig.setDynamicConfigs(dynamicConfigs);
+            } catch (IllegalArgumentException e) {
+                log.info("Rejecting dynamic configs: {}", e.getMessage());
+            }
+        }
+        update();

Review comment:
       `update` is called twice in a row. Here and on line 132 of this file.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##########
@@ -321,11 +325,15 @@ void runOnce() {
         }
 
         long currentTimeMs = time.milliseconds();
-        long pollTimeout = sendProducerData(currentTimeMs);
+        if (!dynamicConfig.shouldDisable()) {
+            dynamicConfig.maybeFetchConfigs(currentTimeMs);

Review comment:
       Two consecutive calls to `dynamicConfig`; this can be moved into a single method on `dynamicConfig` to simplify the API.
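       For instance (a sketch; `maybePeriodicFetch` is a made-up name):
   ```java
   // In Sender.runOnce():
   long currentTimeMs = time.milliseconds();
   dynamicConfig.maybePeriodicFetch(currentTimeMs);
   long pollTimeout = sendProducerData(currentTimeMs);

   // In DynamicProducerConfig (or the shared DynamicClientConfigUpdater):
   public void maybePeriodicFetch(long now) {
       if (!shouldDisable()) {
           maybeFetchConfigs(now);
       }
   }
   ```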

##########
File path: core/src/main/scala/kafka/server/DynamicConfig.scala
##########
@@ -103,12 +122,25 @@ object DynamicConfig {
      .define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc)
      .define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
      .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc)
+      .define(AcksOverrideProp, STRING, DefaultAcksOverride, ConfigDef.ValidString.in("all", "-1", "0", "1"), HIGH, AcksOverrideDoc)
+      .define(
+        SessionTimeoutOverrideProp,
+        INT,
+        DefaultSessionTimeoutOverride,
+        ConfigDef.Range.between(Defaults.GroupMinSessionTimeoutMs, Defaults.GroupMaxSessionTimeoutMs),
+        HIGH,
+        SessionTimeoutOverrideDoc)
+      .define(HeartbeatIntervalOverrideProp, INT, DefaultHeartbeatIntervalOverride, HIGH, HeartbeatIntervalOverrideDoc)

Review comment:
       For all of the properties that you have added: didn't we decide to not 
validate the values here and to instead let the client do the validation?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicConsumerConfig.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.DynamicClientConfigUpdater;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * Handles the request and response of a dynamic client configuration update 
for the consumer
+ */
+public class DynamicConsumerConfig extends DynamicClientConfigUpdater {
+    /* Client to use */
+    private ConsumerNetworkClient client;
+
+    /* Configs to update */
+    private GroupRebalanceConfig rebalanceConfig;
+
+    /* Object to synchronize on when response is received */
+    Object lock;
+
+    /* Logger to use */
+    private Logger log;
+
+    /* The resource name to use when constructing a DescribeConfigsRequest */
+    private final String clientId;
+
+    /* Dynamic Configs received from the previous DescribeConfigsResponse */
+    private Map<String, String> previousDynamicConfigs;
+
+    /* Indicates if we have received the initial dynamic configurations */
+    private boolean initialConfigsFetched;
+
+    public DynamicConsumerConfig(ConsumerNetworkClient client, Object lock, 
GroupRebalanceConfig config, Time time, LogContext logContext) {
+        super(time);
+        this.rebalanceConfig = config;
+        this.log = logContext.logger(DynamicConsumerConfig.class);
+        this.client = client;
+        this.lock = lock;
+        this.clientId = rebalanceConfig.clientId;
+        this.previousDynamicConfigs = new HashMap<>();
+        this.initialConfigsFetched = false;
+    }
+    
+    /**
+     * Send a {@link DescribeConfigsRequest} to a node specifically for 
dynamic client configurations
+     *
+     * @return {@link RequestFuture} 
+     */ 
+    public RequestFuture<ClientResponse> maybeFetchInitialConfigs() {
+        if (!initialConfigsFetched) {
+            Node node = null;
+            while (node == null) {
+                node = client.leastLoadedNode();
+            }
+            log.info("Trying to fetch initial dynamic configs before join 
group request");
+            RequestFuture<ClientResponse> configsFuture = client.send(node, 
newRequestBuilder(this.clientId));
+            return configsFuture;
+        }
+        return null;
+    }
+
+    /**
+     * Block for a {@link DescribeConfigsResponse} and process it. Used to 
fetch the initial dynamic configurations synchronously before sending the 
initial
+     * {@link org.apache.kafka.common.requests.JoinGroupRequest}. Since this 
join RPC sends the group member's session timeout
+     * to the group coordinator, we should check if a dynamic configuration 
for session timeout is set before joining.
+     * If we do not do this initial fetch synchronously, then we could 
possibly trigger an unnecessary group rebalance operation by 
+     * sending a second join request after the dynamic configs are received 
asynchronously.
+     *
+     * @param responseFuture - future to block on
+     * @return true if responseFuture was blocked on and a response was 
received
+     */
+    public boolean maybeWaitForInitialConfigs(RequestFuture<ClientResponse> 
responseFuture) {
+        if (responseFuture != null) {
+            client.poll(responseFuture);
+            if (responseFuture.isDone()) {
+                DescribeConfigsResponse configsResponse = 
(DescribeConfigsResponse) responseFuture.value().responseBody();
+                handleSuccessfulResponse(configsResponse);
+                this.initialConfigsFetched = true;
+                return true;
+            }
+        }
+        return false;

Review comment:
       Why return a boolean? It looks like the caller of this method never uses it.

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -431,6 +433,26 @@ class AdminManager(val config: KafkaConfig,
                (name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
                  .setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
                  .setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
+          case ConfigResource.Type.CLIENT =>
+            val clientId = resource.resourceName
+            val defaultProps = adminZkClient.fetchEntityConfig(ConfigType.Client, ConfigEntityName.Default)
+            val clientProps = adminZkClient.fetchEntityConfig(ConfigType.Client, if (clientId.isEmpty) ConfigEntityName.Default else clientId)
+            val overlayedProps = new Properties()
+            overlayedProps.putAll(defaultProps)
+            overlayedProps.putAll(clientProps)
+            val configMap = overlayedProps.stringPropertyNames.asScala
+              .filter(ClientConfigs.isClientConfig).map{key => (key -> overlayedProps.getProperty(key))}.toMap

Review comment:
       Java's `Properties` is a `Hashtable`. This can be done by using `flatMap`. E.g.:
   
   ```scala
   overlayedProps.asScala.flatMap { case (key, value) =>
     if (ClientConfigs.isClientConfig(key)) {
       Some(key -> value)
     } else {
       None
     }
   }.toMap
   ```

##########
File path: core/src/main/scala/kafka/server/DynamicConfig.scala
##########
@@ -78,23 +78,42 @@ object DynamicConfig {
     def isQuotaConfig(name: String): Boolean = configNames.contains(name)
   }
 
+  object ClientConfigs {
+    val AcksOverrideProp = "acks"
+    val SessionTimeoutOverrideProp = "session.timeout.ms"
+    val HeartbeatIntervalOverrideProp = "heartbeat.interval.ms"

Review comment:
       Duplicate strings. These are specified somewhere else. Those symbols 
should be accessible from there.

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -597,6 +619,15 @@ class AdminManager(val config: KafkaConfig,
             if (!validateOnly)
               alterLogLevelConfigs(alterConfigOps)
             resource -> ApiError.NONE
+          case ConfigResource.Type.CLIENT =>
+            val (configType, configKeys) = (ConfigType.Client, DynamicConfig.Client.configKeys)

Review comment:
       You are constructing a tuple and pattern matching against it. Not sure if the compiler optimizes this away. It looks like these are all aliases to known symbols. Do we need this?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

