k-raina commented on code in PR #20144:
URL: https://github.com/apache/kafka/pull/20144#discussion_r2254377927


##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##########
@@ -713,14 +716,20 @@ private Optional<Builder<?>> createPushRequest(ClientTelemetrySubscription local
                 return Optional.empty();
             }
 
-            CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
+            CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes(), unsupportedCompressionTypes);
             ByteBuffer compressedPayload;
             try {
                 compressedPayload = ClientTelemetryUtils.compress(payload, compressionType);
-            } catch (Throwable e) {
-                log.debug("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType);
-                compressedPayload = ByteBuffer.wrap(payload.toByteArray());
-                compressionType = CompressionType.NONE;
+            } catch (Throwable e) {
+                if (e instanceof IOException || e instanceof NoClassDefFoundError ||

Review Comment:
   For ZSTD:
   ```
   ClientTelemetryUtils.compress() [Line 206]
   └── Compression.of(ZSTD).build()
       └── ZstdCompression.Builder.build() [Line 138]
           └── new ZstdCompression(level)
                   └── throws NoClassDefFoundError: com/github/luben/zstd/BufferPool
                       ❌ Exception escapes - no try-catch at this level
   ```
   ```
   // ZstdCompression.java - External libraries imported at the top
   import com.github.luben.zstd.BufferPool;           // ← These imports
   import com.github.luben.zstd.RecyclingBufferPool;  // ← cause problems
   import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
   import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
   
   ```
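   When the JVM loads and links the ZstdCompression class, it may need to resolve the zstd-jni classes that the class references directly (the imports above). If the ZSTD library is missing, the JVM throws NoClassDefFoundError at that point, before execution reaches any try-catch in Kafka's compression code, so the error escapes unwrapped. The same applies to the Snappy compression class.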
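
To illustrate the point about a missing library, here is a minimal sketch of probing for a codec class before constructing the compression object. `CodecClasspathProbe` and `isClassAvailable` are hypothetical names used only for this example and are not part of Kafka; the sketch only assumes the standard `Class.forName(String, boolean, ClassLoader)` call.

```java
// Hypothetical helper, not part of Kafka: checks whether a class can be
// loaded without running its static initializers.
final class CodecClasspathProbe {
    private CodecClasspathProbe() { }

    static boolean isClassAvailable(String className) {
        try {
            // initialize=false: load the class only, do not initialize it.
            Class.forName(className, false, CodecClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // ClassNotFoundException: the class is not on the classpath.
            // LinkageError (parent of NoClassDefFoundError): a dependency
            // of the class could not be loaded or linked.
            return false;
        }
    }
}
```

A caller could, for example, check `isClassAvailable("com.github.luben.zstd.BufferPool")` before attempting ZSTD. This is only one possible approach; catching the error at the call site, as the PR does, avoids hard-coding class names.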
   
   
   For LZ4: 
   ```
   ClientTelemetryUtils.compress() [Line 207]
   └── Compression.of(LZ4).build()
       └── Lz4Compression.Builder.build() [Line 138] ✅ No LZ4 library classes referenced here
   └── compression.wrapForOutput()
       └── Lz4Compression.wrapForOutput() [Line 49]
           └── try {
               └── new Lz4BlockOutputStream(...)
                   └── NoClassDefFoundError: net/jpountz/lz4/LZ4Factory
           └── } catch (Throwable e) {
               └── throw new KafkaException(e) ❌ Exception wrapped
           └── }
   ``` 
   
   ```
   // Lz4Compression.java - NO external library imports at the top
   import org.apache.kafka.common.KafkaException;
   import org.apache.kafka.common.record.CompressionType;
   // ... only Kafka's own classes imported
   ```
   The Lz4Compression class itself loads fine. The external LZ4 classes are only touched later, during the compression.wrapForOutput() call, where the resulting NoClassDefFoundError is caught and rethrown wrapped in a KafkaException.
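
Because the same root cause can surface either directly (ZSTD, Snappy) or wrapped in another exception (LZ4), a caller may want to inspect the cause chain rather than only the top-level type. The following is a minimal sketch under that assumption; `MissingLibraryCheck` and `causedByMissingClass` are hypothetical names, not Kafka APIs, and a plain `RuntimeException` would stand in for `KafkaException` when trying it out.

```java
// Hypothetical helper, not part of Kafka: reports whether a failure was
// ultimately caused by a class that could not be found at runtime.
final class MissingLibraryCheck {
    private static final int MAX_DEPTH = 16; // guard against unusually long or cyclic cause chains

    private MissingLibraryCheck() { }

    static boolean causedByMissingClass(Throwable error) {
        int depth = 0;
        // Walk from the top-level throwable down through getCause().
        for (Throwable t = error; t != null && depth < MAX_DEPTH; t = t.getCause(), depth++) {
            if (t instanceof NoClassDefFoundError) {
                return true;
            }
        }
        return false;
    }
}
```

With this, both a bare `NoClassDefFoundError` and a wrapper whose cause is a `NoClassDefFoundError` are treated the same way, while an unrelated `IOException` is not.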
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
