Weichu Liu created KAFKA-9669: --------------------------------- Summary: Kafka 2.4.0 Chokes on Filebeat 5.6 Produced Data Key: KAFKA-9669 URL: https://issues.apache.org/jira/browse/KAFKA-9669 Project: Kafka Issue Type: Bug Affects Versions: 2.4.0 Reporter: Weichu Liu
Hi In our environment, after upgrading to Kafka 2.4.0, we discovered the broker was not compatible with filebeat 5. Here is how to reproduce: 1. Startup Kafka 2.4.0, all configurations are vanilla: {code} $ kafka_2.13-2.4.0/bin/zookeeper-server-start.sh kafka_2.13-2.4.0/config/zookeeper.properties $ kafka_2.13-2.4.0/bin/kafka-server-start.sh kafka_2.13-2.4.0/config/server.properties {code} 2. Startup filebeat 5.6.16 with the following configuration. (downloaded from https://www.elastic.co/jp/downloads/past-releases/filebeat-5-6-16) {code} $ cat /tmp/filebeat.yml name: test output.kafka: enabled: true hosts: - localhost:9092 topic: test-3 version: 0.10.0 compression: gzip filebeat: prospectors: - input_type: log paths: - /tmp/filebeat-in encoding: plain {code} {code} $ filebeat-5.6.16-linux-x86_64/filebeat -e -c /tmp/filebeat.yml {code} 3. Write some lines to file {{/tmp/filebeat-in}}. Looks like single line won't trigger the issue, but 30 lines are enough. {code} seq 30 >> /tmp/filebeat-in {code} 4. Kafka throws the following error chunk, like, per produced record. {noformat} [2020-03-06 05:17:40,129] ERROR [ReplicaManager broker=0] Error processing append operation on partition test-3-0 (kafka.server.ReplicaManager) org.apache.kafka.common.InvalidRecordException: Inner record LegacyRecordBatch(offset=0, Record(magic=1, attributes=0, compression=NONE, crc=1453875406, CreateTime=1583471854475, key=0 bytes, value=202 bytes)) inside the compressed record batch does not have incremental offsets, expected offset is 1 in topic partition test-3-0. [2020-03-06 05:17:40,129] ERROR [KafkaApi-0] Error when handling request: clientId=beats, correlationId=102, api=PRODUCE, version=2, body={acks=1,timeout=10000,partitionSizes=[test-3-0=272]} (kafka.server.KafkaApis) java.lang.NullPointerException: `field` must be non-null at java.base/java.util.Objects.requireNonNull(Objects.java:246) at org.apache.kafka.common.protocol.types.Struct.validateField(Struct.java:474) at org.apache.kafka.common.protocol.types.Struct.instance(Struct.java:418) at org.apache.kafka.common.protocol.types.Struct.instance(Struct.java:436) at org.apache.kafka.common.requests.ProduceResponse.toStruct(ProduceResponse.java:281) at org.apache.kafka.common.requests.AbstractResponse.toSend(AbstractResponse.java:35) at org.apache.kafka.common.requests.RequestContext.buildResponse(RequestContext.java:80) at kafka.server.KafkaApis.sendResponse(KafkaApis.scala:2892) at kafka.server.KafkaApis.sendResponseCallback$2(KafkaApis.scala:554) at kafka.server.KafkaApis.$anonfun$handleProduceRequest$11(KafkaApis.scala:576) at kafka.server.KafkaApis.$anonfun$handleProduceRequest$11$adapted(KafkaApis.scala:576) at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:546) at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:577) at kafka.server.KafkaApis.handle(KafkaApis.scala:126) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70) at java.base/java.lang.Thread.run(Thread.java:835) {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)