Try if this works...
println(query.lastProgress.eventTime.get("watermark"))

Regards,Sanjay
On 2018/09/30 09:05:40, peay <p...@protonmail.com.INVALID> wrote: 
> Thanks for the pointers. I guess right now the only workaround would be to 
> apply a "dummy" aggregation (e.g., group by the timestamp itself) only to 
> have the stateful processing logic kick in and apply the filtering?>

>

> For my purposes, an alternative solution to pushing it out to the source 
> would be to make the watermark timestamp available through a function so that 
> it can be used in a regular filter clause. Based on my experiments, the 
> timestamp is computed and updated even when no stateful computations occur. I 
> am not sure how easy that would be to contribute though, maybe someone can 
> suggest a starting point?> 

>

> Thanks,> 

>

> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐> 

> On Sunday, 30 September 2018 10:41, Jungtaek Lim <ka...@gmail.com> wrote:> 

>

> > The purpose of watermark is to set a limitation on handling records due to 
> > state going infinity. In other cases (non-stateful operations), it is 
> > pretty normal to handle all of records even they're pretty late.> 

> >> 

> > Btw, there was some comments regarding this: while Spark delegates to 
> > filter out late records in stateful operations for now, some of us 
> > (including me) think filtering out late records in earlier phase (source, 
> > or just after source) makes more sense. It just didn't come out as action, 
> > but I think it is still valid.> 

> >> 

> > https://github.com/apache/spark/pull/21617#issuecomment-400119049>

> >> 

> > If we move the phase of filtering out late records, what you would like to 
> > do may become the default behavior. This also means the output may be also 
> > changed for queries which use non-stateful operations, so it is not a 
> > trivial change and may require consensus like SPIP process.> 

> >> 

> > Thanks,> 

> > Jungtaek Lim (HeartSaVioR)> 

> >> 

> > 2018년9월30일(일) 오후5:19, chandan prakash <ch...@gmail.com>님이작성:> 

> >> 

> >> Interesting question.> 

> >> I do not think without any aggregation operation/groupBy , watermark is 
> >> supported currently .> 

> >>> 

> >> Reason:> 

> >> Watermark in Structured Streaming is used for limiting the size of state 
> >> needed to keep intermediate information in-memory.> 

> >> And state only comes in picture in case of stateful processing.> 

> >> Also in the code, it seems that  filtering out records on basis of 
> >> watermark happen only in case of stateful operators 
> >> (statefulOperators.scala)> 

> >> Have not tried running code though and would like to know if someone can 
> >> shed more light on this.> 

> >>> 

> >> Regards,> 

> >> Chandan> 

> >>> 

> >> On Sat, Sep 22, 2018 at 7:43 PM peay <pe...@protonmail.com.invalid> 
> >> wrote:> 

> >>> 

> >>> Hello,> 

> >>>> 

> >>> I am trying to use watermarking without aggregation, to filter out 
> >>> records that are just too late, instead of appending them to the output. 
> >>> My understanding is that aggregation is required for `withWatermark` to 
> >>> have any effect. Is that correct?> 

> >>>> 

> >>> I am looking for something along the lines of> 

> >>>> 

> >>> ```> 

> >>> df.withWatermark("ts", ...).filter(F.col("ts") <F.getCurrentWatermark())> 

> >>> ```> 

> >>>> 

> >>> Is there any way to get the watermark value to achieve that?> 

> >>>> 

> >>> Thanks!> 

> >>> 

> >> --> 

> >> Chandan Prakash> 

Reply via email to