kirktrue commented on code in PR #13990: URL: https://github.com/apache/kafka/pull/13990#discussion_r1265887075
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Utils.java: ########## @@ -17,13 +17,175 @@ package org.apache.kafka.clients.consumer.internals; import java.io.Serializable; +import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientUtils; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.GroupRebalanceConfig; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerInterceptor; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.KafkaMetricsContext; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; public final class Utils { + public static final String CONSUMER_JMX_PREFIX = "kafka.consumer"; + public static final String CONSUMER_METRIC_GROUP_PREFIX = "consumer"; + + /** + * A fixed, large enough value will suffice for max. + */ + public static final int CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION = 100; + + private static final String CONSUMER_CLIENT_ID_METRIC_TAG = "client-id"; + + public static ConsumerNetworkClient createConsumerNetworkClient(ConsumerConfig config, + Metrics metrics, + LogContext logContext, + ApiVersions apiVersions, + Time time, + Metadata metadata, + Sensor throttleTimeSensor, + long retryBackoffMs) { + NetworkClient netClient = ClientUtils.createNetworkClient(config, + metrics, + CONSUMER_METRIC_GROUP_PREFIX, + logContext, + apiVersions, + time, + CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION, + metadata, + throttleTimeSensor); + + // Will avoid blocking an extended period of time to prevent heartbeat thread starvation + int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); + + return new ConsumerNetworkClient( + logContext, + netClient, + metadata, + time, + retryBackoffMs, + config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), + heartbeatIntervalMs); + } + + public static LogContext createLogContext(ConsumerConfig config, GroupRebalanceConfig groupRebalanceConfig) { + String groupId = String.valueOf(groupRebalanceConfig.groupId); Review Comment: This is an example of the original code from `KafkaConsumer`: ```java this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId); this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); LogContext logContext; // If group.instance.id is set, we will append it to the log context. if (groupRebalanceConfig.groupInstanceId.isPresent()) { logContext = new LogContext("[Consumer instanceId=" + groupRebalanceConfig.groupInstanceId.get() + ", clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] "); } else { logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] "); } ``` In the original code, the `groupRebalanceConfig.groupId ` String is converted to an `Optional<String>`. If the `String` was `null`, the `Optional<String>` will return `null`. In that way, the `String passed to the `LogContext` constructor will include `groupId=null`. Here's the new code, wrapped up in the `createLogContext` utility method: ```java public static LogContext createLogContext(ConsumerConfig config, GroupRebalanceConfig groupRebalanceConfig) { String groupId = String.valueOf(groupRebalanceConfig.groupId); String clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); String logPrefix; String groupInstanceId = groupRebalanceConfig.groupInstanceId.orElse(null); if (groupInstanceId != null) { // If group.instance.id is set, we will append it to the log context. logPrefix = String.format("[Consumer instanceId=%s, clientId=%s, groupId=%s] ", groupInstanceId, clientId, groupId); } else { logPrefix = String.format("[Consumer clientId=%s, groupId=%s] ", clientId, groupId); } return new LogContext(logPrefix); } ``` In the new code, we don't need to convert `groupRebalanceConfig.groupId` into an `Optional<String>`, we just leave it as a plain old `String`. If `groupRebalanceConfig.groupId` is `null`, the result of `logPrefix` will include `groupId=null`, as before. I guess we could simplify it even more: ```java public static LogContext createLogContext(ConsumerConfig config, GroupRebalanceConfig groupRebalanceConfig) { String clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); String logPrefix; String groupInstanceId = groupRebalanceConfig.groupInstanceId.orElse(null); if (groupInstanceId != null) { // If group.instance.id is set, we will append it to the log context. logPrefix = String.format("[Consumer instanceId=%s, clientId=%s, groupId=%s] ", groupInstanceId, clientId, groupRebalanceConfig.groupId); } else { logPrefix = String.format("[Consumer clientId=%s, groupId=%s] ", clientId, groupRebalanceConfig.groupId); } return new LogContext(logPrefix); } ``` That is, simply remove the first line of the utility method and pass `groupRebalanceConfig.groupId` to the `String.format` call. Internally, `format` will check for any `null` arguments, so the end result would still be `groupId=null`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org