Mostafa Asgari created KAFKA-6401: ------------------------------------- Summary: InvalidStateStoreException immediately after starting streams Key: KAFKA-6401 URL: https://issues.apache.org/jira/browse/KAFKA-6401 Project: Kafka Issue Type: Bug Components: streams Environment: ubuntu 14.04 Reporter: Mostafa Asgari Priority: Minor Attachments: Test.java
Hi I wrote a simple kafka streams application. After I start the stream, if I call KafkaStreams.store immediately, I will get InvalidStateStoreException: {code:java} org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, my-table, may have migrated to another instance. {code} Here is the complete code : {code:java} final StreamsBuilder builder = new StreamsBuilder(); final KTable<String, Integer> table = builder.table(TOPIC_NAME , Consumed.with(Serdes.String(), Serdes.Integer(), new FailOnInvalidTimestamp(), Topology.AutoOffsetReset.EARLIEST), Materialized.as("my-table")); Topology topology = builder.build(); Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,"my-streams-app"); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,"10000"); final KafkaStreams streams = new KafkaStreams( topology , props ); Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run() { streams.close(); } }); streams.start(); ReadOnlyKeyValueStore<String, Integer> store streams.store(table.queryableStoreName(), QueryableStoreTypes.keyValueStore()); {code} However if after start() method, I write Thread.sleep( SOME_AMOUNT ) I will not get the exception any more. I wonder if it is a bug or I did something wrong. -- This message was sent by Atlassian JIRA (v6.4.14#64029)