Ning Zhang created KAFKA-10370:
----------------------------------
Summary: WorkerSinkTask: IllegalStateException cased by
consumer.seek(tp, offsets) when (tp, offsets) are supplied by
WorkerSinkTaskContext
Key: KAFKA-10370
URL: https://issues.apache.org/jira/browse/KAFKA-10370
Project: Kafka
Issue Type: New Feature
Components: KafkaConnect
Affects Versions: 2.5.0
Reporter: Ning Zhang
Assignee: Ning Zhang
Fix For: 2.6.0
In WorkerSinkTask.java, when we want the consumer to start consuming from
certain offsets, rather than from the last committed offset,
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
is used to carry the offsets from external world (e.g. implementation of
SinkTask).
In the [poll()
method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext,
(2) consumer.seek(tp, offset) to rewind the consumer.
when running (2), we saw the following IllegalStateException:
```
java.lang.IllegalStateException: No current assignment for partition mytopic-1
at
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at
org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
at
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
```
As suggested in
https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
the resolution is to use *consumer.assign* with *consumer.seek* , instead of
*consumer.subscribe*
--
This message was sent by Atlassian Jira
(v8.3.4#803005)