[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174955#comment-17174955 ]
Ning Zhang commented on KAFKA-10370:
------------------------------------
Hi [~rhauch], when you have a chance, I would like to get your initial feedback/advice
on this issue and the proposed solution. Thanks! cc [~ryannedolan]
> WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets)
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> ----------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-10370
> URL: https://issues.apache.org/jira/browse/KAFKA-10370
> Project: Kafka
> Issue Type: New Feature
> Components: KafkaConnect
> Affects Versions: 2.5.0
> Reporter: Ning Zhang
> Assignee: Ning Zhang
> Priority: Major
> Fix For: 2.7.0
>
>
> In
> [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java],
> when we want the consumer to consume from specific offsets rather than from
> the last committed offset,
> [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66]
> provides a way for external code (e.g. a SinkTask implementation) to supply
> the offsets to which the consumer should be rewound.
> In the
> [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
> WorkerSinkTask first calls
> [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633],
> which (1) reads the offsets from WorkerSinkTaskContext and (2) if they are
> not empty, calls consumer.seek(tp, offset) to rewind the consumer.
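A simplified, hypothetical model of the rewind() step described above may make the two-step flow concrete. TaskContext, SeekRecorder and RewindModel are illustrative stand-ins, not Kafka classes. Partitions are plain strings such as "test-1" rather than TopicPartition. Clearing the supplied offsets after seeking is an assumption about how they are consumed.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for WorkerSinkTaskContext (not the Kafka class).
// Partitions are plain strings such as "test-1" instead of TopicPartition.
class TaskContext {
    private final Map<String, Long> offsets = new HashMap<>();

    // Called by the task, e.g. from start(), to request a rewind.
    void offset(Map<String, Long> supplied) {
        offsets.putAll(supplied);
    }

    Map<String, Long> offsets() {
        return offsets;
    }

    void clearOffsets() {
        offsets.clear();
    }
}

// Hypothetical stand-in for the consumer: it only records requested positions.
class SeekRecorder {
    final Map<String, Long> positions = new LinkedHashMap<>();

    void seek(String partition, long offset) {
        positions.put(partition, offset);
    }
}

class RewindModel {
    // (1) Read the supplied offsets; (2) if there are any, seek each partition.
    static void rewind(TaskContext context, SeekRecorder consumer) {
        Map<String, Long> offsets = context.offsets();
        if (offsets.isEmpty()) {
            return; // nothing supplied: keep the current (committed) position
        }
        for (Map.Entry<String, Long> entry : offsets.entrySet()) {
            consumer.seek(entry.getKey(), entry.getValue());
        }
        context.clearOffsets(); // assumption: supplied offsets are applied once
    }

    public static void main(String[] args) {
        TaskContext context = new TaskContext();
        SeekRecorder consumer = new SeekRecorder();
        context.offset(Map.of("test-1", 3L));
        rewind(context, consumer);
        System.out.println(consumer.positions); // {test-1=3}
    }
}
```

The model deliberately omits any notion of partition assignment. That missing check is what the real consumer enforces in the failure below.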
> As part of
> [WorkerSinkTask initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307],
> when the
> [SinkTask starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88],
> we can supply specific offsets by calling +"context.offset(supplied_offsets);"+
> in the start() method, so that on the consumer's first poll, rewind() should
> move it to those offsets. In practice, however, we saw the following
> IllegalStateException when consumer.seek(tp, offsets) was called:
> {code:java}
> [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
> [2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
> java.lang.IllegalStateException: No current assignment for partition test-1
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188)
> {code}
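One way to read the trace: with consumer.subscribe, partitions are only assigned when a later poll() completes a group rebalance. rewind() runs at the top of WorkerSinkTask.poll(), before the consumer has polled, so on the first iteration its seek finds no assigned partition. The sketch below is a hypothetical model of that ordering. ModelConsumer, pollAndJoinGroup and SeekBeforeAssignmentDemo are illustrative names, not Kafka's API, and the model reproduces only the error message seen in the log.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the consumer state behind the error above.
// It mimics one documented behavior: subscribe() does not assign partitions
// immediately; they are assigned when a later poll() completes a rebalance.
// Partitions are plain strings such as "test-1"; this is NOT Kafka's API.
class ModelConsumer {
    private final Set<String> subscription = new HashSet<>();
    private final Set<String> assignment = new HashSet<>();
    private final Map<String, Long> positions = new HashMap<>();

    void subscribe(List<String> topics) {
        subscription.addAll(topics); // the assignment is still empty here
    }

    // Stand-in for the rebalance inside poll(); the partitions the group
    // coordinator would hand out are passed in explicitly.
    void pollAndJoinGroup(Set<String> partitionsFromCoordinator) {
        assignment.addAll(partitionsFromCoordinator);
    }

    void seek(String partition, long offset) {
        if (!assignment.contains(partition)) {
            // Same message as the IllegalStateException in the log above.
            throw new IllegalStateException("No current assignment for partition " + partition);
        }
        positions.put(partition, offset);
    }

    Long position(String partition) {
        return positions.get(partition);
    }
}

class SeekBeforeAssignmentDemo {
    public static void main(String[] args) {
        ModelConsumer consumer = new ModelConsumer();
        consumer.subscribe(List.of("test"));
        try {
            consumer.seek("test-1", 3L); // like rewind() on the first iteration
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // No current assignment for partition test-1
        }
        consumer.pollAndJoinGroup(Set.of("test-0", "test-1"));
        consumer.seek("test-1", 3L); // succeeds once the partition is assigned
        System.out.println(consumer.position("test-1")); // 3
    }
}
```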
> As suggested in
> https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
> the resolution proposed in the attached PR (which has been initially
> verified) is to use *consumer.assign* together with *consumer.seek*, instead
> of *consumer.subscribe*, to set the consumer's initial position when
> specific offsets are supplied externally through WorkerSinkTaskContext.
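The ordering behind the proposed resolution can be sketched with a similar hypothetical model. Manual assignment takes effect immediately, so every subsequent seek finds its partition. AssignableConsumer, AssignThenSeek and its startFrom helper are illustrative names, not Kafka or Connect APIs. The sketch does not try to show how the attached PR wires this into WorkerSinkTask.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of manual assignment followed by seek.
// Partitions are strings such as "test-1"; none of these classes are Kafka's API.
class AssignableConsumer {
    private final Set<String> assignment = new HashSet<>();
    private final Map<String, Long> positions = new HashMap<>();

    // Modeled on consumer.assign: takes effect immediately, no rebalance needed.
    void assign(Collection<String> partitions) {
        assignment.clear();
        assignment.addAll(partitions);
    }

    void seek(String partition, long offset) {
        if (!assignment.contains(partition)) {
            throw new IllegalStateException("No current assignment for partition " + partition);
        }
        positions.put(partition, offset);
    }

    Long position(String partition) {
        return positions.get(partition);
    }
}

class AssignThenSeek {
    // Hypothetical helper: position the consumer at externally supplied offsets.
    static void startFrom(AssignableConsumer consumer, Map<String, Long> suppliedOffsets) {
        consumer.assign(suppliedOffsets.keySet()); // assign first ...
        suppliedOffsets.forEach((tp, offset) -> consumer.seek(tp, offset)); // ... then seek
    }

    public static void main(String[] args) {
        AssignableConsumer consumer = new AssignableConsumer();
        startFrom(consumer, Map.of("test-1", 3L));
        System.out.println(consumer.position("test-1")); // 3
    }
}
```

One trade-off worth weighing: according to the KafkaConsumer.assign documentation, manual assignment does not use the consumer's group management. Partitions assigned this way are therefore not rebalanced automatically when tasks join or leave.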
--
This message was sent by Atlassian Jira
(v8.3.4#803005)