Hi, I have a Kafka Streams application where I am joining a KStream that reads from "topic1" with a GlobalKTable that reads from "topic2" and then with another GlobalKTable that reads from "topic3". Here is the pseudo code -
> KStream<String, GenericRecord> topic1KStream = > builder.stream( > "topic1", > Consumed.with(Serdes.String(), genericRecordSerde) > ); > GlobalKTable<String, GenericRecord> topic2KTable = > builder.globalTable( > "topic2", > Consumed.with(Serdes.String(), genericRecordSerde), > Materialized.<String, GenericRecord, KeyValueStore<Bytes, > byte[]>>as("topic2-global-store") > .withKeySerde(Serdes.String()) > .withValueSerde(genericRecordSerde) > ); > GlobalKTable<String, GenericRecord> topic3KTable = > builder.globalTable( > "topic3", > Consumed.with(Serdes.String(), genericRecordSerde), > Materialized.<String, GenericRecord, KeyValueStore<Bytes, > byte[]>>as("topic3-global-store") > .withKeySerde(Serdes.String()) > .withValueSerde(genericRecordSerde) > ); > > KStream<String, MergedObj> stream_topic1_topic2 = topic1KStream.join( > topic2KTable, > (topic2Id, topic1Obj) -> topic1.get("id").toString(), > (topic1Obj, topic2Obj) -> new MergedObj(topic1Obj, topic2Obj) > ); > final KStream<String, GenericRecord> enrichedStream = > stream_topic1_topic2.join( > topic3KTable, > (topic2Id, mergedObj) -> mergedObj.topic3Id(), > (mergedObj, topic3Obj) -> new Enriched( > mergedObj.topic1Obj, > mergedObj.topic2Obj, > topic3Obj > ).enrich() > ); > enrichedStream.to("enrichedStreamTopic", Produced.with(Serdes.String(), > getGenericRecordSerde())); The above code is very similar to this - https://github.com/confluentinc/kafka-streams-examples/blob/5.2.1-post/src/main/java/io/confluent/examples/streams/GlobalKTablesExample.java When I try to push messages to all 3 topics at the same time then I get following exception - org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=topic1, > partition=1, offset=61465, > stacktrace=org.apache.kafka.streams.errors.InvalidStateStoreException: > Store topic2-global-store is currently closed. > at > org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:66) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:150) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:37) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:135) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadOnlyDecorator.get(ProcessorContextImpl.java:245) > at > org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49) > at > org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:364) > at > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:890) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) If I push message one by one in these topics, then I do not get this exception. Kindly advice how to fix this exception. Thanks for your help in advance. Regards, Ishita