[ https://issues.apache.org/jira/browse/KAFKA-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Egerton resolved KAFKA-13632.
-----------------------------------
Resolution: Fixed
> MirrorMaker 2.0 NPE and Warning "Failure to commit records" for filtered records
> --------------------------------------------------------------------------------
>
> Key: KAFKA-13632
> URL: https://issues.apache.org/jira/browse/KAFKA-13632
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 3.1.0
> Reporter: Bert Baron
> Priority: Minor
>
> We have a setup where we filter records with MirrorMaker 2.0 (see below).
> For each filtered record, an NPE in MirrorSourceTask.commitRecord produces
> the following warning message:
> {code:java}
> [2022-01-31 08:01:29,581] WARN [MirrorSourceConnector|task-0] Failure committing record. (org.apache.kafka.connect.mirror.MirrorSourceTask:190)
> java.lang.NullPointerException
>     at org.apache.kafka.connect.mirror.MirrorSourceTask.commitRecord(MirrorSourceTask.java:177)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:463)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> {code}
> The cause appears to be that metadata is null for filtered records. Note
> that the javadoc of the overridden SourceTask.commitRecord explicitly states
> that metadata will be null if the record was filtered.
> In our case we use a custom predicate, but the issue can be reproduced with
> the following configuration:
> {code:java}
> clusters = source,target
> tasks.max = 1
> source.bootstrap.servers = <cluster1>
> target.bootstrap.servers = <cluster2>
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> config.storage.replication.factor=1
> source->target.enabled = true
> source->target.topics = topic1
> source->target.transforms=Filter
> source->target.transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
> source->target.transforms.Filter.predicate=HeaderPredicate
> source->target.predicates=HeaderPredicate
> source->target.predicates.HeaderPredicate.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
> source->target.predicates.HeaderPredicate.name=someheader
> {code}
> Each record with the header key 'someheader' will result in the NPE and
> warning message.
> On a side note, we couldn't find clear documentation on how to configure
> (SMT) filtering with MirrorMaker 2, or on whether it is supported at all.
> Apart from the NPEs and warning messages, filtering appears to work
> functionally for us with our custom filter.
>
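
The report above attributes the NPE to metadata being null for filtered records. A minimal sketch of a null guard for that condition follows. It is not the patch that resolved KAFKA-13632, and it does not use Kafka's classes: AckMetadata and GuardedCommitSketch are hypothetical stand-ins, chosen so the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the producer metadata handed to commitRecord.
// This is NOT Kafka's RecordMetadata class.
final class AckMetadata {
    final long offset;

    AckMetadata(long offset) {
        this.offset = offset;
    }
}

// Hypothetical task showing the guard; NOT the actual MirrorSourceTask code.
final class GuardedCommitSketch {
    final List<Long> committedOffsets = new ArrayList<>();
    int skipped = 0;

    // Assumption taken from the report: metadata is null when the record
    // was filtered by a transformation, so there is nothing to commit.
    void commitRecord(String record, AckMetadata metadata) {
        if (metadata == null) {
            skipped++;      // count the filtered record and return early
            return;
        }
        committedOffsets.add(metadata.offset);
    }
}
```

With this guard, a call such as commitRecord("filtered", null) returns without touching the metadata, while a call with non-null metadata records the offset as before.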
--
This message was sent by Atlassian Jira
(v8.20.10#820010)