Hi all, I'm trying to aggregate a stream of messages and return a stream of aggregated results using Kafka Streams. At some point, depending on the incoming message, the old aggregate needs to be closed and a new aggregate needs to be created, much like a session that is closed by some close event while a new session is started at the same time.
For this I'm using flatTransformValues, where I store the result of an aggregation in a state store, similar to how a groupByKey().aggregate() is done. When the old session needs to be closed, it is sent downstream first, followed by the new value. The state store returns null for a given key at first retrieval, and the new aggregation result is stored under the same key. However, at the second pass, the value for the same key is still null even though it has just been stored. How can this be possible?

I'm using flatTransformValues in the following way:

    val storeName = "aggregateOverflow_binReportAgg"
    val store = Stores.keyValueStoreBuilder<K, V>(
        Stores.persistentKeyValueStore(storeName), serde.serde(), serde.serde())
    streamsBuilder.addStateStore(store)
    ...
    stream
        .flatTransformValues(
            ValueTransformerWithKeySupplier { AggregateOverflow(storeName, transformation) },
            storeName)

where AggregateOverflow gets the previous value from the state store and transforms it into an AggregateOverflowResult. AggregateOverflowResult is a data class containing the current value and an optional overflow value:

    data class AggregateOverflowResult<V>(val current: V, val overflow: V?)

When the overflow value is not null, it is sent downstream first, followed by the current value. In each case, the current result is stored in the state store for later retrieval:

    class AggregateOverflow<K, V, VR : Any>(
        private val storeName: String,
        private val transformation: (K, V, VR?) -> AggregateOverflowResult<VR>?
    ) : ValueTransformerWithKey<K, V, Iterable<VR>> {

        private val logger = KotlinLogging.logger{}
        private lateinit var state: KeyValueStore<K, VR>

        init {
            logger.debug { "$storeName: created" }
        }

        override fun init(context: ProcessorContext) {
            logger.debug { "$storeName: init called" }
            this.state = context.getStateStore(storeName) as KeyValueStore<K, VR>
        }

        override fun transform(key: K, value: V): Iterable<VR> {
            val acc = state.get(key)
            if (acc == null) logger.debug { "$storeName: Found empty value for $key" }
            val result = transformation(key, value, acc)
            state.put(key, result?.current)
            logger.trace { "$storeName: \n Key: $key\n Value: $value\n aggregate old: $acc\n aggregate new: $result" }
            return listOfNotNull(result?.overflow, result?.current) // prevAcc will be forwarded first if not null
        }

        override fun close() {
            logger.debug { "$storeName: close called" }
        }
    }

In the log file you can see that the first invocation returns an empty value for the given key, and that the new value is being serialized into the store. At the second invocation a few seconds later, the value for the same key is still null.

Any ideas why this is?

Best regards,
Jan
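
P.S. To separate the transform logic from Kafka itself, here is a minimal sketch of the same get/transform/put steps against a plain in-memory map. InMemoryStore, transformOnce and sumWithOverflow are hypothetical names made up for this illustration and are not part of my real code; the rollover threshold of 10 is arbitrary. The sketch assumes that putting a null value removes the key, which as far as I understand is also how KeyValueStore.put treats a null value.

```kotlin
// Sketch only: a plain map stands in for the Kafka Streams KeyValueStore.
data class AggregateOverflowResult<V>(val current: V, val overflow: V?)

// Hypothetical in-memory store. Assumption: put(key, null) removes the key.
class InMemoryStore<K, V : Any> {
    private val data = HashMap<K, V>()

    fun get(key: K): V? = data[key]

    fun put(key: K, value: V?) {
        if (value == null) {
            data.remove(key)
        } else {
            data[key] = value
        }
    }
}

// Same steps as AggregateOverflow.transform, without Kafka or logging.
fun <K, V, VR : Any> transformOnce(
    store: InMemoryStore<K, VR>,
    transformation: (K, V, VR?) -> AggregateOverflowResult<VR>?,
    key: K,
    value: V
): List<VR> {
    val acc = store.get(key)
    val result = transformation(key, value, acc)
    store.put(key, result?.current) // a null result removes the stored aggregate
    return listOfNotNull(result?.overflow, result?.current)
}

// Hypothetical transformation: sum the values, start a new aggregate when the
// sum would exceed 10, and return null for negative input.
fun sumWithOverflow(key: String, value: Int, acc: Int?): AggregateOverflowResult<Int>? {
    if (value < 0) return null
    val old = acc ?: 0
    return if (old + value > 10) {
        AggregateOverflowResult(current = value, overflow = acc)
    } else {
        AggregateOverflowResult(current = old + value, overflow = null)
    }
}

fun main() {
    val store = InMemoryStore<String, Int>()
    println(transformOnce(store, ::sumWithOverflow, "k", 4))  // [4]
    println(transformOnce(store, ::sumWithOverflow, "k", 5))  // [9]
    println(transformOnce(store, ::sumWithOverflow, "k", 3))  // [9, 3]
    println(transformOnce(store, ::sumWithOverflow, "k", -1)) // []
    println(store.get("k"))                                   // null
}
```

In this sketch the second call does see the value stored by the first, which is the behaviour I expected from the real store. The last two lines show one case I still need to rule out on my side: whenever the transformation returns null, result?.current is null too, so the put wipes the previously stored aggregate and the next get for that key comes back empty.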