I am trying to write a Spark Structured Streaming application consisting
of a GroupState stage (flatMapGroupsWithState) followed by an aggregation.

Events arriving from the event sources are bucketized by deviceId and a
quantized timestamp, which are composed together into the group-state key
idTime.
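
For concreteness, a minimal sketch of how such a key can be composed (the
Event shape, bucket size, and key format here are my own illustration, not
prescriptive):

case class Event(deviceId: String, timestamp: java.sql.Timestamp)

// Quantize the event time to a fixed bucket (1 minute here, as an example)
// and compose it with the device id into the group-state key idTime.
def idTimeKey(e: Event, bucketMs: Long = 60 * 1000L): String = {
  val bucket = e.timestamp.getTime / bucketMs * bucketMs
  s"${e.deviceId}:$bucket"
}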

The logical plan consists of these stages (in the order of data flow; a
sketch of the corresponding query follows the list):

FlatMapGroupsWithState
Aggregate
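
A minimal sketch of a query with this shape (the state type, update
function, timeout, and column names are assumptions for illustration, not
my actual code):

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(idTime: String, timestamp: java.sql.Timestamp)

// Hypothetical update function: keeps a per-idTime count in group state,
// passes the incoming events through, and drops the state on timeout.
def updateState(idTime: String, events: Iterator[Event],
                state: GroupState[Long]): Iterator[Event] = {
  if (state.hasTimedOut) { state.remove(); Iterator.empty }
  else {
    val evs = events.toSeq
    state.update(state.getOption.getOrElse(0L) + evs.size)
    state.setTimeoutDuration("10 minutes")  // assumed group-state expiry
    evs.iterator
  }
}

val spark = SparkSession.builder.appName("sketch").getOrCreate()
import spark.implicits._

val events: Dataset[Event] = ???  // streaming source, elided

val counts = events
  .groupByKey(_.idTime)
  .flatMapGroupsWithState(OutputMode.Append,
    GroupStateTimeout.ProcessingTimeTimeout)(updateState)
  .groupBy($"idTime")             // the downstream Aggregate stage
  .agg(count("*").as("count"))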


which translates to this physical plan (in the same order):

FlatMapGroupsWithState
SerializeFromObject
Project (idTime, timestamp)
HashAggregate(keys=[idTime] ... partial aggregation)
Exchange hashpartitioning(idTime)
HashAggregate(keys=[idTime] ... merge aggregates)
StateStoreRestore [idTime], state info
HashAggregate(keys=[idTime] ... merge aggregates)
StateStoreSave [idTime], state info
HashAggregate(keys=[idTime], functions= ...)


This all works, but it appears that the partial aggregate state never gets
released.

If I send 10 events for some value of idTime, the stream produces an output
batch with count = 10.

If, some significant time later (after the group state expires), I send 10
more events for the same value of idTime, the stream produces another
output batch with count = 20. Other aggregates likewise show that both the
old and the new events were included in this subsequent output batch.

Thus, it appears state information is not cleared from the state store.

This is nice from the standpoint of handling latecomer events, but it also
poses a problem: if the partial aggregate information for every idTime
value is never cleared from the state store, the state store will
eventually run out of space.

Is there a way to control this retention and trigger the release of state
store data for old values of idTime that are no longer needed?
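
For reference, the only mechanism I am aware of for bounding aggregation
state is an event-time watermark, which seems to require grouping by an
event-time column Spark can track rather than by the opaque idTime string.
A sketch of what I mean (column names and intervals are assumed, and I
have not verified how this interacts with the group-state stage):

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{count, window}

val spark = SparkSession.builder.appName("watermark-sketch").getOrCreate()
import spark.implicits._

// Assumed output of the FlatMapGroupsWithState stage, still carrying the
// raw event-time column rather than only the composed idTime key.
case class Keyed(deviceId: String, timestamp: java.sql.Timestamp)
val stateOutput: Dataset[Keyed] = ???  // elided

val aggregated = stateOutput
  .withWatermark("timestamp", "30 minutes")  // assumed lateness bound
  // Group by an event-time window plus deviceId instead of idTime, so
  // Spark can relate the aggregation state to the watermark and drop
  // state for windows that fall behind it.
  .groupBy(window($"timestamp", "1 minute"), $"deviceId")
  .agg(count("*").as("count"))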

Thanks for any advice.
