Hi Pushkar,

Thanks for the question. I think what's happening is that, even though both branches use the same grouping logic, Streams can't detect that they are the same. It just sees two group-bys and therefore introduces two repartitions, with a separate downstream task for each.
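To make that concrete, here is a stripped-down sketch of the pattern (toy topic and store names, a simple count() standing in for your aggregator — none of this is your actual code). A key-changing operation followed by branch() and a group-by on each branch produces one repartition topic per group-by, which you can see by printing the description:

```java
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class TopologySketch {

    // Stripped-down version of the pattern: a key-changing operation
    // (selectKey) followed by branch(), with a separate group-by and
    // aggregation on each branch.
    static String describeTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input");

        KStream<String, String>[] branches = stream
                .selectKey((k, v) -> v)                   // marks the stream "repartition required"
                .branch((k, v) -> v.startsWith("CLOSED"), // branch 0
                        (k, v) -> true);                  // branch 1 (default)

        // Each groupByKey() downstream of the key change inserts its OWN
        // repartition topic, even though the grouping key is identical.
        branches[0].groupByKey()
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("records"));
        branches[1].groupByKey()
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("totals"));

        // The description shows two *-repartition topics feeding two
        // separate sub-topologies, i.e., two independent downstream tasks.
        return builder.build().describe().toString();
    }

    public static void main(String[] args) {
        System.out.println(describeTopology());
    }
}
```

With the store names above, the description should list a "records-repartition" and a "totals-repartition" topic, each feeding its own sub-topology — which is why the two stores for the same key can end up on different instances.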
You might want to print out the topology description and visualize it with https://zz85.github.io/kafka-streams-viz/ . That will show you whether the stores wind up in the same task or not. The visualization will also show you the names of the input topics for those two repartitions, which you can use in conjunction with the metadata methods on your KafkaStreams instance to query for the location of the keys in both stores.

I suspect that with some tweaks you can rewrite the topology to have just one downstream task, if that's what you prefer.

By the way, I think you could propose adding an optimization to make the groupBy behave the way you expected. If that's interesting to you, let us know and we can give you some pointers!

I hope this helps,
John

On Wed, Nov 23, 2022, at 05:36, Pushkar Deole wrote:
> Hi All,
>
> I have a stream application that creates 2 branches. Each branch includes
> a state store, where the status field of the Kafka message determines the
> branch, and therefore the state store used:
>
> Status OPEN = state store name "totals"
> Status CLOSED = state store name "records"
>
> I'm seeing that the streams application is running on a pod; however, I'm
> getting the exception:
>
> org.apache.kafka.streams.errors.InvalidStateStoreException: The state
> store, records, may have migrated to another instance.
>
> If I physically access the pod and check the RocksDB folders, I do not see
> the state store folder. If I check the keys in the 'totals' state store on
> this pod, I can find the key in the 'records' state store on another pod. I
> had assumed that because the key of the events is the same, the same
> partition would be used for the two branches, and therefore the same keys in
> these two state stores would be created on the same Kubernetes pod. This is
> not an issue for the Kafka stream itself, but that assumption was used in
> the way the state stores are read.
> I assumed that if I found a key in the 'totals' state store, the same key
> would be found on the same pod in the 'records' state store.
>
> The questions I have are:
>
> 1) Is it expected that the state stores can hold the partition data on
> different pods, and is this unique to streams using branch?
>
> 2) Is there a way to know if the state store is on the pod, to avoid
> handling this as an exception?
>
> Here is the topology of the stream in question:
>
> KStream<String, ConsolidatedIntervalTotalsModel>[] branches = stream
>     .peek(receivingEventLogger)
>     .selectKey(keyMapper)
>     .mapValues(totalsValueMapper)
>     .filter(nullKeyValueEventFilter)
>     .branch((k, v) -> (RecordStatus.CLOSED.name().equalsIgnoreCase(v.getCurrent().getRecordStatus())
>             || RecordStatus.LB_RDELETE.name().equalsIgnoreCase(v.getCurrent().getRecordStatus())),
>         (k, v) -> true);
>
> // CLOSED and LB_RDELETE branch writes to records state store
> branches[0]
>     .groupByKey(Grouped.with(Serdes.String(), totalsValueSerde))
>     .aggregate(totalsInitializer, totalsAggregator, materializedRecords)
>     .toStream()
>     .map(totalsInternalKeyValueMapper)
>     .filter(nullKeyStringValueEventFilter)
>     .to(loopbackTopic.name());
>
> // DEFAULT branch writes to totals state store
> branches[1]
>     .groupByKey(Grouped.with(Serdes.String(), totalsValueSerde))
>     .aggregate(totalsInitializer, totalsAggregator, materializedTotals)
>     .toStream()
>     .flatMap(totalsKeyValueMapper)
>     .filter(nullKeyStringValueEventFilter)
>     .peek(sendingEventLogger)
>     .to(toTopic.name());
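Regarding question 2 above: rather than catching InvalidStateStoreException, the metadata methods let you ask a running KafkaStreams instance which application instance actively hosts a key before you read the store. A rough sketch (the helper names here are mine, not from the Streams API, and it assumes the "application.server" config is set to each pod's host:port):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.state.HostInfo;

public class StoreLocator {

    // Turn key metadata into the "host:port" of the instance that actively
    // hosts the key, or null when the store is not locatable right now
    // (e.g. during a rebalance).
    static String formatHost(KeyQueryMetadata md) {
        if (md == null || KeyQueryMetadata.NOT_AVAILABLE.equals(md)) {
            return null;
        }
        HostInfo active = md.activeHost();
        return active.host() + ":" + active.port();
    }

    // Hypothetical helper: before reading a store, ask the running
    // KafkaStreams instance where the key lives. This relies on
    // "application.server" being configured on each pod.
    static String hostFor(KafkaStreams streams, String storeName, String key) {
        return formatHost(streams.queryMetadataForKey(
                storeName, key, Serdes.String().serializer()));
    }
}
```

If the returned host is not the local pod, you would forward the lookup to that pod (e.g. over HTTP) instead of querying the local store and handling the exception.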