junrao commented on code in PR #10826:
URL: https://github.com/apache/kafka/pull/10826#discussion_r849786048


##########
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##########
@@ -523,6 +532,45 @@ boolean idempotenceEnabled() {
         return userConfiguredTransactions || idempotenceEnabled;
     }
 
+    public CompressionConfig getCompressionConfig(CompressionType compressionType) {

Review Comment:
   This is a public class. We probably don't want to expose this as a public 
method since we don't expose other config values.



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionConfig.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.compress.GzipConfig;
+import org.apache.kafka.common.compress.LZ4Config;
+import org.apache.kafka.common.compress.NoneConfig;
+import org.apache.kafka.common.compress.SnappyConfig;
+import org.apache.kafka.common.compress.ZstdConfig;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public abstract class CompressionConfig {
+
+    public final static NoneConfig NONE = none().build();
+
+    // Instantiating this class directly is disallowed; use builder methods.
+    protected CompressionConfig() {}
+
+    public abstract CompressionType getType();

Review Comment:
   We typically don't use getters. So, this can just be type().
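   For illustration, a minimal sketch of the suggested naming (hypothetical stand-in types, not the PR's code) — Kafka client code favors bare accessors like type() over JavaBean-style getType():

```java
// Hypothetical stand-ins illustrating the bare-accessor naming convention.
enum CompressionType { NONE, GZIP, SNAPPY, LZ4, ZSTD }

abstract class CompressionConfig {
    // Suggested: type(), not getType()
    public abstract CompressionType type();
}

class GzipConfig extends CompressionConfig {
    @Override
    public CompressionType type() {
        return CompressionType.GZIP;
    }
}
```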



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -104,7 +104,7 @@ public final class RecordAccumulator {
      */
     public RecordAccumulator(LogContext logContext,
                              int batchSize,
-                             CompressionType compression,
+                             CompressionConfig compression,

Review Comment:
   Could we adjust the corresponding javadoc?



##########
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##########
@@ -312,6 +320,7 @@ public class ProducerConfig extends AbstractConfig {
                                         Importance.LOW,
                                         ACKS_DOC)
                                .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
+                                .define(COMPRESSION_LEVEL_CONFIG, Type.STRING, "", Importance.MEDIUM, COMPRESSION_LEVEL_DOC)

Review Comment:
   The KIP says the default value is null.
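   To sketch the intended null-default semantics (a hypothetical helper, not the PR's code): a null compression.level means "use the codec's own default level", which is distinct from an empty-string default:

```java
// Hypothetical helper illustrating KIP-style null-default handling:
// a null compression.level falls back to the codec's own default level.
final class CompressionLevelResolver {
    private CompressionLevelResolver() {}

    static int resolve(String configuredLevel, int codecDefaultLevel) {
        if (configuredLevel == null) {
            return codecDefaultLevel; // unset: use the codec default
        }
        return Integer.parseInt(configuredLevel);
    }
}
```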



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionConfig.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.compress.GzipConfig;
+import org.apache.kafka.common.compress.LZ4Config;
+import org.apache.kafka.common.compress.NoneConfig;
+import org.apache.kafka.common.compress.SnappyConfig;
+import org.apache.kafka.common.compress.ZstdConfig;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public abstract class CompressionConfig {
+
+    public final static NoneConfig NONE = none().build();
+
+    // Instantiating this class directly is disallowed; use builder methods.
+    protected CompressionConfig() {}
+
+    public abstract CompressionType getType();
+
+    /**
+     * Wrap bufferStream with an OutputStream that will compress data with this CompressionType.
+     *
+     * Note: Unlike {@link #wrapForInput}, {@link #wrapForOutput} cannot take {@link ByteBuffer}s directly.
+     * Currently, {@link MemoryRecordsBuilder#writeDefaultBatchHeader()} and {@link MemoryRecordsBuilder#writeLegacyCompressedWrapperHeader()}
+     * write to the underlying buffer in the given {@link ByteBufferOutputStream} after the compressed data has been written.
+     * In the event that the buffer needs to be expanded while writing the data, access to the underlying buffer needs to be preserved.
+     */
+    public abstract OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion);
+
+    /**
+     * Wrap buffer with an InputStream that will decompress data with this CompressionType.
+     *
+     * @param decompressionBufferSupplier The supplier of ByteBuffer(s) used for decompression if supported.

Review Comment:
   Could we add the rest of the params to javadoc?



##########
clients/src/main/java/org/apache/kafka/common/compress/GzipOutputStream.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * An extension of {@link GZIPOutputStream}, with compression level functionality.
+ */
+public class GzipOutputStream extends GZIPOutputStream {
+    /**
+     * Creates a new {@link OutputStream} with the specified buffer size and compression level.
+     *
+     * @param out   the output stream
+     * @param size  the output buffer size
+     * @param level the compression level
+     * @throws IOException If an I/O error has occurred.
+     */
+    public GzipOutputStream(OutputStream out, int size, int level) throws IOException {
+        super(out, size);

Review Comment:
   Could we just directly use the constructor with level and avoid this wrapper 
class?
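   For context, java.util.zip.GZIPOutputStream takes no level argument itself, but it exposes the underlying Deflater as the protected field def, so a thin subclass can apply a level. A minimal sketch (not necessarily the PR's exact code) of what such a wrapper does:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Deflater;
import java.util.zip.GZIPOutputStream;

// Minimal sketch of a level-aware gzip stream: GZIPOutputStream has no
// level parameter, but its protected Deflater field `def` can be tuned.
class LeveledGzipOutputStream extends GZIPOutputStream {
    LeveledGzipOutputStream(OutputStream out, int size, int level) throws IOException {
        super(out, size);
        def.setLevel(level); // e.g. Deflater.BEST_SPEED .. Deflater.BEST_COMPRESSION
    }
}
```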



##########
clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockOutputStream.java:
##########
@@ -122,7 +119,11 @@ public KafkaLZ4BlockOutputStream(OutputStream out) throws IOException {
     }
 
     public KafkaLZ4BlockOutputStream(OutputStream out, boolean useBrokenHC) throws IOException {

Review Comment:
   This is an existing issue. But this constructor and the one above seem 
unused and could be removed.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -31,6 +31,9 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.common.record.CompressionConfig;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;

Review Comment:
   Do we need to move those imports?



##########
core/src/main/scala/kafka/log/LogConfig.scala:
##########
@@ -177,6 +179,44 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
     }
    ConfigUtils.configMapToRedactedString(overriddenTopicProps.asJava, configDef)
   }
+
+  /**
+   * Returns per-codec [[CompressionConfig]] for given `compressionType`.
+   */
+  private def getCompressionConfig(compressionType: CompressionType): CompressionConfig = {

Review Comment:
   Another thing is that if we set a topic or broker level compression type to be the same as the producer's, but with a different compression level, should we trigger broker-side recompression? Currently, we don't do recompression. Should we? If so, is it possible to know the producer's compression level?



##########
clients/src/main/java/org/apache/kafka/common/record/CompressionConfig.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;

Review Comment:
   Hmm, it seems this class should be in the common.compress package instead of here?



##########
core/src/main/scala/kafka/log/LogConfig.scala:
##########
@@ -177,6 +179,44 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
     }
    ConfigUtils.configMapToRedactedString(overriddenTopicProps.asJava, configDef)
   }
+
+  /**
+   * Returns per-codec [[CompressionConfig]] for given `compressionType`.
+   */
+  private def getCompressionConfig(compressionType: CompressionType): CompressionConfig = {
+    if (this.compressionLevel.isEmpty) {
+      compressionType match {
+        case CompressionType.NONE => CompressionConfig.NONE
+        case CompressionType.GZIP => CompressionConfig.gzip.build

Review Comment:
   For methods with side effects like build, we want to include (). So, build() instead of build.



##########
core/src/main/scala/kafka/log/LogConfig.scala:
##########
@@ -96,6 +97,7 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
   val uncleanLeaderElectionEnable = getBoolean(LogConfig.UncleanLeaderElectionEnableProp)
   val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp)
   val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase(Locale.ROOT)
+  val compressionLevel = getString(LogConfig.CompressionLevelProp)

Review Comment:
   With topic level overriding, things can be a bit tricky. Suppose that a user 
sets up a broker level compression type and compression level, but only 
overrides the compression type for a topic. With this logic, the topic will 
pick up the broker level compression level, which could be intended for a 
different compression type.
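   The override hazard described above, sketched with hypothetical config maps (an illustration only, not Kafka's actual config-resolution code):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the topic-override hazard: the topic
// overrides only compression.type, so it silently inherits a broker-level
// compression.level that was tuned for a different codec.
final class OverrideExample {
    private OverrideExample() {}

    // Topic-level value wins when present; otherwise fall back to the broker's.
    static String effective(Map<String, String> broker, Map<String, String> topic, String key) {
        return topic.getOrDefault(key, broker.get(key));
    }
}
```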



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -59,7 +59,7 @@ class GroupMetadataManager(brokerId: Int,
                            time: Time,
                           metrics: Metrics) extends Logging with KafkaMetricsGroup {
 
-  private val compressionType: CompressionType = CompressionType.forId(config.offsetsTopicCompressionCodec.codec)
+  private val compressionConfig: CompressionConfig = CompressionConfig.of(CompressionType.forId(config.offsetsTopicCompressionCodec.codec)).build

Review Comment:
   The KIP doesn't mention this. For the offsets topic, should we stick with the default compression level or introduce a separate config for the compression level?



##########
raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java:
##########
@@ -113,12 +113,11 @@ private void finalizeSnapshotWithFooter() {
      * Create an instance of this class and initialize
      * the underlying snapshot with {@link SnapshotHeaderRecord}
      *
-     * @param snapshot a lambda to create the low level snapshot writer

Review Comment:
   This is an existing issue. Could we add the missing javadoc for supplier?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1131,19 +1130,19 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       shallowMessageCount += 1
       validBytesCount += batchSize
 
-      val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
-      if (messageCodec != NoCompressionCodec)
-        sourceCodec = messageCodec
+      val messageCompressionType = batch.compressionType
+      if (messageCompressionType != CompressionType.NONE)
+        sourceType = messageCompressionType
     }
 
     // Apply broker-side compression if any
-    val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
+    val targetConfig = config.compressionConfig(sourceType)

Review Comment:
   Hmm, I am not sure about this change. The old logic handles the case when 
the topic level compression type is producer. In that case, the producer codec 
will be used. The new code doesn't seem to handle that.
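   The old resolution behavior being described can be sketched roughly as follows (a hypothetical simplification of BrokerCompressionCodec.getTargetCompressionCodec, using plain strings for codecs):

```java
// Hypothetical simplification of the broker-side codec resolution the
// comment refers to: a topic compression type of "producer" means
// "keep whatever codec the producer used" rather than recompressing.
final class TargetCompression {
    private TargetCompression() {}

    static String resolve(String topicCompressionType, String producerCodec) {
        if ("producer".equals(topicCompressionType)) {
            return producerCodec; // keep the producer's codec
        }
        return topicCompressionType; // broker-side recompression target
    }
}
```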



-- 
This is an automated message from the Apache Git Service.