[ 
https://issues.apache.org/jira/browse/KAFKA-8602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-8602:
---------------------------------
    Description: 
StreamThread dies with the following exception:
{code:java}
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1199)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
{code}
The reason is that the restore consumer is not subscribed to any topic. This 
happens when a StreamThread is assigned standby tasks for sub-topologies whose 
state stores all have logging disabled.

To reproduce the bug, start two instances of an application, each with one 
StreamThread and one standby replica, using the following topology. The input 
topic should have two partitions:
{code:java}
final StreamsBuilder builder = new StreamsBuilder();
final String stateStoreName = "myTransformState";
final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
        Serdes.Integer(),
        Serdes.Integer())
        .withLoggingDisabled();
builder.addStateStore(keyValueStoreBuilder);
builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
    .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
        private KeyValueStore<Integer, Integer> state;

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
        }

        @Override
        public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
            final KeyValue<Integer, Integer> result = new KeyValue<>(key, value);
            return result;
        }

        @Override
        public void close() {}
    }, stateStoreName)
    .to(OUTPUT_TOPIC);
{code}
Both StreamThreads should die with the above exception.
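
The setup described above (one StreamThread and one standby replica per 
instance) could be configured roughly as sketched below. The keys are the 
standard Kafka Streams configuration names; the application id and the 
bootstrap address are placeholders chosen for illustration and are not part 
of the original report.

```java
import java.util.Properties;

class StandbyReproConfig {
    // Settings for one application instance. Start two instances with the
    // same application.id so that each one hosts a standby for the other.
    static Properties reproConfig(final String bootstrapServers) {
        final Properties props = new Properties();
        // Placeholder id, assumed for this sketch only.
        props.setProperty("application.id", "kafka-8602-repro");
        props.setProperty("bootstrap.servers", bootstrapServers);
        // One StreamThread per instance.
        props.setProperty("num.stream.threads", "1");
        // One standby replica per task.
        props.setProperty("num.standby.replicas", "1");
        return props;
    }

    public static void main(final String[] args) {
        // Placeholder broker address; adjust to the local test cluster.
        final Properties props = reproConfig("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object would be passed to the KafkaStreams 
constructor together with the topology built from the code above.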

The root cause is that standby tasks are created even though all state stores 
of the sub-topology have logging disabled.
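
To illustrate the root cause, the following is a simplified, self-contained 
model of the decision that appears to be missing. It is not the Kafka Streams 
implementation or the actual fix: StoreSpec and needsStandbyTask are 
hypothetical names introduced only for this sketch. The idea is that a standby 
task is only useful if at least one state store of the sub-topology has a 
changelog topic for the restore consumer to read.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class StandbyTaskCheck {
    // Hypothetical, simplified description of a state store: only its name
    // and whether changelogging is enabled.
    static final class StoreSpec {
        final String name;
        final boolean loggingEnabled;

        StoreSpec(final String name, final boolean loggingEnabled) {
            this.name = name;
            this.loggingEnabled = loggingEnabled;
        }
    }

    // A standby task has something to replicate only if at least one store
    // of the sub-topology is backed by a changelog topic.
    static boolean needsStandbyTask(final List<StoreSpec> storesOfSubTopology) {
        return storesOfSubTopology.stream().anyMatch(store -> store.loggingEnabled);
    }

    public static void main(final String[] args) {
        // The sub-topology from this report: one store, logging disabled.
        final List<StoreSpec> reported =
            Collections.singletonList(new StoreSpec("myTransformState", false));
        System.out.println(needsStandbyTask(reported)); // prints "false"

        // A sub-topology with at least one logged store still needs a standby.
        final List<StoreSpec> mixed = Arrays.asList(
            new StoreSpec("unlogged", false),
            new StoreSpec("logged", true));
        System.out.println(needsStandbyTask(mixed)); // prints "true"
    }
}
```

Under this model, no standby task would be created for the topology above, so 
the restore consumer would never be polled without an assignment.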

  was:
StreamThread dies with the following exception:
{code:java}
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1199)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
{code}
The reason is that the restore consumer is not subscribed to any topic. This 
happens when a StreamThread gets assigned standby tasks for sub-topologies with 
just state stores with disabled logging.

To reproduce the bug start two applications with one StreamThread and one 
standby replica each and the following topology. The input topic should have 
two partitions:
{code:java}
final StreamsBuilder builder = new StreamsBuilder();
final String stateStoreName = "myTransformState";
final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
        Serdes.Integer(),
        Serdes.Integer())
        .withLoggingDisabled();
builder.addStateStore(keyValueStoreBuilder);
builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
    .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
        private KeyValueStore<Integer, Integer> state;

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
        }

        @Override
        public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
            final KeyValue<Integer, Integer> result = new KeyValue<>(key, value);
            return result;
        }

        @Override
        public void close() {}
    }, stateStoreName)
    .to(OUTPUT_TOPIC);
{code}
Both StreamThreads should die with the above exception.

The root cause is that standby tasks are created although all state stores of 
the sub-topology have a logging disabled.  


> StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-8602
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8602
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.1
>            Reporter: Bruno Cadonna
>            Assignee: Bruno Cadonna
>            Priority: Critical
>
> StreamThread dies with the following exception:
> {code:java}
> java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1199)
>       at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
>       at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
>       at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> {code}
> The reason is that the restore consumer is not subscribed to any topic. This 
> happens when a StreamThread is assigned standby tasks for sub-topologies 
> whose state stores all have logging disabled.
> To reproduce the bug, start two instances of an application, each with one 
> StreamThread and one standby replica, using the following topology. The 
> input topic should have two partitions:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final String stateStoreName = "myTransformState";
> final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
>     Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
>         Serdes.Integer(),
>         Serdes.Integer())
>         .withLoggingDisabled();
> builder.addStateStore(keyValueStoreBuilder);
> builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
>     .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
>         private KeyValueStore<Integer, Integer> state;
>         @SuppressWarnings("unchecked")
>         @Override
>         public void init(final ProcessorContext context) {
>             state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
>         }
>         @Override
>         public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
>             final KeyValue<Integer, Integer> result = new KeyValue<>(key, value);
>             return result;
>         }
>         @Override
>         public void close() {}
>     }, stateStoreName)
>     .to(OUTPUT_TOPIC);
> {code}
> Both StreamThreads should die with the above exception.
> The root cause is that standby tasks are created although all state stores of 
> the sub-topology have logging disabled.  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
