[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17179220#comment-17179220 ]
Ryanne Dolan commented on KAFKA-10370:
--------------------------------------

Thanks [~yangguo1220]. I believe that using assign() instead of subscribe() will have unexpected side effects, e.g. no rebalances and no auto-detection of new topics/partitions. As you know, in MirrorSourceTask, we explicitly avoided using subscribe() and instead handle rebalances and new topic-partitions explicitly, for efficiency purposes. (Rebalances were a major problem in production deployments of MM1.) But I'm not sure it would be appropriate to change the behavior of WorkerSinkTask here, especially as a side-effect of calling SinkTaskContext.offset().

IIRC, the root cause of the exception is actually that subscribe() results in partitions being assigned asynchronously, so if you subscribe() and then seek() you'll likely have zero assignments at that point. I believe the correct way to deal with this is to register a RebalanceListener, which can rewind the offsets of a partition _after_ the partition is assigned. It may be possible for WorkerSinkTask to do this automatically.

There are basically two scenarios:

- SinkTaskContext.offset() is called _after_ a partition is assigned, in which case the existing implementation should work. Unfortunately, it seems impossible for a SinkTask to know whether the partition is assigned or not. This to me seems like a bug in the API.

- SinkTaskContext.offset() is called _before_ a partition is assigned, which would result in the exception you're seeing. In this case, WorkerSinkTask could store the offsets and seek() asynchronously using a RebalanceListener. This essentially defers the seek() until _after_ the partition is actually assigned, thus avoiding the exception.

It's possible this bug only exists in the edge case of calling SinkTaskContext.offset() within the SinkTask.start() method.
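As a rough sketch of the deferred seek() described above (not the actual WorkerSinkTask code): requested offsets are buffered, and each one is applied only when its partition shows up in an assignment callback. The class DeferredSeeks and its methods are hypothetical names used for illustration, and partitions are modeled as plain strings so the example has no Kafka dependency. In a real consumer, onAssigned() would presumably be driven from ConsumerRebalanceListener.onPartitionsAssigned(), with the seek function delegating to KafkaConsumer.seek(TopicPartition, long).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Hypothetical helper, not part of Kafka Connect: buffers rewind requests
 * and applies them only once the partition is reported as assigned.
 */
class DeferredSeeks {
    // Offsets requested so far, keyed by a "topic-partition" string (assumption).
    private final Map<String, Long> pending = new HashMap<>();

    /** Record a rewind request; safe to call before any partition is assigned. */
    public synchronized void request(String partition, long offset) {
        pending.put(partition, offset);
    }

    /**
     * Intended to be called from the rebalance callback. Seeks every newly
     * assigned partition that has a pending offset, removes it from the
     * buffer, and returns the partitions that were rewound.
     */
    public synchronized List<String> onAssigned(Collection<String> assigned,
                                                BiConsumer<String, Long> seek) {
        List<String> rewound = new ArrayList<>();
        for (String partition : assigned) {
            Long offset = pending.remove(partition);
            if (offset != null) {
                seek.accept(partition, offset);
                rewound.add(partition);
            }
        }
        return rewound;
    }

    /** Number of rewind requests still waiting for their partition. */
    public synchronized int pendingCount() {
        return pending.size();
    }
}
```

With this shape, a request made during start() just sits in the buffer, and a request for a partition that is never assigned to this task is never passed to seek(), so the "No current assignment" failure cannot be triggered by the buffered path.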
We could possibly handle that case specially: if offset() is called during start(), WorkerSinkTask could use the RebalanceListener to defer the seek() until the partitions are actually assigned.

> WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
> ----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10370
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10370
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>    Affects Versions: 2.5.0
>            Reporter: Ning Zhang
>            Assignee: Ning Zhang
>            Priority: Major
>             Fix For: 2.7.0
>
>
> In [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java], when we want the consumer to consume from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66] provides a way to supply the offsets from an external source (e.g. an implementation of SinkTask) to rewind the consumer.
> In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633] to (1) read the offsets from WorkerSinkTaskContext and, if the offsets are not empty, (2) call consumer.seek(tp, offset) to rewind the consumer.
> As a part of [WorkerSinkTask initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307], when the [SinkTask starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88], we can supply the specific offsets by calling +"context.offset(supplied_offsets);"+ in the start() method, so that when the consumer does the first poll, it should rewind to the specific offsets in the rewind() method. However, in practice, we saw the following IllegalStateException when running consumer.seek(tp, offsets);
> {code:java}
> [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
> [2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
> java.lang.IllegalStateException: No current assignment for partition test-1
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188)
> {code}
> As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution (that has been initially verified) proposed in the attached PR is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*, to handle the initial position of the consumer when specific offsets are provided by an external source through WorkerSinkTaskContext.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)