Hi All,

I have a streams application that creates 2 branches. Each branch includes a state store, where the status field of the Kafka message determines the branch, and therefore the state store used:
Status OPEN = state store named 'totals'
Status CLOSED = state store named 'records'

The streams application is running on a pod, but I'm getting the exception:

org.apache.kafka.streams.errors.InvalidStateStoreException: The state store, records, may have migrated to another instance.

If I physically access the pod and check the RocksDB folders, I do not see the state store folder. If I check the keys in the 'totals' state store on this pod, I can find the same key in the 'records' state store on another pod.

I had assumed that because the keys of the events are the same, the same partition would be used for the two branches, and therefore the same keys in these two state stores would be created on the same Kubernetes pod. This is not an issue for the Kafka stream itself, but that assumption was built into the way the state stores are read: I assumed that if I found a key in the 'totals' state store, the same key would be found in the 'records' state store on the same pod.

The questions I have are:

1) Is it expected that the state stores can hold the partition data on different pods, and is this unique to streams using branch?

2) Is there a way to know whether the state store is on the pod, to avoid handling this as an exception?

Here is the topology of the stream in question:

KStream<String, ConsolidatedIntervalTotalsModel>[] branches = stream
        .peek(receivingEventLogger)
        .selectKey(keyMapper)
        .mapValues(totalsValueMapper)
        .filter(nullKeyValueEventFilter)
        .branch(
                (k, v) -> (RecordStatus.CLOSED.name().equalsIgnoreCase(v.getCurrent().getRecordStatus())
                        || RecordStatus.LB_RDELETE.name().equalsIgnoreCase(v.getCurrent().getRecordStatus())),
                (k, v) -> true);

// CLOSED and LB_RDELETE branch writes to the 'records' state store
branches[0]
        .groupByKey(Grouped.with(Serdes.String(), totalsValueSerde))
        .aggregate(totalsInitializer, totalsAggregator, materializedRecords)
        .toStream()
        .map(totalsInternalKeyValueMapper)
        .filter(nullKeyStringValueEventFilter)
        .to(loopbackTopic.name());

// DEFAULT branch writes to the 'totals' state store
branches[1]
        .groupByKey(Grouped.with(Serdes.String(), totalsValueSerde))
        .aggregate(totalsInitializer, totalsAggregator, materializedTotals)
        .toStream()
        .flatMap(totalsKeyValueMapper)
        .filter(nullKeyStringValueEventFilter)
        .peek(sendingEventLogger)
        .to(toTopic.name());
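For reference, regarding question 2, this is roughly how I was thinking the read side could check locality before querying the 'records' store, instead of catching the exception. This is only a sketch: it assumes a recent Kafka Streams version (one that has queryMetadataForKey, StoreQueryParameters and KeyQueryMetadata.activeHost()), and that 'application.server' is configured with each pod's host:port. The class and method names are just placeholders.

import java.util.Optional;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Sketch only: checks whether this pod is the active host for the key
// in the 'records' store before doing a local lookup.
public class RecordsStoreLookup<V> {

    private static final String STORE_NAME = "records";

    private final KafkaStreams streams;
    private final HostInfo thisPod; // same host:port as the application.server config

    public RecordsStoreLookup(KafkaStreams streams, HostInfo thisPod) {
        this.streams = streams;
        this.thisPod = thisPod;
    }

    public Optional<V> lookup(String key) {
        // Ask Streams which instance is the active host for this key's partition
        KeyQueryMetadata metadata = streams.queryMetadataForKey(
                STORE_NAME, key, Serdes.String().serializer());

        if (metadata == null || KeyQueryMetadata.NOT_AVAILABLE.equals(metadata)) {
            // Metadata not available yet, e.g. during a rebalance
            return Optional.empty();
        }

        if (!thisPod.equals(metadata.activeHost())) {
            // Key lives on another pod; forward the request to metadata.activeHost()
            // instead of querying locally and hitting InvalidStateStoreException
            return Optional.empty();
        }

        // Key is hosted locally, so the local store query should be safe
        ReadOnlyKeyValueStore<String, V> store = streams.store(
                StoreQueryParameters.fromNameAndType(
                        STORE_NAME, QueryableStoreTypes.keyValueStore()));
        return Optional.ofNullable(store.get(key));
    }
}

The idea would be that when metadata.activeHost() points at a different pod, the request gets forwarded there (e.g. over HTTP) rather than treating the missing local store as an error.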