A few questions here
First, the environment:
Scala, Kafka 0.11.0.0

I have a KTable[Windowed[String], MyType], which is then transformed to a
stream via .toStream[String]((k, _) => k.key()).

The KTable is the result of a reduce operation with SessionWindows, with the
inactivity gap defined as 10 minutes and retention (until) defined as 11
minutes.
The timestamp extractor used is WallclockTimestampExtractor.
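
A minimal sketch of the setup described above, written against the 0.11.0.0
Java API from Scala. The object and method names, the store name
"session-store", and the generic value type V (standing in for MyType) are
placeholders for illustration only; explicit anonymous classes are used in
place of lambdas so the sketch does not depend on the Scala version.

```scala
import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.{KStream, KTable, KeyValueMapper, Reducer, SessionWindows, Windowed}
import org.apache.kafka.streams.processor.WallclockTimestampExtractor

object SessionTopologySketch {
  // Config fragment: stamp records with wall-clock time at processing.
  def timestampConfig(): Properties = {
    val props = new Properties()
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
      classOf[WallclockTimestampExtractor].getName)
    props
  }

  // V stands in for MyType; `merge` is a hypothetical reduce function.
  def sessionize[V](input: KStream[String, V], merge: (V, V) => V): KStream[String, V] = {
    // 10 minute inactivity gap, 11 minute retention, as described above.
    val windows = SessionWindows
      .`with`(TimeUnit.MINUTES.toMillis(10))
      .until(TimeUnit.MINUTES.toMillis(11))

    val reducer = new Reducer[V] {
      override def apply(a: V, b: V): V = merge(a, b)
    }

    // "session-store" is a placeholder store name.
    val table: KTable[Windowed[String], V] =
      input.groupByKey().reduce(reducer, windows, "session-store")

    // Drop the window from the key, keeping only the original String key.
    table.toStream[String](new KeyValueMapper[Windowed[String], V, String] {
      override def apply(k: Windowed[String], v: V): String = k.key()
    })
  }
}
```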

My understanding is that "stream time" progresses loosely based on event
time, which here is defined as wall-clock time. (I understand the implication
that stream time advances based on the minimum time across all partitions, so
even though it is defined as wall-clock time, it still depends to some degree
on data arriving on all partitions.)

   1. Given that a session window can continue to expand as long as data
   keeps arriving for its key, what happens with retention time? I've seen
   online definitions that seem to define the expiry of records due to
   retention as StreamTime - Retention time. Is this correct, and does it
   always hold even if the session continues to expand due to recent activity
   for a key? The gist of the question: is the retention/expiry calculation
   affected by, or does it take into account, session window expansion?
   2. In the scenario described above with KTable.toStream, I am getting
   tombstone records, i.e., records with a key and a null value. Are these to
   be expected? (My assumption is yes.) Are they a result of "expiry" based on
   the retention period?
   3. Can I rely on these "Tombstone" records to indicate expiry from the
   session store?
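
To make question 1 concrete, here is a small self-contained sketch that does
not use Kafka at all. It assumes a record joins an existing session when it
arrives within the inactivity gap of that session's end (treating the boundary
as inclusive is an assumption), and it takes the quoted "StreamTime -
Retention" definition literally. It is only an illustration of the question,
not a statement of how the session store actually expires data; all names are
hypothetical.

```scala
object SessionSketch {
  val gapMs: Long = 10 * 60 * 1000L        // 10 minute inactivity gap
  val retentionMs: Long = 11 * 60 * 1000L  // 11 minute retention (until)

  final case class Session(start: Long, end: Long)

  // Fold the timestamps seen for one key into sessions: a timestamp within
  // gapMs of the latest session's end extends it, otherwise it opens a new one.
  def sessions(timestamps: Seq[Long]): List[Session] =
    timestamps.sorted.foldLeft(List.empty[Session]) {
      case (Session(s, e) :: rest, t) if t - e <= gapMs => Session(s, t) :: rest
      case (acc, t) => Session(t, t) :: acc
    }.reverse

  // The definition quoted in question 1, taken literally.
  def cutoff(streamTimeMs: Long): Long = streamTimeMs - retentionMs
}
```

With records at minutes 0, 5, 14 and 30 this yields one session spanning
minutes 0 to 14 and a second one at minute 30. At a stream time of 20 minutes
the cutoff is minute 9, which falls after the first session's start but before
its end, and that is exactly the case I am unsure about.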


This ultimately boils down to understanding windows better, but also to
trying to establish a proxy for indicating when a window expires, as Kafka
Streams doesn't seem to support this yet. With that said, are there any plans
to support an indicator that tells downstream nodes that a message in a
window has expired, even if this is done in batch? It seems expiry actually
happens at the RocksDB segment level, assuming default state stores.

Thanks!

-- 
Ahmad Alkilani
