Hello Ishita, Is it consistently reproducing? And which Kafka version are you using?
Guozhang On Thu, May 2, 2019 at 5:24 PM Ishita Rakshit <mail2i...@gmail.com> wrote: > Hi, > I have a Kafka Streams application where I am joining a KStream that reads > from "topic1" with a GlobalKTable that reads from "topic2" and then with > another GlobalKTable that reads from "topic3". Here is the pseudo code - > > > > KStream<String, GenericRecord> topic1KStream = > > builder.stream( > > "topic1", > > Consumed.with(Serdes.String(), genericRecordSerde) > > ); > > GlobalKTable<String, GenericRecord> topic2KTable = > > builder.globalTable( > > "topic2", > > Consumed.with(Serdes.String(), genericRecordSerde), > > Materialized.<String, GenericRecord, KeyValueStore<Bytes, > > byte[]>>as("topic2-global-store") > > .withKeySerde(Serdes.String()) > > .withValueSerde(genericRecordSerde) > > ); > > GlobalKTable<String, GenericRecord> topic3KTable = > > builder.globalTable( > > "topic3", > > Consumed.with(Serdes.String(), genericRecordSerde), > > Materialized.<String, GenericRecord, KeyValueStore<Bytes, > > byte[]>>as("topic3-global-store") > > .withKeySerde(Serdes.String()) > > .withValueSerde(genericRecordSerde) > > ); > > > > KStream<String, MergedObj> stream_topic1_topic2 = topic1KStream.join( > > topic2KTable, > > (topic2Id, topic1Obj) -> topic1.get("id").toString(), > > (topic1Obj, topic2Obj) -> new MergedObj(topic1Obj, topic2Obj) > > ); > > final KStream<String, GenericRecord> enrichedStream = > > stream_topic1_topic2.join( > > topic3KTable, > > (topic2Id, mergedObj) -> mergedObj.topic3Id(), > > (mergedObj, topic3Obj) -> new Enriched( > > mergedObj.topic1Obj, > > mergedObj.topic2Obj, > > topic3Obj > > ).enrich() > > ); > > enrichedStream.to("enrichedStreamTopic", Produced.with(Serdes.String(), > > getGenericRecordSerde())); > > > The above code is very similar to this - > > https://github.com/confluentinc/kafka-streams-examples/blob/5.2.1-post/src/main/java/io/confluent/examples/streams/GlobalKTablesExample.java > > When I try to push messages to all 3 topics at the same time then I get > following exception - > > org.apache.kafka.streams.errors.StreamsException: Exception caught in > > process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=topic1, > > partition=1, offset=61465, > > stacktrace=org.apache.kafka.streams.errors.InvalidStateStoreException: > > Store topic2-global-store is currently closed. > > at > > > org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:66) > > at > > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:150) > > at > > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:37) > > at > > > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:135) > > at > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadOnlyDecorator.get(ProcessorContextImpl.java:245) > > at > > > org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49) > > at > > > org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:71) > > at > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117) > > at > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183) > > at > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162) > > at > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122) > > at > > > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87) > > at > > > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:364) > > at > > > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199) > > at > > > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420) > > at > > > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:890) > > at > > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) > > at > > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) > > > If I push message one by one in these topics, then I do not get this > exception. > > Kindly advice how to fix this exception. > > Thanks for your help in advance. > > Regards, > Ishita > -- -- Guozhang