See if this works:

    println(query.lastProgress.eventTime.get("watermark"))
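If that lookup does return a value, here is a minimal sketch of how it might be turned into a timestamp and used to drop late rows. It is plain Python with no Spark dependency, so it only illustrates the logic. The helper names `watermark_from_progress` and `drop_late` are hypothetical, and the sketch assumes the progress object is a dict shaped like PySpark's `query.lastProgress`, with the watermark reported as an ISO-8601 UTC string with milliseconds.

```python
from datetime import datetime, timezone
from typing import Optional


def watermark_from_progress(progress: Optional[dict]) -> Optional[datetime]:
    """Extract the watermark from a dict shaped like query.lastProgress (assumed shape)."""
    if not progress:
        return None  # no batch has completed yet
    raw = progress.get("eventTime", {}).get("watermark")
    if raw is None:
        return None  # no watermark reported for this query
    # Assumed format: "2018-09-30T09:05:40.000Z" (UTC, millisecond precision)
    parsed = datetime.strptime(raw, "%Y-%m-%dT%H:%M:%S.%fZ")
    return parsed.replace(tzinfo=timezone.utc)


def drop_late(rows, watermark: Optional[datetime]):
    """Keep rows whose 'ts' is at or after the watermark; keep all rows if none is known."""
    if watermark is None:
        return list(rows)
    return [row for row in rows if row["ts"] >= watermark]
```

Note that the value read this way describes the last completed batch and is only available on the driver, so any filter built from it would probably lag the watermark Spark uses internally.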
Regards,
Sanjay

On 2018/09/30 09:05:40, peay <p...@protonmail.com.INVALID> wrote:
> Thanks for the pointers. I guess right now the only workaround would be
> to apply a "dummy" aggregation (e.g., group by the timestamp itself) only
> to have the stateful processing logic kick in and apply the filtering?
>
> For my purposes, an alternative solution to pushing it out to the source
> would be to make the watermark timestamp available through a function so
> that it can be used in a regular filter clause. Based on my experiments,
> the timestamp is computed and updated even when no stateful computations
> occur. I am not sure how easy that would be to contribute though, maybe
> someone can suggest a starting point?
>
> Thanks,
>
> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
> On Sunday, 30 September 2018 10:41, Jungtaek Lim <ka...@gmail.com> wrote:
>
> > The purpose of the watermark is to limit which records are handled, so
> > that state does not grow without bound. In other cases (non-stateful
> > operations), it is pretty normal to handle all records even if they're
> > pretty late.
> >
> > Btw, there were some comments regarding this: while Spark currently
> > leaves filtering out late records to stateful operations, some of us
> > (including me) think filtering out late records in an earlier phase
> > (source, or just after source) makes more sense. It just didn't turn
> > into action, but I think it is still valid.
> >
> > https://github.com/apache/spark/pull/21617#issuecomment-400119049
> >
> > If we move the phase of filtering out late records, what you would like
> > to do may become the default behavior. This also means the output may
> > change for queries which use non-stateful operations, so it is not a
> > trivial change and may require consensus, e.g. through the SPIP process.
> >
> > Thanks,
> > Jungtaek Lim (HeartSaVioR)
> >
> > On Sun, Sep 30, 2018 at 5:19 PM, chandan prakash <ch...@gmail.com> wrote:
> >
> >> Interesting question.
> >> I do not think watermark is currently supported without an
> >> aggregation operation/groupBy.
> >>
> >> Reason:
> >> Watermark in Structured Streaming is used for limiting the size of
> >> state needed to keep intermediate information in-memory.
> >> And state only comes into the picture in case of stateful processing.
> >> Also in the code, it seems that filtering out records on the basis of
> >> watermark happens only in case of stateful operators
> >> (statefulOperators.scala).
> >> Have not tried running code though and would like to know if someone
> >> can shed more light on this.
> >>
> >> Regards,
> >> Chandan
> >>
> >> On Sat, Sep 22, 2018 at 7:43 PM peay <pe...@protonmail.com.invalid> wrote:
> >>
> >>> Hello,
> >>>
> >>> I am trying to use watermarking without aggregation, to filter out
> >>> records that are just too late, instead of appending them to the
> >>> output. My understanding is that aggregation is required for
> >>> `withWatermark` to have any effect. Is that correct?
> >>>
> >>> I am looking for something along the lines of
> >>>
> >>> ```
> >>> df.withWatermark("ts", ...).filter(F.col("ts") < F.getCurrentWatermark())
> >>> ```
> >>>
> >>> Is there any way to get the watermark value to achieve that?
> >>>
> >>> Thanks!
> >>
> >> --
> >> Chandan Prakash