Hi all,
I'm trying to aggregate a stream of messages and return a stream of
aggregated results using Kafka Streams.
At some point, depending on the incoming message, the old aggregate needs
to be closed and a new aggregate needs to be created, just like a session
that is closed by a close event while a new session is started at the same
time.
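To make the idea concrete, here is a minimal sketch of such a session-style transformation in plain Kotlin, with no Kafka dependency. The `Event` and `SessionAgg` types, the `closesSession` flag and the sum-based aggregate are hypothetical. The function has the same `(K, V, VR?) -> AggregateOverflowResult<VR>?` shape as the `transformation` parameter used in the transformer, and the closed session is returned as `overflow`.

```kotlin
// Mirrors the data class from the post: the current aggregate plus an
// optional, already-closed aggregate ("overflow").
data class AggregateOverflowResult<V>(val current: V, val overflow: V?)

// Hypothetical input event: an amount to add, and a flag saying that the
// running session must be closed and a new one started with this event.
data class Event(val amount: Long, val closesSession: Boolean)

// Hypothetical aggregate: the running sum of one session.
data class SessionAgg(val sessionNo: Int, val total: Long)

// Same shape as `transformation`: (key, incoming value, previous aggregate).
fun sessionTransformation(
    key: String,
    event: Event,
    previous: SessionAgg?
): AggregateOverflowResult<SessionAgg> {
    if (previous == null) {
        // Nothing stored yet for this key: open the first session.
        return AggregateOverflowResult(SessionAgg(1, event.amount), null)
    }
    return if (event.closesSession) {
        // Close the old session (returned as overflow) and start a new one.
        AggregateOverflowResult(SessionAgg(previous.sessionNo + 1, event.amount), previous)
    } else {
        // Keep aggregating into the current session.
        AggregateOverflowResult(previous.copy(total = previous.total + event.amount), null)
    }
}

fun main() {
    val first = sessionTransformation("k", Event(5, false), null)
    val second = sessionTransformation("k", Event(3, false), first.current)
    val third = sessionTransformation("k", Event(7, true), second.current)
    // AggregateOverflowResult(current=SessionAgg(sessionNo=2, total=7), overflow=SessionAgg(sessionNo=1, total=8))
    println(third)
}
```

The closing event both ends the old aggregate and seeds the new one, which is why a single input record can produce two output values.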

For this I'm using flatTransformValues, where I store the result of an
aggregation in a state store, similar to how groupByKey().aggregate() does
it. When the old session needs to be closed, it is sent downstream first,
before the new value.

The state store returns null for a given key on the first retrieval, and the
new aggregation result is then stored under the same key.
However, on the second pass the value for the same key is still null, even
though it was stored just before.

How can this be possible?

I'm using flatTransformValues in the following way:

val storeName = "aggregateOverflow_binReportAgg"
val store = Stores.keyValueStoreBuilder<K, V>(
    Stores.persistentKeyValueStore(storeName), serde.serde(), serde.serde())
streamsBuilder.addStateStore(store)

...

stream
    .flatTransformValues(
        ValueTransformerWithKeySupplier { AggregateOverflow(storeName, transformation) },
        storeName)


where AggregateOverflow gets the previous aggregate from the state store and
passes it, together with the incoming value, to the transformation, which
returns an AggregateOverflowResult.
AggregateOverflowResult is a data class containing the current value and an
optional overflow value:

data class AggregateOverflowResult<V>(val current: V, val overflow: V?)

When the overflow value is not null, it is sent downstream first, before the
current value. In either case, the current result is stored in the state
store for later retrieval, as follows:
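The intended per-key flow can be modelled without Kafka to pin down the expected behaviour. In this sketch a `MutableMap` is an assumed stand-in for the `KeyValueStore` (no serdes, no partitions or tasks), and the class name `InMemoryAggregateOverflow` and the summing transformation are hypothetical. Storing a null `current` removes the key, mirroring how `KeyValueStore.put` treats a null value as a delete.

```kotlin
import kotlin.math.abs

data class AggregateOverflowResult<V>(val current: V, val overflow: V?)

// Hypothetical in-memory stand-in for the transformer: the MutableMap plays
// the role of the KeyValueStore (simplification: no serdes, one single store).
class InMemoryAggregateOverflow<K, V, VR : Any>(
    private val transformation: (K, V, VR?) -> AggregateOverflowResult<VR>?
) {
    val state = mutableMapOf<K, VR>()

    fun transform(key: K, value: V): List<VR> {
        val acc = state[key]
        val result = transformation(key, value, acc)
        val current = result?.current
        // Mimic KeyValueStore.put(key, null), which deletes the key.
        if (current == null) state.remove(key) else state[key] = current
        // The overflow (closed aggregate) is emitted before the current value.
        return listOfNotNull(result?.overflow, current)
    }
}

fun main() {
    // Hypothetical transformation: values are summed; a negative value closes
    // the running aggregate and starts a new one from its absolute value.
    val t = InMemoryAggregateOverflow<String, Int, Int> { _, v, acc ->
        if (acc == null) AggregateOverflowResult(abs(v), null)
        else if (v < 0) AggregateOverflowResult(-v, acc)
        else AggregateOverflowResult(acc + v, null)
    }
    println(t.transform("a", 2))  // [2]
    println(t.transform("a", 3))  // [5]
    println(t.transform("a", -4)) // [5, 4]
}
```

With a single map, the second record for a key always sees the aggregate stored by the first, which is the behaviour expected from the real store.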

import mu.KotlinLogging
import org.apache.kafka.streams.kstream.ValueTransformerWithKey
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

class AggregateOverflow<K, V, VR : Any>(
    private val storeName: String,
    private val transformation: (K, V, VR?) -> AggregateOverflowResult<VR>?
) : ValueTransformerWithKey<K, V, Iterable<VR>> {
    private val logger = KotlinLogging.logger {}
    private lateinit var state: KeyValueStore<K, VR>

    init {
        logger.debug { "$storeName: created" }
    }

    @Suppress("UNCHECKED_CAST")
    override fun init(context: ProcessorContext) {
        logger.debug { "$storeName: init called" }
        // Store registered via addStateStore() and connected by name in flatTransformValues()
        state = context.getStateStore(storeName) as KeyValueStore<K, VR>
    }

    override fun transform(key: K, value: V): Iterable<VR> {
        val acc = state.get(key)
        if (acc == null) logger.debug { "$storeName: Found empty value for $key" }
        val result = transformation(key, value, acc)
        // Note: putting a null value deletes the key from the store
        state.put(key, result?.current)
        logger.trace { "$storeName: \n Key: $key\n Value: $value\n aggregate old: $acc\n aggregate new: $result" }
        // The overflow (closed aggregate) is forwarded first if it is not null
        return listOfNotNull(result?.overflow, result?.current)
    }

    override fun close() {
        logger.debug { "$storeName: close called" }
    }
}

In the log file you can see that the first invocation returns an empty
value for the given key, and that the new value is serialized into the
store.
On the second invocation a few seconds later, the value for the same key is
still null.

Any ideas why this is?
Best regards
Jan
