chia7712 commented on code in PR #18726:
URL: https://github.com/apache/kafka/pull/18726#discussion_r1972645207


##########
clients/src/main/resources/common/message/FetchResponse.json:
##########
@@ -106,7 +106,7 @@
         ]},
         { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": false, "entityType": "brokerId",
           "about": "The preferred read replica for the consumer to use on its next fetch request."},
-        { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."}

Review Comment:
   > It seems that before 
https://github.com/apache/kafka/commit/fe56fc98fa736c79c9dcbb1f64f810065161a1cc,
 we already have a bunch of calls like the following that will leave records as 
null in the fetchResponse.
   
   In the normal path, `FetchResponse.recordsOrFail(partitionData)` returns `MemoryRecords.EMPTY` in place of null.
   ```
       public static Records recordsOrFail(FetchResponseData.PartitionData partition) {
           if (partition.records() == null) return MemoryRecords.EMPTY;
           if (partition.records() instanceof Records) return (Records) partition.records();
           throw new ClassCastException("The record type is " + partition.records().getClass().getSimpleName() + ", which is not a subtype of " +
               Records.class.getSimpleName() + ". This method is only safe to call if the `FetchResponse` was deserialized from bytes.");
       }
   ```
   
   
https://github.com/apache/kafka/blob/3.9/core/src/main/scala/kafka/server/KafkaApis.scala#L848
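
   To make the null handling concrete, here is a minimal, self-contained sketch of the same pattern. The types `BaseRecords`, `Records`, `PartitionData` and the constant `EMPTY` are simplified, hypothetical stand-ins written for this example, not the Kafka classes:

   ```java
   // Hypothetical stand-ins for illustration only; these are NOT the Kafka classes.
   final class NullRecordsSketch {
       interface BaseRecords { int sizeInBytes(); }
       interface Records extends BaseRecords { }

       // Shared empty instance, playing the role of MemoryRecords.EMPTY.
       static final Records EMPTY = () -> 0;

       // A nullable "records" field that stays null unless it is set explicitly.
       static final class PartitionData {
           private BaseRecords records;
           BaseRecords records() { return records; }
           PartitionData setRecords(BaseRecords r) { this.records = r; return this; }
       }

       // Same shape as recordsOrFail: null is replaced by the empty instance.
       static Records recordsOrFail(PartitionData partition) {
           if (partition.records() == null) return EMPTY;
           if (partition.records() instanceof Records) return (Records) partition.records();
           throw new ClassCastException("The record type is "
               + partition.records().getClass().getSimpleName()
               + ", which is not a subtype of Records.");
       }

       public static void main(String[] args) {
           PartitionData unset = new PartitionData();
           System.out.println("null replaced by EMPTY: " + (recordsOrFail(unset) == EMPTY));
       }
   }
   ```

   A caller that goes through `recordsOrFail` therefore never observes null, even when the field was left unset.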
   
   However, your comment made me notice that we do have a path that returns null records: when converting the records, it can return `FetchResponse.partitionResponse(tp, Errors.XXX)`. See the following links.
   
   
https://github.com/apache/kafka/blob/3.9/core/src/main/scala/kafka/server/KafkaApis.scala#L835
   
https://github.com/apache/kafka/blob/3.9/core/src/main/scala/kafka/server/KafkaApis.scala#L884
   
   ```
           val downConvertMagic =
             logConfig.map(_.recordVersion.value).flatMap { magic =>
               if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1)
                 Some(RecordBatch.MAGIC_VALUE_V0)
               else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3)
                 Some(RecordBatch.MAGIC_VALUE_V1)
               else
                 None
             }
   ```
   However, a 4.0 consumer should never receive null records, because it cannot use fetch versions v0-v3. Likewise, the server rejects v0-v3 requests, and the down-conversion code has been removed as well.
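
   As a rough illustration of why only v0-v3 can reach that path, here is a Java transcription of the version check in the Scala snippet above. The class and method names are made up for this sketch, the magic constants are assumed to be 0, 1 and 2, and the `logConfig` lookup is left out, so treat it as an approximation rather than the broker code:

   ```java
   import java.util.Optional;

   // Illustrative transcription of the Scala check above; not Kafka code.
   final class DownConvertSketch {
       // Assumed record-format magic values for v0, v1 and v2.
       static final byte MAGIC_VALUE_V0 = 0;
       static final byte MAGIC_VALUE_V1 = 1;
       static final byte MAGIC_VALUE_V2 = 2;

       // Returns the magic to down-convert to, or empty if no conversion is needed.
       static Optional<Byte> downConvertMagic(byte logMagic, short fetchVersion) {
           if (logMagic > MAGIC_VALUE_V0 && fetchVersion <= 1) return Optional.of(MAGIC_VALUE_V0);
           if (logMagic > MAGIC_VALUE_V1 && fetchVersion <= 3) return Optional.of(MAGIC_VALUE_V1);
           return Optional.empty();
       }

       public static void main(String[] args) {
           // For a v2-format log, only fetch versions 0 to 3 yield a down-convert target.
           for (short v = 0; v <= 5; v++) {
               System.out.println("fetch v" + v + " -> " + downConvertMagic(MAGIC_VALUE_V2, v));
           }
       }
   }
   ```

   With fetch version 4 or higher the check always yields no target, so the conversion branch that can produce the error response is never entered.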


