Weichu Liu created KAFKA-9669:
---------------------------------

             Summary: Kafka 2.4.0 Chokes on Filebeat 5.6 Produced Data
                 Key: KAFKA-9669
                 URL: https://issues.apache.org/jira/browse/KAFKA-9669
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.4.0
            Reporter: Weichu Liu


Hi

In our environment, after upgrading to Kafka 2.4.0, we discovered that the broker is no longer compatible with Filebeat 5.

Here is how to reproduce it:

1. Start up Kafka 2.4.0 with all-vanilla configuration:

{code}
$ kafka_2.13-2.4.0/bin/zookeeper-server-start.sh kafka_2.13-2.4.0/config/zookeeper.properties
$ kafka_2.13-2.4.0/bin/kafka-server-start.sh kafka_2.13-2.4.0/config/server.properties
{code}
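
With the vanilla configuration the topic {{test-3}} should be auto-created on the first produce. If auto-creation is disabled in your setup, a sketch of creating it up front (the partition and replication values here are just assumptions):

{code}
# Optional: pre-create the topic (the vanilla config auto-creates it on first write).
$ kafka_2.13-2.4.0/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
    --create --topic test-3 --partitions 1 --replication-factor 1
{code}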

2. Start up Filebeat 5.6.16 (downloaded from 
https://www.elastic.co/jp/downloads/past-releases/filebeat-5-6-16) with the following configuration.

{code}
$ cat /tmp/filebeat.yml
name: test

output.kafka:
  enabled: true
  hosts:
    - localhost:9092
  topic: test-3
  version: 0.10.0
  compression: gzip

filebeat:
  prospectors:
    - input_type: log
      paths:
        - /tmp/filebeat-in
      encoding: plain
{code}

{code}
$ filebeat-5.6.16-linux-x86_64/filebeat -e -c /tmp/filebeat.yml
{code}

3. Write some lines to the file {{/tmp/filebeat-in}}. A single line does not seem to trigger the issue, but 30 lines are enough (see the single-line comparison after the command below).

{code}
seq 30 >> /tmp/filebeat-in
{code}
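
For comparison, a single-line write like the one below does not seem to trigger the errors, presumably because multiple lines get batched and compressed together while one line does not:

{code}
# Writing a single line does not appear to reproduce the errors (see note above).
echo hello >> /tmp/filebeat-in
{code}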

4. Kafka throws the following chunk of errors, seemingly once per produced record:

{noformat}
[2020-03-06 05:17:40,129] ERROR [ReplicaManager broker=0] Error processing append operation on partition test-3-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.InvalidRecordException: Inner record LegacyRecordBatch(offset=0, Record(magic=1, attributes=0, compression=NONE, crc=1453875406, CreateTime=1583471854475, key=0 bytes, value=202 bytes)) inside the compressed record batch does not have incremental offsets, expected offset is 1 in topic partition test-3-0.
[2020-03-06 05:17:40,129] ERROR [KafkaApi-0] Error when handling request: clientId=beats, correlationId=102, api=PRODUCE, version=2, body={acks=1,timeout=10000,partitionSizes=[test-3-0=272]} (kafka.server.KafkaApis)
java.lang.NullPointerException: `field` must be non-null
        at java.base/java.util.Objects.requireNonNull(Objects.java:246)
        at org.apache.kafka.common.protocol.types.Struct.validateField(Struct.java:474)
        at org.apache.kafka.common.protocol.types.Struct.instance(Struct.java:418)
        at org.apache.kafka.common.protocol.types.Struct.instance(Struct.java:436)
        at org.apache.kafka.common.requests.ProduceResponse.toStruct(ProduceResponse.java:281)
        at org.apache.kafka.common.requests.AbstractResponse.toSend(AbstractResponse.java:35)
        at org.apache.kafka.common.requests.RequestContext.buildResponse(RequestContext.java:80)
        at kafka.server.KafkaApis.sendResponse(KafkaApis.scala:2892)
        at kafka.server.KafkaApis.sendResponseCallback$2(KafkaApis.scala:554)
        at kafka.server.KafkaApis.$anonfun$handleProduceRequest$11(KafkaApis.scala:576)
        at kafka.server.KafkaApis.$anonfun$handleProduceRequest$11$adapted(KafkaApis.scala:576)
        at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:546)
        at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:577)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:126)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
        at java.base/java.lang.Thread.run(Thread.java:835)
{noformat}
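
As a quick side check (a sketch using the stock console consumer shipped with the broker), the topic can be consumed to see whether any of these batches actually made it in despite the append errors:

{code}
# Check whether any records were appended to test-3 despite the errors above.
$ kafka_2.13-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic test-3 --from-beginning
{code}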