Mostafa Asgari created KAFKA-6401:
-------------------------------------
Summary: InvalidStateStoreException immediately after starting streams
Key: KAFKA-6401
URL: https://issues.apache.org/jira/browse/KAFKA-6401
Project: Kafka
Issue Type: Bug
Components: streams
Environment: ubuntu 14.04
Reporter: Mostafa Asgari
Priority: Minor
Attachments: Test.java
Hi,
I wrote a simple Kafka Streams application. If I call KafkaStreams.store
immediately after starting the streams, I get an InvalidStateStoreException:
{code:java}
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store,
my-table, may have migrated to another instance.
{code}
Here is the complete code:
{code:java}
// Build a topology that materializes the topic as a queryable KTable.
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, Integer> table = builder.table(TOPIC_NAME,
        Consumed.with(Serdes.String(), Serdes.Integer(),
                new FailOnInvalidTimestamp(),
                Topology.AutoOffsetReset.EARLIEST),
        Materialized.as("my-table"));
final Topology topology = builder.build();

final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000");
final KafkaStreams streams = new KafkaStreams(topology, props);

// Close the streams instance cleanly on JVM shutdown.
Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        streams.close();
    }
});

streams.start();

// Querying the store right after start() throws InvalidStateStoreException.
final ReadOnlyKeyValueStore<String, Integer> store =
        streams.store(table.queryableStoreName(), QueryableStoreTypes.keyValueStore());
{code}
However, if I add Thread.sleep(SOME_AMOUNT) after the start() call, I no longer
get the exception.
Is this a bug, or did I do something wrong?
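
A common alternative to sleeping for a fixed time is to retry the store lookup until it succeeds. The following is a minimal sketch, assuming the exception only means that the store is not yet ready to be queried; that fits the Thread.sleep observation above but is not confirmed in this report. The class StoreRetry, the method waitUntilAvailable, and its parameters are hypothetical names, not part of the Kafka API. The helper uses only the JDK, so it can be tried without a broker.

```java
import java.util.function.Supplier;

// Hypothetical helper (not part of Kafka): retries a lookup while it keeps
// throwing a given exception type, sleeping between attempts.
final class StoreRetry {
    private StoreRetry() {}

    static <T> T waitUntilAvailable(Supplier<T> lookup,
                                    Class<? extends RuntimeException> retryOn,
                                    int maxAttempts,
                                    long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                // Anything other than the expected "not ready" error is rethrown at once.
                if (!retryOn.isInstance(e)) {
                    throw e;
                }
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    // Preserve the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        // All attempts failed (or the wait was interrupted): surface the last error.
        throw last;
    }
}
```

With the code from this report, the lookup passed in would be a lambda wrapping the streams.store(...) call, and retryOn would be InvalidStateStoreException.class. The attempt count and backoff are arbitrary and would need tuning for a real application.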