junrao commented on code in PR #13990:
URL: https://github.com/apache/kafka/pull/13990#discussion_r1265743507


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -102,7 +109,10 @@ public DefaultBackgroundThread(final Time time,
                                    final BlockingQueue<ApplicationEvent> 
applicationEventQueue,
                                    final BlockingQueue<BackgroundEvent> 
backgroundEventQueue,
                                    final ConsumerMetadata metadata,
-                                   final KafkaClient networkClient) {
+                                   final ApiVersions apiVersions,
+                                   final Metrics metrics,
+                                   final Sensor fetcherThrottleTimeSensor,
+                                   final SubscriptionState subscriptions) {

Review Comment:
   `subscriptions` seems to be unused?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Utils.java:
##########
@@ -17,13 +17,175 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
 
 public final class Utils {
 
+    public static final String CONSUMER_JMX_PREFIX = "kafka.consumer";
+    public static final String CONSUMER_METRIC_GROUP_PREFIX = "consumer";
+
+    /**
+     * A fixed, large enough value will suffice for max.
+     */
+    public static final int CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION = 
100;
+
+    private static final String CONSUMER_CLIENT_ID_METRIC_TAG = "client-id";
+
+    public static ConsumerNetworkClient 
createConsumerNetworkClient(ConsumerConfig config,
+                                                                    Metrics 
metrics,
+                                                                    LogContext 
logContext,
+                                                                    
ApiVersions apiVersions,
+                                                                    Time time,
+                                                                    Metadata 
metadata,
+                                                                    Sensor 
throttleTimeSensor,
+                                                                    long 
retryBackoffMs) {
+        NetworkClient netClient = ClientUtils.createNetworkClient(config,
+                metrics,
+                CONSUMER_METRIC_GROUP_PREFIX,
+                logContext,
+                apiVersions,
+                time,
+                CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION,
+                metadata,
+                throttleTimeSensor);
+
+        // Will avoid blocking an extended period of time to prevent heartbeat 
thread starvation
+        int heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+
+        return new ConsumerNetworkClient(
+                logContext,
+                netClient,
+                metadata,
+                time,
+                retryBackoffMs,
+                config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                heartbeatIntervalMs);
+    }
+
+    public static LogContext createLogContext(ConsumerConfig config, 
GroupRebalanceConfig groupRebalanceConfig) {
+        String groupId = String.valueOf(groupRebalanceConfig.groupId);

Review Comment:
   The original code has the logic to handle `groupRebalanceConfig.groupId` 
being null. Should we keep that?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Utils.java:
##########
@@ -17,13 +17,175 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
 
 public final class Utils {
 
+    public static final String CONSUMER_JMX_PREFIX = "kafka.consumer";
+    public static final String CONSUMER_METRIC_GROUP_PREFIX = "consumer";
+
+    /**
+     * A fixed, large enough value will suffice for max.
+     */
+    public static final int CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION = 
100;
+
+    private static final String CONSUMER_CLIENT_ID_METRIC_TAG = "client-id";
+
+    public static ConsumerNetworkClient 
createConsumerNetworkClient(ConsumerConfig config,
+                                                                    Metrics 
metrics,
+                                                                    LogContext 
logContext,
+                                                                    
ApiVersions apiVersions,
+                                                                    Time time,
+                                                                    Metadata 
metadata,
+                                                                    Sensor 
throttleTimeSensor,
+                                                                    long 
retryBackoffMs) {
+        NetworkClient netClient = ClientUtils.createNetworkClient(config,
+                metrics,
+                CONSUMER_METRIC_GROUP_PREFIX,
+                logContext,
+                apiVersions,
+                time,
+                CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION,
+                metadata,
+                throttleTimeSensor);
+
+        // Will avoid blocking an extended period of time to prevent heartbeat 
thread starvation
+        int heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+
+        return new ConsumerNetworkClient(
+                logContext,
+                netClient,
+                metadata,
+                time,
+                retryBackoffMs,
+                config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                heartbeatIntervalMs);
+    }
+
+    public static LogContext createLogContext(ConsumerConfig config, 
GroupRebalanceConfig groupRebalanceConfig) {
+        String groupId = String.valueOf(groupRebalanceConfig.groupId);
+        String clientId = 
config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
+        String logPrefix;
+        String groupInstanceId = 
groupRebalanceConfig.groupInstanceId.orElse(null);
+
+        if (groupInstanceId != null) {
+            // If group.instance.id is set, we will append it to the log 
context.
+            logPrefix = String.format("[Consumer instanceId=%s, clientId=%s, 
groupId=%s] ", groupInstanceId, clientId, groupId);
+        } else {
+            logPrefix = String.format("[Consumer clientId=%s, groupId=%s] ", 
clientId, groupId);
+        }
+
+        return new LogContext(logPrefix);
+    }
+
+    public static IsolationLevel createIsolationLevel(ConsumerConfig config) {
+        String s = 
config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT);
+        return IsolationLevel.valueOf(s);
+    }
+
+    public static SubscriptionState createSubscriptionState(ConsumerConfig 
config, LogContext logContext) {
+        String s = 
config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT);
+        OffsetResetStrategy strategy = OffsetResetStrategy.valueOf(s);
+        return new SubscriptionState(logContext, strategy);
+    }
+
+    public static Metrics createMetrics(ConsumerConfig config, Time time) {
+        String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
+        Map<String, String> metricsTags = 
Collections.singletonMap(CONSUMER_CLIENT_ID_METRIC_TAG, clientId);
+        MetricConfig metricConfig = new MetricConfig()
+                
.samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
+                
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), 
TimeUnit.MILLISECONDS)
+                
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
+                .tags(metricsTags);
+        List<MetricsReporter> reporters = 
CommonClientConfigs.metricsReporters(clientId, config);
+        MetricsContext metricsContext = new 
KafkaMetricsContext(CONSUMER_JMX_PREFIX,
+                
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+        return new Metrics(metricConfig, reporters, time, metricsContext);
+    }
+
+    public static FetchMetricsManager createFetchMetricsManager(Metrics 
metrics) {
+        Set<String> metricsTags = 
Collections.singleton(CONSUMER_CLIENT_ID_METRIC_TAG);
+        FetchMetricsRegistry metricsRegistry = new 
FetchMetricsRegistry(metricsTags, CONSUMER_METRIC_GROUP_PREFIX);
+        return new FetchMetricsManager(metrics, metricsRegistry);
+    }
+
+    public static FetchConfig<String, String> createFetchConfig(ConsumerConfig 
config) {

Review Comment:
   This seems to be imported but not used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to