> let’s say we would have implemented distinct count by saving a map with
> the key being the distinct value and the value being the last time we saw
> this value. This would mean that we wouldn’t really need to save all the
> steps in the middle and copy the data, we could only save the last portion.
I don't think you can calculate count distinct correctly in each event-time window using this map if there is late data, which is one of the key problems we are trying to solve with this API. If you are only tracking the last time you saw a value, how do you know whether a late data item was already accounted for in a window earlier than that "last time"? We would currently need to track the items seen in each window (though approximate count distinct requires much less space).

However, the state eviction I mentioned above should also let you give us a bound on how late data can be, and thus on how many windows we need to retain state for. You should also be able to group by processing time instead of event time if you want something closer to the semantics of DStreams.

Finally, you can already construct the map you describe using structured streaming and use its result to output statistics at each trigger window:

  df.groupBy($"value")
    .agg(max($"eventTime") as 'lastSeen)
    .writeStream
    .outputMode("complete")
    .trigger(ProcessingTime("5 minutes"))
    .foreach( <emit results using values from the map> )
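For concreteness, here is a rough sketch of the eviction-bounded windowed count distinct described above. It assumes the state eviction ends up being expressed as a withWatermark-style lateness bound; the "rate" source, the column names, and the interval sizes are placeholders, not a committed API:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions._

  val spark = SparkSession.builder.appName("windowed-distinct").getOrCreate()
  import spark.implicits._

  // Placeholder streaming source; it emits `timestamp` and `value` columns.
  val events = spark.readStream
    .format("rate")
    .load()
    .select($"timestamp" as "eventTime", $"value")

  val distinctPerWindow = events
    // Bound how late data may arrive; state for windows older than the
    // bound can be evicted rather than retained forever.
    .withWatermark("eventTime", "10 minutes")
    .groupBy(window($"eventTime", "5 minutes"))
    // A sketch-based count needs far less state than an exact per-window set.
    .agg(approx_count_distinct($"value") as "uniques")

  distinctPerWindow.writeStream
    .outputMode("append") // a window is emitted once the lateness bound passes
    .format("console")
    .start()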
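And if DStream-like semantics are enough, the same aggregation can key off processing time instead: stamp each row on arrival and window on that column. Again just a sketch, reusing the hypothetical `events` stream from above:

  import org.apache.spark.sql.functions._

  // Stamp rows with the (per-batch) processing time and window on that
  // instead of the event-time column.
  val byProcessingTime = events
    .withColumn("procTime", current_timestamp())
    .groupBy(window($"procTime", "5 minutes"))
    .agg(approx_count_distinct($"value") as "uniques")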