[ https://issues.apache.org/jira/browse/KAFKA-17719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893765#comment-17893765 ]
Daniel Urban commented on KAFKA-17719:
--------------------------------------

I see that the fix for KAFKA-16838 was explicitly trying to solve the issue of tasks getting "resurrected" when a connector is created with the same name. Because of this, "removing" that edge case is not a proper solution, will dig deeper and try to figure out a solution.

> Connect may fail to start tasks when reading from a compacted config topic
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-17719
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17719
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect
>            Reporter: Chaitanya Mukka
>            Assignee: Daniel Urban
>            Priority: Major
>
> The fix for KAFKA-16838 (via [https://github.com/apache/kafka/pull/16122])
> alters the logic for materializing a view of the config topic to ignore task
> configs when there is no configuration for that connector present earlier in
> the config topic. However, the logic fails to consider topics that might get
> compacted over time.
> In particular, when we have a connector {{C1}} running fine, the records in
> the config topic for the connector will look something like {{C1, T1, T2,
> Task-commit-record}}.
> If the connector gets a config update that doesn't produce any new task
> configs (note that this is a valid case when there are no task config
> changes[1]) we only produce a Connector config record [2]. The config topic
> now looks like {{C1, T1, T2, Task-commit-record, C1}}. However, if the
> topic gets compacted we will end up with {{T1, T2, Task-commit-record,
> C1}}. This can be a common scenario in large and old connect clusters.
> Based on the changes for KAFKA-16838, when the connect worker reads this
> config state it ignores the task configs [3] for this while the connector is
> still active and we might have active assignments for the same.
> The symptom of this issue is an NPE which shows up when trying to start the tasks:
> {noformat}
> java.lang.NullPointerException: Cannot invoke "java.util.Map.size()" because "inputMap" is null
>     at org.apache.kafka.common.utils.Utils.castToStringObjectMap
>     at org.apache.kafka.common.config.AbstractConfig.<init>
>     at org.apache.kafka.common.config.AbstractConfig.<init>
>     at org.apache.kafka.connect.runtime.TaskConfig.<init>
>     at org.apache.kafka.connect.runtime.Worker.startTask
>     at org.apache.kafka.connect.runtime.Worker.startSourceTask
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$41
>     at java.base/java.util.concurrent.FutureTask.run
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run
>     at java.base/java.lang.Thread.run(Thread.java:1583)
> {noformat}
>
> [1] - [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L1047]
> [2] - [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L524]
> [3] - [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L1072-L1074]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
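
The compaction scenario from the issue description can be reproduced with a small stand-alone model. This is only a sketch, not the actual KafkaConfigBackingStore code: the class ConfigTopicReplay, the Rec type, the Kind enum and the materialize method are all hypothetical names, and the replay loop is a deliberately simplified version of the rule described in the issue (a task config is ignored when no connector config for that connector appeared earlier in the log).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of replaying the Connect config topic.
class ConfigTopicReplay {
    // Assumed record kinds; the real topic distinguishes them by record key.
    enum Kind { CONNECTOR, TASK, COMMIT }

    // One record in the simplified log. taskId is unused for CONNECTOR and COMMIT.
    static final class Rec {
        final Kind kind;
        final String connector;
        final int taskId;

        Rec(Kind kind, String connector, int taskId) {
            this.kind = kind;
            this.connector = connector;
            this.taskId = taskId;
        }
    }

    // Replays the log and returns the committed task ids per connector.
    // Task records are dropped unless a connector record was seen earlier.
    static Map<String, List<Integer>> materialize(List<Rec> log) {
        Set<String> seenConnectors = new HashSet<>();
        Map<String, List<Integer>> pending = new HashMap<>();
        Map<String, List<Integer>> committed = new HashMap<>();
        for (Rec r : log) {
            switch (r.kind) {
                case CONNECTOR:
                    seenConnectors.add(r.connector);
                    break;
                case TASK:
                    if (seenConnectors.contains(r.connector)) {
                        pending.computeIfAbsent(r.connector, k -> new ArrayList<>()).add(r.taskId);
                    }
                    // Otherwise the task config is silently ignored.
                    break;
                case COMMIT:
                    List<Integer> tasks = pending.remove(r.connector);
                    if (tasks != null) {
                        committed.put(r.connector, tasks);
                    }
                    break;
            }
        }
        return committed;
    }

    // C1, T1, T2, Task-commit-record, C1
    static List<Rec> uncompacted() {
        return Arrays.asList(
            new Rec(Kind.CONNECTOR, "C1", -1),
            new Rec(Kind.TASK, "C1", 1),
            new Rec(Kind.TASK, "C1", 2),
            new Rec(Kind.COMMIT, "C1", -1),
            new Rec(Kind.CONNECTOR, "C1", -1));
    }

    // T1, T2, Task-commit-record, C1 (the older C1 record was compacted away)
    static List<Rec> compacted() {
        return Arrays.asList(
            new Rec(Kind.TASK, "C1", 1),
            new Rec(Kind.TASK, "C1", 2),
            new Rec(Kind.COMMIT, "C1", -1),
            new Rec(Kind.CONNECTOR, "C1", -1));
    }

    public static void main(String[] args) {
        System.out.println(materialize(uncompacted()).get("C1")); // prints [1, 2]
        System.out.println(materialize(compacted()).get("C1"));   // prints null
    }
}
```

In this model the compacted log yields no task configs for C1 even though the connector is still present. That matches the situation described above, where a worker later looks up a task config that was never materialized and ends up passing null into the TaskConfig constructor.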