Weichu Liu created KAFKA-9669:
---------------------------------
Summary: Kafka 2.4.0 Chokes on Filebeat 5.6 Produced Data
Key: KAFKA-9669
URL: https://issues.apache.org/jira/browse/KAFKA-9669
Project: Kafka
Issue Type: Bug
Affects Versions: 2.4.0
Reporter: Weichu Liu
Hi
In our environment, after upgrading to Kafka 2.4.0, we discovered the broker
was not compatible with filebeat 5.
Here is how to reproduce:
1. Startup Kafka 2.4.0, all configurations are vanilla:
{code}
$ kafka_2.13-2.4.0/bin/zookeeper-server-start.sh
kafka_2.13-2.4.0/config/zookeeper.properties
$ kafka_2.13-2.4.0/bin/kafka-server-start.sh
kafka_2.13-2.4.0/config/server.properties
{code}
2. Startup filebeat 5.6.16 with the following configuration. (downloaded from
https://www.elastic.co/jp/downloads/past-releases/filebeat-5-6-16)
{code}
$ cat /tmp/filebeat.yml
name: test
output.kafka:
enabled: true
hosts:
- localhost:9092
topic: test-3
version: 0.10.0
compression: gzip
filebeat:
prospectors:
- input_type: log
paths:
- /tmp/filebeat-in
encoding: plain
{code}
{code}
$ filebeat-5.6.16-linux-x86_64/filebeat -e -c /tmp/filebeat.yml
{code}
3. Write some lines to file {{/tmp/filebeat-in}}. Looks like single line won't
trigger the issue, but 30 lines are enough.
{code}
seq 30 >> /tmp/filebeat-in
{code}
4. Kafka throws the following error chunk, like, per produced record.
{noformat}
[2020-03-06 05:17:40,129] ERROR [ReplicaManager broker=0] Error processing
append operation on partition test-3-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.InvalidRecordException: Inner record
LegacyRecordBatch(offset=0, Record(magic=1, attributes=0, compression=NONE,
crc=1453875406, CreateTime=1583471854475, key=0 bytes, value=202 bytes)) inside
the compressed record batch does not have incremental offsets, expected offset
is 1 in topic partition test-3-0.
[2020-03-06 05:17:40,129] ERROR [KafkaApi-0] Error when handling request:
clientId=beats, correlationId=102, api=PRODUCE, version=2,
body={acks=1,timeout=10000,partitionSizes=[test-3-0=272]}
(kafka.server.KafkaApis)
java.lang.NullPointerException: `field` must be non-null
at java.base/java.util.Objects.requireNonNull(Objects.java:246)
at
org.apache.kafka.common.protocol.types.Struct.validateField(Struct.java:474)
at
org.apache.kafka.common.protocol.types.Struct.instance(Struct.java:418)
at
org.apache.kafka.common.protocol.types.Struct.instance(Struct.java:436)
at
org.apache.kafka.common.requests.ProduceResponse.toStruct(ProduceResponse.java:281)
at
org.apache.kafka.common.requests.AbstractResponse.toSend(AbstractResponse.java:35)
at
org.apache.kafka.common.requests.RequestContext.buildResponse(RequestContext.java:80)
at kafka.server.KafkaApis.sendResponse(KafkaApis.scala:2892)
at kafka.server.KafkaApis.sendResponseCallback$2(KafkaApis.scala:554)
at
kafka.server.KafkaApis.$anonfun$handleProduceRequest$11(KafkaApis.scala:576)
at
kafka.server.KafkaApis.$anonfun$handleProduceRequest$11$adapted(KafkaApis.scala:576)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:546)
at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:577)
at kafka.server.KafkaApis.handle(KafkaApis.scala:126)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
at java.base/java.lang.Thread.run(Thread.java:835)
{noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)