apoorvmittal10 commented on code in PR #20144:
URL: https://github.com/apache/kafka/pull/20144#discussion_r2257849303


##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##########
@@ -713,14 +716,20 @@ private Optional<Builder<?>> 
createPushRequest(ClientTelemetrySubscription local
                 return Optional.empty();
             }
 
-            CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
+            CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes(),
 unsupportedCompressionTypes);
             ByteBuffer compressedPayload;
             try {
                 compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
-            } catch (Throwable e) {
-                log.debug("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);
-                compressedPayload = ByteBuffer.wrap(payload.toByteArray());
-                compressionType = CompressionType.NONE;
+            } catch (Throwable e) { 
+                if (e instanceof IOException || e instanceof 
NoClassDefFoundError || 

Review Comment:
   Are you saying IOException will never be wrapped?



##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##########
@@ -379,14 +382,25 @@ public Optional<AbstractRequest.Builder<?>> 
createRequest() {
             } finally {
                 lock.readLock().unlock();
             }
-
-            if (localState == ClientTelemetryState.SUBSCRIPTION_NEEDED) {
-                return createSubscriptionRequest(localSubscription);
-            } else if (localState == ClientTelemetryState.PUSH_NEEDED || 
localState == ClientTelemetryState.TERMINATING_PUSH_NEEDED) {
-                return createPushRequest(localSubscription);
+            
+            try {
+                if (localState == ClientTelemetryState.SUBSCRIPTION_NEEDED) {
+                    return createSubscriptionRequest(localSubscription);
+                } else if (localState == ClientTelemetryState.PUSH_NEEDED || 
localState == ClientTelemetryState.TERMINATING_PUSH_NEEDED) {
+                    return createPushRequest(localSubscription);
+                }
+                log.warn("Cannot make telemetry request as telemetry is in 
state: {}", localState);
+            } catch (RuntimeException e) {
+                // Transition to TERMINATED state to prevent further attempts
+                lock.writeLock().lock();
+                try {
+                    state = ClientTelemetryState.TERMINATED;
+                } finally {
+                    lock.writeLock().unlock();
+                }
+                log.error("Fatal telemetry error encountered, disabling 
telemetry permanently: {}", e.getMessage());

Review Comment:
   Why do we need this change?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to