[ 
https://issues.apache.org/jira/browse/KAFKA-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715218#comment-16715218
 ] 

ASF GitHub Bot commented on KAFKA-7549:
---------------------------------------

hachikuji closed pull request #5925: KAFKA-7549: Old ProduceRequest with zstd 
compression does not return error to client
URL: https://github.com/apache/kafka/pull/5925
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index f87090eba6a..9f9de42c866 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.CommonFields;
 import org.apache.kafka.common.protocol.Errors;
@@ -172,6 +173,21 @@ public Builder(short minVersion,
 
         @Override
         public ProduceRequest build(short version) {
+            return build(version, true);
+        }
+
+        // Visible for testing only
+        public ProduceRequest buildUnsafe(short version) {
+            return build(version, false);
+        }
+
+        private ProduceRequest build(short version, boolean validate) {
+            if (validate) {
+                // Validate the given records first
+                for (MemoryRecords records : partitionRecords.values()) {
+                    ProduceRequest.validateRecords(version, records);
+                }
+            }
             return new ProduceRequest(version, acks, timeout, 
partitionRecords, transactionalId);
         }
 
@@ -210,8 +226,9 @@ private ProduceRequest(short version, short acks, int 
timeout, Map<TopicPartitio
         this.partitionRecords = partitionRecords;
         this.partitionSizes = createPartitionSizes(partitionRecords);
 
-        for (MemoryRecords records : partitionRecords.values())
-            validateRecords(version, records);
+        for (MemoryRecords records : partitionRecords.values()) {
+            setFlags(records);
+        }
     }
 
     private static Map<TopicPartition, Integer> 
createPartitionSizes(Map<TopicPartition, MemoryRecords> partitionRecords) {
@@ -231,7 +248,7 @@ public ProduceRequest(Struct struct, short version) {
                 Struct partitionResponse = (Struct) partitionResponseObj;
                 int partition = partitionResponse.get(PARTITION_ID);
                 MemoryRecords records = (MemoryRecords) 
partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                validateRecords(version, records);
+                setFlags(records);
                 partitionRecords.put(new TopicPartition(topic, partition), 
records);
             }
         }
@@ -241,32 +258,11 @@ public ProduceRequest(Struct struct, short version) {
         transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null);
     }
 
-    private void validateRecords(short version, MemoryRecords records) {
-        if (version >= 3) {
-            Iterator<MutableRecordBatch> iterator = 
records.batches().iterator();
-            if (!iterator.hasNext())
-                throw new InvalidRecordException("Produce requests with 
version " + version + " must have at least " +
-                        "one record batch");
-
-            MutableRecordBatch entry = iterator.next();
-            if (entry.magic() != RecordBatch.MAGIC_VALUE_V2)
-                throw new InvalidRecordException("Produce requests with 
version " + version + " are only allowed to " +
-                        "contain record batches with magic version 2");
-            if (version < 7 && entry.compressionType() == 
CompressionType.ZSTD) {
-                throw new InvalidRecordException("Produce requests with 
version " + version + " are note allowed to " +
-                    "use ZStandard compression");
-            }
-
-            if (iterator.hasNext())
-                throw new InvalidRecordException("Produce requests with 
version " + version + " are only allowed to " +
-                        "contain exactly one record batch");
-            idempotent = entry.hasProducerId();
-            transactional = entry.isTransactional();
-        }
-
-        // Note that we do not do similar validation for older versions to 
ensure compatibility with
-        // clients which send the wrong magic version in the wrong version of 
the produce request. The broker
-        // did not do this validation before, so we maintain that behavior 
here.
+    private void setFlags(MemoryRecords records) {
+        Iterator<MutableRecordBatch> iterator = records.batches().iterator();
+        MutableRecordBatch entry = iterator.next();
+        idempotent = entry.hasProducerId();
+        transactional = entry.isTransactional();
     }
 
     /**
@@ -394,6 +390,32 @@ public void clearPartitionRecords() {
         partitionRecords = null;
     }
 
+    public static void validateRecords(short version, MemoryRecords records) {
+        if (version >= 3) {
+            Iterator<MutableRecordBatch> iterator = 
records.batches().iterator();
+            if (!iterator.hasNext())
+                throw new InvalidRecordException("Produce requests with 
version " + version + " must have at least " +
+                    "one record batch");
+
+            MutableRecordBatch entry = iterator.next();
+            if (entry.magic() != RecordBatch.MAGIC_VALUE_V2)
+                throw new InvalidRecordException("Produce requests with 
version " + version + " are only allowed to " +
+                    "contain record batches with magic version 2");
+            if (version < 7 && entry.compressionType() == 
CompressionType.ZSTD) {
+                throw new UnsupportedCompressionTypeException("Produce 
requests with version " + version + " are note allowed to " +
+                    "use ZStandard compression");
+            }
+
+            if (iterator.hasNext())
+                throw new InvalidRecordException("Produce requests with 
version " + version + " are only allowed to " +
+                    "contain exactly one record batch");
+        }
+
+        // Note that we do not do similar validation for older versions to 
ensure compatibility with
+        // clients which send the wrong magic version in the wrong version of 
the produce request. The broker
+        // did not do this validation before, so we maintain that behavior 
here.
+    }
+
     public static ProduceRequest parse(ByteBuffer buffer, short version) {
         return new ProduceRequest(ApiKeys.PRODUCE.parseRequest(version, 
buffer), version);
     }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index f2e3e01af83..1d786bbdf71 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -46,7 +46,7 @@ import 
org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{BaseRecords, ControlRecordType, 
EndTransactionMarker, LazyDownConversionRecords, MemoryRecords, 
MultiRecordsSend, RecordBatch, RecordConversionStats, Records}
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
 import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
 import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, 
AclFilterResponse}
@@ -391,6 +391,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val unauthorizedTopicResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
     val nonExistingTopicResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
+    val invalidRequestResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
     val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
 
     for ((topicPartition, memoryRecords) <- 
produceRequest.partitionRecordsOrFail.asScala) {
@@ -399,12 +400,18 @@ class KafkaApis(val requestChannel: RequestChannel,
       else if (!metadataCache.contains(topicPartition))
         nonExistingTopicResponses += topicPartition -> new 
PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
       else
-        authorizedRequestInfo += (topicPartition -> memoryRecords)
+        try {
+          ProduceRequest.validateRecords(request.header.apiVersion(), 
memoryRecords)
+          authorizedRequestInfo += (topicPartition -> memoryRecords)
+        } catch {
+          case e: ApiException =>
+            invalidRequestResponses += topicPartition -> new 
PartitionResponse(Errors.forException(e))
+        }
     }
 
     // the callback for sending a produce response
     def sendResponseCallback(responseStatus: Map[TopicPartition, 
PartitionResponse]) {
-      val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses 
++ nonExistingTopicResponses
+      val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses 
++ nonExistingTopicResponses ++ invalidRequestResponses
       var errorInResponse = false
 
       mergedResponseStatus.foreach { case (topicPartition, status) =>
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index b1f3af145b9..906de71ecc6 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -133,11 +133,19 @@ class ProduceRequestTest extends BaseRequestTest {
     // produce request with v7: works fine!
     val res1 = sendProduceRequest(leader,
       new ProduceRequest.Builder(7, 7, -1, 3000, partitionRecords.asJava, 
null).build())
-    val (tp, partitionResponse) = res1.responses.asScala.head
-    assertEquals(topicPartition, tp)
-    assertEquals(Errors.NONE, partitionResponse.error)
-    assertEquals(0, partitionResponse.baseOffset)
-    assertEquals(-1, partitionResponse.logAppendTime)
+    val (tp1, partitionResponse1) = res1.responses.asScala.head
+    assertEquals(topicPartition, tp1)
+    assertEquals(Errors.NONE, partitionResponse1.error)
+    assertEquals(0, partitionResponse1.baseOffset)
+    assertEquals(-1, partitionResponse1.logAppendTime)
+
+    // produce request with v3: returns Errors.UNSUPPORTED_COMPRESSION_TYPE.
+    val res2 = sendProduceRequest(leader,
+      new ProduceRequest.Builder(3, 3, -1, 3000, partitionRecords.asJava, null)
+        .buildUnsafe(3))
+    val (tp2, partitionResponse2) = res2.responses.asScala.head
+    assertEquals(topicPartition, tp2)
+    assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, partitionResponse2.error)
   }
 
   private def sendProduceRequest(leaderId: Int, request: ProduceRequest): 
ProduceResponse = {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Old ProduceRequest with zstd compression does not return error to client
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-7549
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7549
>             Project: Kafka
>          Issue Type: Bug
>          Components: compression
>            Reporter: Magnus Edenhill
>            Assignee: Lee Dongjin
>            Priority: Major
>             Fix For: 2.2.0, 2.1.1
>
>
> Kafka broker v2.1.0rc0.
>  
> KIP-110 states that:
> "Zstd will only be allowed for the bumped produce API. That is, for older 
> version clients(=below KAFKA_2_1_IV0), we return UNSUPPORTED_COMPRESSION_TYPE 
> regardless of the message format."
>  
> However, sending a ProduceRequest V3 with zstd compression (which is a client 
> side bug) closes the connection with the following exception rather than 
> returning UNSUPPORTED_COMPRESSION_TYPE in the ProduceResponse:
>  
> {noformat}
> [2018-10-25 11:40:31,813] ERROR Exception while processing request from 
> 127.0.0.1:60723-127.0.0.1:60656-94 (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
> for apiKey: PRODUCE, apiVersion: 3, connectionId: 
> 127.0.0.1:60723-127.0.0.1:60656-94, listenerName: ListenerName(PLAINTEXT), 
> principal: User:ANONYMOUS
> Caused by: org.apache.kafka.common.record.InvalidRecordException: Produce 
> requests with version 3 are note allowed to use ZStandard compression
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to