[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ning Zhang updated KAFKA-10370: ------------------------------- Description: In [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java], when we want the consumer to consume from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66] provided a way to supply the offsets from external (e.g. implementation of SinkTask) to rewind the consumer. In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first call [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633] to (1) read the offsets from WorkerSinkTaskContext, if the offsets are not empty, (2) consumer.seek(tp, offset) to rewind the consumer. As a part of [WorkerSinkTask initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307], when the [SinkTask starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88], we can supply the specific offsets by +"context.offset(supplied_offsets);+" in start() method, so that when the consumer does the first poll, it should rewind to the specific offsets in rewind() method. However in practice, we saw the following IllegalStateException when running consumer.seek(tp, offsets); {code:java} [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648) [2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187) java.lang.IllegalStateException: No current assignment for partition test-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597) at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188) {code} As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution that has been initially verified is to use *consumer.assign* with *consumer.seek* , instead of *consumer.subscribe*, in this case. was: In WorkerSinkTask.java, when we want the consumer to consume from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] provided a way to supply the offsets from external (e.g. implementation of SinkTask) to rewind consumer. In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, if the offsets are not empty, (2) consumer.seek(tp, offset) to rewind the consumer. when SinkTask first initializes (+start(Map<String, String> props)+), we do +"context.offset(offsets);+" , then in above step (2), we saw the following IllegalStateException: {code:java} [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648) [2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187) java.lang.IllegalStateException: No current assignment for partition test-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597) at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188) {code} As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution that has been initially verified is to use *consumer.assign* with *consumer.seek* , instead of *consumer.subscribe*, in this case. > WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) > when (tp, offsets) are supplied by WorkerSinkTaskContext > ---------------------------------------------------------------------------------------------------------------------------------- > > Key: KAFKA-10370 > URL: https://issues.apache.org/jira/browse/KAFKA-10370 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect > Affects Versions: 2.5.0 > Reporter: Ning Zhang > Assignee: Ning Zhang > Priority: Major > Fix For: 2.6.0 > > > In > [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java], > when we want the consumer to consume from certain offsets, rather than from > the last committed offset, > [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66] > provided a way to supply the offsets from external (e.g. implementation of > SinkTask) to rewind the consumer. > In the [poll() > method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], > it first call > [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633] > to (1) read the offsets from WorkerSinkTaskContext, if the offsets are not > empty, (2) consumer.seek(tp, offset) to rewind the consumer. > As a part of [WorkerSinkTask > initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307], > when the [SinkTask > starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88], > we can supply the specific offsets by +"context.offset(supplied_offsets);+" > in start() method, so that when the consumer does the first poll, it should > rewind to the specific offsets in rewind() method. However in practice, we > saw the following IllegalStateException when running consumer.seek(tp, > offsets); > {code:java} > [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} > Rewind test-1 to offset 3 > (org.apache.kafka.connect.runtime.WorkerSinkTask:648) > [2020-08-07 23:53:55,752] INFO [Consumer > clientId=connector-consumer-MirrorSinkConnector-0, > groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 > (org.apache.kafka.clients.consumer.KafkaConsumer:1592) > [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task > threw an uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask:187) > java.lang.IllegalStateException: No current assignment for partition test-1 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385) > at > org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) > at > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task > is being killed and will not recover until manually restarted > (org.apache.kafka.connect.runtime.WorkerTask:188) > {code} > As suggested in > https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, > the resolution that has been initially verified is to use *consumer.assign* > with *consumer.seek* , instead of *consumer.subscribe*, in this case. -- This message was sent by Atlassian Jira (v8.3.4#803005)