Bruno Cadonna created KAFKA-8602:
------------------------------------
Summary: StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic
Key: KAFKA-8602
URL: https://issues.apache.org/jira/browse/KAFKA-8602
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.1.1
Reporter: Bruno Cadonna
StreamThread dies with the following exception:
{code:java}
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1199)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
{code}
The reason is that the restore consumer is not subscribed to any topic. This
happens when a StreamThread is assigned standby tasks for sub-topologies whose
state stores all have logging disabled.
To reproduce the bug, start two applications, each with one StreamThread and
one standby replica, and use the following topology. The input topic should
have two partitions:
{code:java}
final StreamsBuilder builder = new StreamsBuilder();
final String stateStoreName = "myTransformState";
final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
                                Serdes.Integer(),
                                Serdes.Integer())
          .withLoggingDisabled();
builder.addStateStore(keyValueStoreBuilder);
builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
       .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
           private KeyValueStore<Integer, Integer> state;

           @SuppressWarnings("unchecked")
           @Override
           public void init(final ProcessorContext context) {
               state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
           }

           @Override
           public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
               final KeyValue<Integer, Integer> result = new KeyValue<>(key, value);
               return result;
           }

           @Override
           public void close() {}
       }, stateStoreName)
       .to(OUTPUT_TOPIC);
{code}
Both StreamThreads should die with the above exception.
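For reference, the settings described in the reproduction steps could be expressed roughly as below. This is only a sketch: the class name ReproConfig, the application id, and the bootstrap address are placeholders that do not come from this report, and it assumes both applications share the same application.id so that each can host the other's standby task. The keys are the plain string names of the Kafka Streams configs (num.stream.threads, num.standby.replicas).

```java
import java.util.Properties;

// Hypothetical helper for the reproduction; names and values are placeholders.
final class ReproConfig {

    // Builds the Streams properties for one of the two applications.
    static Properties build(final String applicationId, final String bootstrapServers) {
        final Properties props = new Properties();
        // Assumption: both applications use the same application.id.
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // One StreamThread per application, as in the reproduction steps.
        props.setProperty("num.stream.threads", "1");
        // One standby replica, so each application is assigned a standby task.
        props.setProperty("num.standby.replicas", "1");
        return props;
    }

    public static void main(final String[] args) {
        final Properties props = build("kafka-8602-repro", "localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object would be passed to the KafkaStreams constructor together with builder.build(). Other settings the topology may need, such as default serdes, are left out here.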
The root cause is that standby tasks are created even though all state stores
of the sub-topology have logging disabled.
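One way to picture a guard against this root cause is a check that decides whether a standby task is needed at all. The sketch below is illustrative only and is not taken from the Kafka code base: StandbyTaskGuard and needsStandbyTask are hypothetical names, and it assumes the caller has a map from store name to changelog topic that contains only stores with logging enabled.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical guard; not the actual Kafka Streams implementation.
final class StandbyTaskGuard {

    // Returns true only if at least one store of the sub-topology has a
    // changelog topic, i.e. there is something for a standby task to restore.
    // Assumption: storeToChangelogTopic holds entries only for logged stores.
    static boolean needsStandbyTask(final Set<String> stateStoreNames,
                                    final Map<String, String> storeToChangelogTopic) {
        for (final String storeName : stateStoreNames) {
            if (storeToChangelogTopic.containsKey(storeName)) {
                return true;
            }
        }
        return false;
    }
}
```

With the topology above, the only store is myTransformState and it has no changelog topic, so a check like this would return false and no standby task would be created for the sub-topology.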