Hi All,

I have a Kafka Streams application that creates two branches. Each branch has
its own state store; the status field of the Kafka message determines the
branch, and therefore which state store is used:

Status CLOSED or LB_RDELETE = state store name records

Any other status (e.g. OPEN) = state store name totals



The Streams application is running on a pod; however, I'm getting this
exception:

org.apache.kafka.streams.errors.InvalidStateStoreException: The state store, records, may have migrated to another instance.



When I access the pod and check the RocksDB folders, I do not see the
records state store folder. If I take a key from the totals state store on
this pod, I find that same key in the records state store on another pod. I
had assumed that, because the events have the same key, the same partition
would be used for both branches, and therefore the same keys in the two
state stores would end up on the same Kubernetes pod. This is not an issue
for the stream processing itself, but our code that reads the state stores
relies on that assumption: if I found a key in the 'totals' state store, I
assumed the same key would be found in the 'records' state store on the
same pod.
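A minimal sketch for checking the co-location assumption, assuming Kafka Streams 2.8 or later. It rebuilds a simplified stand-in for the topology below (String values, count() instead of the real aggregator, and split()/Branched, the replacement for the deprecated branch()) and uses Topology.describe() to report which sub-topology owns each store. The class name SubtopologyCheck, its methods, the topic name "input" and the re-keying lambda are hypothetical. Because selectKey() changes the key, each groupByKey() adds its own repartition topic, so "records" and "totals" end up in different sub-topologies. Stores in different sub-topologies belong to different tasks, and Kafka Streams may assign those tasks to different instances even when their partition numbers match. This does not appear to be specific to branch(): any two aggregations that each need their own repartition topic would likely behave the same way.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class SubtopologyCheck {

    // Simplified stand-in for the real topology: re-key, branch on status,
    // then aggregate each branch into its own named store.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
            // Hypothetical re-keying step standing in for keyMapper.
            .selectKey((k, v) -> v.split(":")[0])
            .split()
            .branch((k, v) -> v.endsWith("CLOSED"),
                Branched.withConsumer(s -> s
                    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("records"))))
            .defaultBranch(Branched.withConsumer(s -> s
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("totals"))));
        return builder.build();
    }

    // Returns the id of the sub-topology whose processor uses the store, or -1.
    static int subtopologyOf(Topology topology, String storeName) {
        for (TopologyDescription.Subtopology sub : topology.describe().subtopologies()) {
            for (TopologyDescription.Node node : sub.nodes()) {
                if (node instanceof TopologyDescription.Processor
                        && ((TopologyDescription.Processor) node).stores().contains(storeName)) {
                    return sub.id();
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        Topology topology = buildTopology();
        System.out.println(topology.describe());
        System.out.println("records -> sub-topology " + subtopologyOf(topology, "records"));
        System.out.println("totals  -> sub-topology " + subtopologyOf(topology, "totals"));
    }
}
```

Running main() prints the full description; each store should appear in its own sub-topology whose source is a repartition topic. Running the same check against the real topology would show whether the two stores can be co-located at all.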



My questions are:

1) Is it expected that the two state stores can hold the data for the same
partition on different pods, and is this specific to streams that use branch?

2) Is there a way to check whether a state store is on the current pod, so I
can avoid handling this as an exception?
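For the second question, a minimal sketch of the usual interactive-queries pattern, assuming Kafka Streams 2.5 or later and that every pod sets application.server (StreamsConfig.APPLICATION_SERVER_CONFIG) to its own host:port. queryMetadataForKey() reports which instance hosts the active copy of a key's partition for a given store, so each store can be checked on its own instead of inferring the location of "records" from "totals". The class StoreLocator, the Location enum and the methods locate() and getIfLocal() are hypothetical names; forwarding a REMOTE lookup to metadata.activeHost() is left to the application's own RPC layer.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class StoreLocator {

    // Where the active copy of a key's partition lives, seen from this pod.
    enum Location { LOCAL, REMOTE, UNAVAILABLE }

    // Compares the key's active host with this pod's application.server value.
    // null means no metadata was found; NOT_AVAILABLE means Streams is (re)initializing.
    static Location locate(KeyQueryMetadata metadata, HostInfo self) {
        if (metadata == null || KeyQueryMetadata.NOT_AVAILABLE.equals(metadata)) {
            return Location.UNAVAILABLE;
        }
        return self.equals(metadata.activeHost()) ? Location.LOCAL : Location.REMOTE;
    }

    // Reads the key from the named store only if this pod hosts the active copy
    // of its partition; returns null otherwise so the caller can forward the request.
    static <V> V getIfLocal(KafkaStreams streams, HostInfo self, String storeName, String key) {
        KeyQueryMetadata metadata =
            streams.queryMetadataForKey(storeName, key, Serdes.String().serializer());
        if (locate(metadata, self) != Location.LOCAL) {
            return null;
        }
        ReadOnlyKeyValueStore<String, V> store = streams.store(
            StoreQueryParameters
                .fromNameAndType(storeName, QueryableStoreTypes.<String, V>keyValueStore())
                .withPartition(metadata.partition()));
        return store.get(key);
    }
}
```

A rebalance can still move the partition between the metadata check and the store() call, so InvalidStateStoreException remains possible while rebalancing and is still worth catching; the check only stops it from being the normal path for keys that live on another pod.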



Here is the topology of the stream in question:

        KStream<String, ConsolidatedIntervalTotalsModel>[] branches = stream
            .peek(receivingEventLogger)
            .selectKey(keyMapper)
            .mapValues(totalsValueMapper)
            .filter(nullKeyValueEventFilter)
            .branch((k, v) -> (RecordStatus.CLOSED.name().equalsIgnoreCase(v.getCurrent().getRecordStatus())
                        || RecordStatus.LB_RDELETE.name().equalsIgnoreCase(v.getCurrent().getRecordStatus())),
                    (k, v) -> true);

        // CLOSED and LB_RDELETE branch writes to records state store
        branches[0]
            .groupByKey(Grouped.with(Serdes.String(), totalsValueSerde))
            .aggregate(totalsInitializer, totalsAggregator, materializedRecords)
            .toStream()
            .map(totalsInternalKeyValueMapper)
            .filter(nullKeyStringValueEventFilter)
            .to(loopbackTopic.name());

        // DEFAULT branch writes to totals state store
        branches[1]
            .groupByKey(Grouped.with(Serdes.String(), totalsValueSerde))
            .aggregate(totalsInitializer, totalsAggregator, materializedTotals)
            .toStream()
            .flatMap(totalsKeyValueMapper)
            .filter(nullKeyStringValueEventFilter)
            .peek(sendingEventLogger)
            .to(toTopic.name());
