[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17179220#comment-17179220 ]

Ryanne Dolan commented on KAFKA-10370:
--------------------------------------

Thanks [~yangguo1220]. I believe that using assign() instead of subscribe() 
will have unexpected side effects, e.g. no rebalances and no auto-detection of 
new topics/partitions. As you know, in MirrorSourceTask we deliberately avoided 
using subscribe() and instead handle rebalances and new topic-partitions 
ourselves, for efficiency. (Rebalances were a major problem in production 
deployments of MM1.) But I'm not sure it would be appropriate to change the 
behavior of WorkerSinkTask here, especially as a side effect of calling 
SinkTaskContext.offset().

IIRC, the root cause of the exception is actually that subscribe() results in 
partitions being assigned asynchronously, so if you subscribe() and then seek() 
you'll likely have zero assignments at that point. I believe the correct way to 
deal with this is to register a RebalanceListener, which can rewind the offsets 
of a partition _after_ the partition is assigned.
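To illustrate the ordering problem, here is a minimal, stdlib-only Java model. It is a sketch, not the Kafka API: ModelConsumer, RebalanceCallback and completeRebalance() are hypothetical stand-ins for KafkaConsumer, ConsumerRebalanceListener and a poll() that finishes a group rebalance. Seeking right after subscribe() fails the same way as in the log quoted in the issue description, while seeking from the assignment callback succeeds.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ConsumerRebalanceListener (assignment side only).
interface RebalanceCallback {
    void onPartitionsAssigned(Collection<String> partitions);
}

// Hypothetical, stdlib-only model of a subscribed consumer; not KafkaConsumer.
class ModelConsumer {
    private final Map<String, Long> positions = new HashMap<>();
    private RebalanceCallback callback;

    // Like subscribe(): registers the callback but assigns nothing yet.
    void subscribe(RebalanceCallback callback) {
        this.callback = callback;
    }

    // Simulates a later poll() completing a group rebalance. Positions start
    // at 0 for simplicity (a real consumer would use the committed offset).
    void completeRebalance(Collection<String> assigned) {
        for (String tp : assigned) {
            positions.putIfAbsent(tp, 0L);
        }
        if (callback != null) {
            callback.onPartitionsAssigned(assigned);
        }
    }

    // Mirrors KafkaConsumer.seek(): only valid for currently assigned partitions.
    void seek(String tp, long offset) {
        if (!positions.containsKey(tp)) {
            throw new IllegalStateException("No current assignment for partition " + tp);
        }
        positions.put(tp, offset);
    }

    long position(String tp) {
        return positions.get(tp);
    }
}

class SeekOnAssignDemo {
    // Seeks straight after subscribe(); returns the exception message, or null if it worked.
    static String seekImmediately() {
        ModelConsumer consumer = new ModelConsumer();
        consumer.subscribe(partitions -> { });
        try {
            consumer.seek("test-1", 3L);
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    // Defers the seek into the callback, which runs only after assignment.
    static long seekInCallback() {
        ModelConsumer consumer = new ModelConsumer();
        Map<String, Long> requested = Map.of("test-1", 3L);
        consumer.subscribe(partitions -> {
            for (String tp : partitions) {
                Long offset = requested.get(tp);
                if (offset != null) {
                    consumer.seek(tp, offset);
                }
            }
        });
        consumer.completeRebalance(List.of("test-0", "test-1"));
        return consumer.position("test-1");
    }
}
```

In the real client the same idea would mean passing a listener to subscribe() and calling seek() from its onPartitionsAssigned() method.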

It may be possible for WorkerSinkTask to do this automatically. There are 
basically two scenarios:

- SinkTaskContext.offset() is called _after_ a partition is assigned, in which 
case the existing implementation should work. Unfortunately, it seems 
impossible for a SinkTask to know whether the partition is assigned or not. 
This to me seems like a bug in the API.
- SinkTaskContext.offset() is called _before_ a partition is assigned, which 
would result in the exception you're seeing. In this case, WorkerSinkTask could 
store the offsets and seek() asynchronously using a RebalanceListener. This 
essentially defers the seek() until _after_ the partition is actually assigned, 
thus avoiding the exception.
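The second scenario could be handled roughly as follows. This is a hypothetical helper (PendingRewinds is not a Kafka Connect class), assuming rewind() knows the consumer's current assignment and the rebalance listener reports newly assigned partitions; partitions are plain strings such as "test-1" instead of TopicPartition.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Kafka Connect: holds offsets supplied via
// SinkTaskContext.offset() until their partitions are actually assigned.
class PendingRewinds {
    private final Map<String, Long> pending = new HashMap<>();

    // Records offsets requested by the task, e.g. from SinkTask.start().
    void request(Map<String, Long> offsets) {
        pending.putAll(offsets);
    }

    // Called from rewind(): returns the seeks that are safe for the current
    // assignment and keeps the rest pending instead of calling seek() on them.
    Map<String, Long> drainAssigned(Set<String> currentAssignment) {
        Map<String, Long> ready = new HashMap<>();
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (currentAssignment.contains(entry.getKey())) {
                ready.put(entry.getKey(), entry.getValue());
                it.remove();
            }
        }
        return ready;
    }

    // Called from the rebalance listener's onPartitionsAssigned(): the newly
    // assigned partitions can now be sought safely.
    Map<String, Long> onPartitionsAssigned(Collection<String> assigned) {
        return drainAssigned(new HashSet<>(assigned));
    }

    boolean hasPending() {
        return !pending.isEmpty();
    }
}
```

Offsets for partitions this task is never assigned simply stay pending; what to do with them, or with revoked partitions, is left open in this sketch.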

It's possible this bug only exists in the edge case of calling 
SinkTaskContext.offset() within the SinkTask.start() method. We could possibly 
handle that case specially: if offset() is called during start(), 
WorkerSinkTask could use the RebalanceListener to defer the seek() until the 
partitions are actually assigned.

> WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) 
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> ----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10370
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10370
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>    Affects Versions: 2.5.0
>            Reporter: Ning Zhang
>            Assignee: Ning Zhang
>            Priority: Major
>             Fix For: 2.7.0
>
>
> In 
> [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java],
>  when we want the consumer to consume from certain offsets rather than from 
> the last committed offset, 
> [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66]
>  provides a way to supply the offsets externally (e.g. from a SinkTask 
> implementation) to rewind the consumer.
> In the [poll() 
> method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
>  it first calls 
> [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633]
>  to (1) read the offsets from WorkerSinkTaskContext and, (2) if the offsets 
> are not empty, call consumer.seek(tp, offset) to rewind the consumer.
> As part of [WorkerSinkTask 
> initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307],
>  when the [SinkTask 
> starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88],
>  we can supply specific offsets by calling +"context.offset(supplied_offsets);+" 
> in the start() method, so that on the consumer's first poll, rewind() seeks 
> to those offsets. However, in practice, we saw the following 
> IllegalStateException when running consumer.seek(tp, offsets):
> {code:java}
> [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
> [2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
> java.lang.IllegalStateException: No current assignment for partition test-1
>         at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
>         at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
>         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
>         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188)
> {code}
> As suggested in 
> https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
>  the resolution proposed in the attached PR (initially verified) is to use 
> *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*, to 
> set the initial position of the consumer when specific offsets are supplied 
> externally through WorkerSinkTaskContext.
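For comparison, here is a stdlib-only model of the rewind() steps described in the issue and of the assign()-based fix. RewindModel is hypothetical and only mimics the relevant behaviour: assign() takes effect immediately, so the seeks in rewind() succeed, while subscribe() leaves the assignment empty until a rebalance, so the same rewind() fails with the IllegalStateException from the log.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, stdlib-only model of the consumer state involved in rewind();
// not KafkaConsumer. Partitions are plain strings such as "test-1".
class RewindModel {
    private final Set<String> subscription = new HashSet<>();
    private final Set<String> assignment = new HashSet<>();
    private final Map<String, Long> positions = new HashMap<>();

    // Manual assignment: the partitions are owned as soon as this returns.
    void assign(Collection<String> partitions) {
        subscription.clear();
        assignment.clear();
        assignment.addAll(partitions);
    }

    // Group subscription: no partitions are owned until a rebalance completes,
    // which this model never simulates.
    void subscribe(Collection<String> topics) {
        assignment.clear();
        subscription.clear();
        subscription.addAll(topics);
    }

    void seek(String tp, long offset) {
        if (!assignment.contains(tp)) {
            throw new IllegalStateException("No current assignment for partition " + tp);
        }
        positions.put(tp, offset);
    }

    Long position(String tp) {
        return positions.get(tp);
    }

    // Mirrors the two steps described for WorkerSinkTask.rewind():
    // (1) take the offsets supplied through the task context and,
    // (2) if there are any, seek to each of them.
    void rewind(Map<String, Long> contextOffsets) {
        if (contextOffsets.isEmpty()) {
            return;
        }
        for (Map.Entry<String, Long> entry : contextOffsets.entrySet()) {
            seek(entry.getKey(), entry.getValue());
        }
    }
}
```

As the comment at the top of this message points out, the price of assign() is that the consumer no longer takes part in group rebalances or picks up new partitions automatically.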



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
