AndrewJSchofield commented on code in PR #14560:
URL: https://github.com/apache/kafka/pull/14560#discussion_r1361262191


##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -1660,6 +1662,34 @@ default FenceProducersResult 
fenceProducers(Collection<String> transactionalIds)
     FenceProducersResult fenceProducers(Collection<String> transactionalIds,
                                         FenceProducersOptions options);
 
+    /**
+     * Determines the client's unique client instance ID used for telemetry. 
This ID is unique to
+     * this specific client instance and will not change after it is initially 
generated.
+     * The ID is useful for correlating client operations with telemetry sent 
to the broker and
+     * to its eventual monitoring destination(s).
+     *
+     * If telemetry is enabled, this will first require a connection to the 
cluster to generate
+     * the unique client instance ID. This method waits up to {@code timeout} 
for the admin
+     * client to complete the request.
+     *
+     * If telemetry is disabled, the method will throw {@link 
IllegalStateException}.
+     *
+     * Client telemetry is controlled by the {@link 
AdminClientConfig#ENABLE_METRICS_PUSH_CONFIG}
+     * configuration option.
+     *
+     * @param timeout The maximum time to wait for admin client to determine 
its client instance ID.
+     *                The value should be non-negative. Specifying a timeout 
of zero means do not
+     *                wait for the initial request to complete if it hasn't 
already.
+     * @throws InterruptException If the thread is interrupted while blocked.
+     * @throws KafkaException If an unexpected error occurs while trying to 
determine the client
+     *                        instance ID, though this error does not 
necessarily imply the
+     *                        admin client is otherwise unusable.
+     * @throws IllegalArgumentException If the {@code timeout} is negative.
+     * @throws IllegalStateException If telemetry is not enabled.
+     * @return The client's assigned instance id used for metrics collection.
+     */
+    String clientInstanceId(Duration timeout);

Review Comment:
   The return type is Uuid in the final version of the KIP.



##########
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java:
##########
@@ -105,6 +105,10 @@ public class CommonClientConfigs {
     public static final int RETRY_BACKOFF_EXP_BASE = 2;
     public static final double RETRY_BACKOFF_JITTER = 0.2;
 
+    public static final String ENABLE_METRICS_PUSH_CONFIG = 
"enable.metrics.push";
+    public static final String ENABLE_METRICS_PUSH_DOC = "Kafka client 
telemetry provides Kafka operators improved visibility over the behavior and 
internals of the clients that use the cluster." +

Review Comment:
   The KIP says "Whether to enable pushing of client metrics to the cluster, if 
the cluster has a client metrics subscription which matches this client." which 
sounds like a better description to me.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4385,6 +4385,11 @@ public FenceProducersResult 
fenceProducers(Collection<String> transactionalIds,
         return new FenceProducersResult(future.all());
     }
 
+    @Override
+    public String clientInstanceId(Duration timeout) {

Review Comment:
   Uuid



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1892,6 +1892,37 @@ public Map<TopicPartition, OffsetAndMetadata> 
committed(final Set<TopicPartition
     }
 
     /**
+     * Determines the client's unique client instance ID used for telemetry. 
This ID is unique to
+     * this specific client instance and will not change after it is initially 
generated.
+     * The ID is useful for correlating client operations with telemetry sent 
to the broker and
+     * to its eventual monitoring destination(s).
+     *
+     * If telemetry is enabled, this will first require a connection to the 
cluster to generate
+     * the unique client instance ID. This method waits up to {@code timeout} 
for the consumer
+     * client to complete the request.
+     *
+     * If telemetry is disabled, the method will throw {@link 
IllegalStateException}.
+     *
+     * Client telemetry is controlled by the {@link 
ConsumerConfig#ENABLE_METRICS_PUSH_CONFIG}
+     * configuration option.
+     *
+     * @param timeout The maximum time to wait for consumer client to 
determine its client instance ID.
+     *                The value should be non-negative. Specifying a timeout 
of zero means do not
+     *                wait for the initial request to complete if it hasn't 
already.
+     * @throws InterruptException If the thread is interrupted while blocked.
+     * @throws KafkaException If an unexpected error occurs while trying to 
determine the client
+     *                        instance ID, though this error does not 
necessarily imply the
+     *                        consumer client is otherwise unusable.
+     * @throws IllegalArgumentException If the {@code timeout} is negative.
+     * @throws IllegalStateException If telemetry is not enabled.
+     * @return The client's assigned instance id used for metrics collection.
+     */
+    @Override
+    public String clientInstanceId(Duration timeout) {

Review Comment:
   Uuid



##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -384,6 +384,11 @@ public synchronized void updateEndOffsets(final 
Map<TopicPartition, Long> newOff
         endOffsets.putAll(newOffsets);
     }
 
+    @Override
+    public String clientInstanceId(Duration timeout) {

Review Comment:
   Uuid



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1252,6 +1252,37 @@ public List<PartitionInfo> partitionsFor(String topic) {
         return Collections.unmodifiableMap(this.metrics.metrics());
     }
 
+    /**
+     * Determines the client's unique client instance ID used for telemetry. 
This ID is unique to
+     * this specific client instance and will not change after it is initially 
generated.
+     * The ID is useful for correlating client operations with telemetry sent 
to the broker and
+     * to its eventual monitoring destination(s).
+     *
+     * If telemetry is enabled, this will first require a connection to the 
cluster to generate
+     * the unique client instance ID. This method waits up to {@code timeout} 
for the producer
+     * client to complete the request.
+     *
+     * If telemetry is disabled, the method will throw {@link 
IllegalStateException}.
+     *
+     * Client telemetry is controlled by the {@link 
ProducerConfig#ENABLE_METRICS_PUSH_CONFIG}
+     * configuration option.
+     *
+     * @param timeout The maximum time to wait for producer client to 
determine its client instance ID.
+     *                The value should be non-negative. Specifying a timeout 
of zero means do not
+     *                wait for the initial request to complete if it hasn't 
already.
+     * @throws InterruptException If the thread is interrupted while blocked.
+     * @throws KafkaException If an unexpected error occurs while trying to 
determine the client
+     *                        instance ID, though this error does not 
necessarily imply the
+     *                        producer client is otherwise unusable.
+     * @throws IllegalArgumentException If the {@code timeout} is negative.
+     * @throws IllegalStateException If telemetry is not enabled.
+     * @return The client's assigned instance id used for metrics collection.
+     */
+    @Override
+    public String clientInstanceId(Duration timeout) {

Review Comment:
   Uuid



##########
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java:
##########
@@ -1312,6 +1312,11 @@ synchronized public void setMockMetrics(MetricName name, 
Metric metric) {
         mockMetrics.put(name, metric);
     }
 
+    @Override
+    public String clientInstanceId(Duration timeout) {

Review Comment:
   Uuid



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1892,6 +1892,37 @@ public Map<TopicPartition, OffsetAndMetadata> 
committed(final Set<TopicPartition
     }
 
     /**
+     * Determines the client's unique client instance ID used for telemetry. 
This ID is unique to
+     * this specific client instance and will not change after it is initially 
generated.
+     * The ID is useful for correlating client operations with telemetry sent 
to the broker and
+     * to its eventual monitoring destination(s).

Review Comment:
   There's no need for the "(s)" in the comment. It's already entirely readable.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java:
##########
@@ -173,6 +173,11 @@ public interface Consumer<K, V> extends Closeable {
      */
     Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> 
partitions, final Duration timeout);
 
+    /**
+     * See {@link KafkaConsumer#clientInstanceId(Duration)}}
+     */
+    String clientInstanceId(Duration timeout);

Review Comment:
   Uuid



##########
clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java:
##########
@@ -277,6 +277,11 @@ public FenceProducersResult 
fenceProducers(Collection<String> transactionalIds,
         return delegate.fenceProducers(transactionalIds, options);
     }
 
+    @Override
+    public String clientInstanceId(Duration timeout) {

Review Comment:
   Uuid



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -590,6 +590,11 @@ public void commitSync(Map<TopicPartition, 
OffsetAndMetadata> offsets, Duration
         }
     }
 
+    @Override
+    public String clientInstanceId(Duration timeout) {

Review Comment:
   Uuid



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1252,6 +1252,37 @@ public List<PartitionInfo> partitionsFor(String topic) {
         return Collections.unmodifiableMap(this.metrics.metrics());
     }
 
+    /**
+     * Determines the client's unique client instance ID used for telemetry. 
This ID is unique to
+     * this specific client instance and will not change after it is initially 
generated.
+     * The ID is useful for correlating client operations with telemetry sent 
to the broker and
+     * to its eventual monitoring destination(s).

Review Comment:
   (s)



##########
clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java:
##########
@@ -379,6 +379,11 @@ public List<PartitionInfo> partitionsFor(String topic) {
         return this.cluster.partitionsForTopic(topic);
     }
 
+    @Override
+    public String clientInstanceId(Duration timeout) {

Review Comment:
   Uuid



##########
clients/src/main/java/org/apache/kafka/clients/producer/Producer.java:
##########
@@ -95,6 +95,11 @@ void sendOffsetsToTransaction(Map<TopicPartition, 
OffsetAndMetadata> offsets,
      */
     Map<MetricName, ? extends Metric> metrics();
 
+    /**
+     * See {@link KafkaProducer#clientInstanceId(Duration)}}
+     */
+    String clientInstanceId(Duration timeout);

Review Comment:
   Uuid



##########
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##########
@@ -385,6 +391,10 @@ public class ProducerConfig extends AbstractConfig {
                                         atLeast(0L),
                                         Importance.LOW,
                                         
CommonClientConfigs.RETRY_BACKOFF_MAX_MS_DOC)
+                                .define(ENABLE_METRICS_PUSH_CONFIG,
+                                        Type.BOOLEAN, true,

Review Comment:
   Formatting. Please place the type and the default on separate lines like the 
surrounding text.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to