I meant that you should not use Flink's built-in windows at all, but instead
implement your logic in a KeyedProcessFunction.

So basically:
myDataStream.keyBy(...).process(new MyKeyedProcessFunction)
instead of:
myDataStream.keyBy(...).window(...).process(new MyWindowProcessFunction)
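A minimal sketch of what such a hand-rolled tumbling window could look like. The MyEvent type (with `timestamp` in epoch millis), the per-window count, and the 1-hour window size are all assumptions for illustration; this targets the 1.9-era DataStream API and needs the Flink dependencies to compile:

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hand-rolled tumbling event-time window: bucket events by window start,
// register an event-time timer for the window end, emit when the timer fires.
public class MyKeyedProcessFunction
        extends KeyedProcessFunction<String, MyEvent, Tuple3<String, Long, Long>> {

    private static final long WINDOW_SIZE = 60 * 60 * 1000L; // 1 hour (assumed)

    // one count per open window, keyed by window start timestamp
    private transient MapState<Long, Long> countsPerWindow;

    @Override
    public void open(Configuration parameters) {
        countsPerWindow = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("countsPerWindow", Long.class, Long.class));
    }

    @Override
    public void processElement(
            MyEvent event,
            Context ctx,
            Collector<Tuple3<String, Long, Long>> out) throws Exception {
        long windowStart = event.timestamp - (event.timestamp % WINDOW_SIZE);
        Long count = countsPerWindow.get(windowStart);
        countsPerWindow.put(windowStart, count == null ? 1L : count + 1L);
        // callback once the watermark passes the end of this window
        ctx.timerService().registerEventTimeTimer(windowStart + WINDOW_SIZE - 1);
    }

    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Tuple3<String, Long, Long>> out) throws Exception {
        long windowStart = timestamp - WINDOW_SIZE + 1;
        Long count = countsPerWindow.get(windowStart);
        if (count != null) {
            out.collect(Tuple3.of(ctx.getCurrentKey(), windowStart, count));
            countsPerWindow.remove(windowStart); // free state for the closed window
        }
    }
}
```

Since you manage timers and state yourself, you also decide explicitly what happens to records that arrive behind the watermark, instead of the window operator silently dropping them.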

On Mon, Sep 2, 2019 at 3:29 PM Hanan Yehudai <
hanan.yehu...@radcom.com> wrote:

> I'm not sure what you mean by "use a ProcessFunction and not a window
> process function", as the window operator takes a ProcessWindowFunction.
>
>
>
> *From:* Fabian Hueske <fhue...@gmail.com>
> *Sent:* Monday, August 26, 2019 1:33 PM
> *To:* Hanan Yehudai <hanan.yehu...@radcom.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: tumbling event time window , parallel
>
>
>
> I would use a regular ProcessFunction, not a ProcessWindowFunction.
>
>
>
> The final WM depends on how the records were partitioned at the watermark
> assigner (and the assigner itself).
>
> AFAIK, the distribution of files to source reader tasks is not
> deterministic. Hence, the final WM changes from run to run.
>
>
>
> Fabian
>
>
>
> On Mon, Aug 26, 2019 at 12:16 PM Hanan Yehudai <
> hanan.yehu...@radcom.com> wrote:
>
> You said: “You can use a custom ProcessFunction and compare the timestamp
> of each record with the current watermark.”
>
>
>
> Does the window process function receive all the events, even the ones that
> are dropped due to lateness?
> From what I understand, the “iterable” argument contains the records
> that were inserted into the window and NOT the ones that were dropped.
> Isn't that correct?
>
>
>
>
>
> Also,
>
> when looking at Flink's monitoring page for the watermarks, I see
> different values even after all my files were processed, which is
> something I would not expect.
> I would expect that eventually the WM would be the highest EVENT_TIME in
> my set of files.
>
>
>
>
>
> thanks
>
>
>
> *From:* Fabian Hueske <fhue...@gmail.com>
> *Sent:* Monday, August 26, 2019 12:38 PM
> *To:* Hanan Yehudai <hanan.yehu...@radcom.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: tumbling event time window , parallel
>
>
>
> Hi,
>
>
>
> The paths of the files to read are distributed across all reader / source
> tasks and each task reads the files in order of their modification
> timestamp.
>
> The watermark generator is not aware of any files and just looks at the
> stream of records produced by the source tasks.
>
> You need to choose the WM generation strategy such that you minimize the
> number of late records.
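For example, a bounded-out-of-orderness assigner is a common starting point with the 1.9-era API. The MyEvent type and the 10-minute bound below are assumed placeholders; the bound should be tuned against the lateness you actually observe:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// The watermark trails the highest timestamp seen so far by 10 minutes, so
// records up to 10 minutes out of order are not considered late.
DataStream<MyEvent> withTimestamps = input.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.minutes(10)) {
            @Override
            public long extractTimestamp(MyEvent event) {
                return event.timestamp; // the EVENT_TIME field, epoch millis (assumed)
            }
        });
```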
>
>
>
> I'd recommend first investigating how many late records you are dealing
> with.
>
> You can use a custom ProcessFunction and compare the timestamp of each
> record with the current watermark.
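A minimal sketch of such a probe, assuming a hypothetical MyEvent type and a metric named "lateRecords"; it passes every record through and only counts the late ones:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Counts records whose event timestamp is already behind the current
// watermark, i.e. records a window operator would treat as late.
public class LateRecordProbe extends ProcessFunction<MyEvent, MyEvent> {

    private transient Counter lateRecords;

    @Override
    public void open(Configuration parameters) {
        lateRecords = getRuntimeContext().getMetricGroup().counter("lateRecords");
    }

    @Override
    public void processElement(MyEvent event, Context ctx, Collector<MyEvent> out) {
        Long ts = ctx.timestamp(); // the record's assigned event timestamp
        if (ts != null && ts < ctx.timerService().currentWatermark()) {
            lateRecords.inc();
        }
        out.collect(event); // forward the record unchanged
    }
}
```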
>
>
>
> AllowedLateness is also not a magical cure. It will just emit updates
> downstream, i.e., you need to replace the results that are superseded by a
> more complete result.
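Sketched on a windowed stream; the window size, lateness bound, side-output tag, and the MyEvent/MyResult types are assumptions:

```java
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

// Records up to 5 minutes late re-fire the window and emit an UPDATED result
// for the same window; downstream must overwrite the earlier result.
final OutputTag<MyEvent> tooLate = new OutputTag<MyEvent>("too-late") {};

SingleOutputStreamOperator<MyResult> results = input
        .keyBy(e -> e.key)
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .allowedLateness(Time.minutes(5))
        .sideOutputLateData(tooLate) // records too late even for allowedLateness
        .process(new MyWindowProcessFunction());

// inspect what was dropped entirely
results.getSideOutput(tooLate).print();
```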
>
>
>
> Best, Fabian
>
>
>
> On Mon, Aug 26, 2019 at 10:21 AM Hanan Yehudai <
> hanan.yehu...@radcom.com> wrote:
>
> The data  source is generated by an application that monitors some sort of
> sessions.
>
> With the EVENT_TIME column being the session end time .
>
>
>
> It is possible that the files will have out-of-order data, because of the
> async nature of the application writing the files.
>
> While the EVENT_TIME is monotonically increasing in general, some
> lateness is possible. However, I used *allowedLateness* on my stream
> and still got the inconsistencies.
>
>
>
> Although the real-life use case is generically reading files from a
> folder, the testing env has a set of files prepared in advance; these
> should be read and produce the result.
>
>
>
> You mentioned the “right” order of the files. Is it sorted by update time?
> When running in parallel, is it possible that 2 files will be read in
> parallel, and in case the latter one is smaller, the latest timestamp
> will be handled first?
>
>
>
>
>
> BTW, I tried to use a ContinuousEventTimeTrigger to make sure the window
> is calculated, and got the processing to trigger multiple times, so I'm
> not sure exactly how this type of trigger works.
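What I tried looks roughly like this (the window size and 10-minute interval are assumed placeholders). As far as I understand, this trigger FIREs the window repeatedly at each event-time interval without purging it, which would explain the multiple emissions:

```java
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;

// The 1-hour window emits a partial result every 10 minutes of event time,
// so the window process function is invoked multiple times per window.
input
        .keyBy(e -> e.key)
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .trigger(ContinuousEventTimeTrigger.of(Time.minutes(10)))
        .process(new MyWindowProcessFunction());
```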
>
>
>
> Thanks
>
>
>
>
>
>
>
>
>
> *From:* Fabian Hueske <fhue...@gmail.com>
> *Sent:* Monday, August 26, 2019 11:06 AM
> *To:* Hanan Yehudai <hanan.yehu...@radcom.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: tumbling event time window , parallel
>
>
>
> Hi,
>
>
>
> Can you share a few more details about the data source?
>
> Are you continuously ingesting files from a folder?
>
>
>
> You are correct that the parallelism should not affect the results, but
> there are a few things that can affect that:
>
> 1) non-deterministic keys
>
> 2) out-of-order data with inappropriate watermarks
>
>
>
> Note that watermark configuration for file ingests can be difficult and
> that you need to ensure that files are read in the "right" order.
>
> AFAIK, Flink's continuous file source uses the modification timestamp of
> files to determine the read order.
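For reference, a continuous file ingest with the 1.9-era API looks roughly like this; the path, scan interval, and the surrounding StreamExecutionEnvironment `env` are assumed:

```java
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

// The source monitors the directory and forwards newly appearing files to the
// parallel readers; files are ordered by modification timestamp, but the
// assignment of files to reader tasks is not deterministic.
String path = "file:///data/input"; // assumed location
TextInputFormat format = new TextInputFormat(new Path(path));

DataStream<String> lines = env.readFile(
        format,
        path,
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        10_000L); // re-scan the directory every 10 seconds
```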
>
>
>
> Best, Fabian
>
>
>
> On Sun, Aug 25, 2019 at 7:32 PM Hanan Yehudai <
> hanan.yehu...@radcom.com> wrote:
>
> I have an issue with tumbling windows running  in parallel.
>
>
>
> I run a Job on  a set of CSV files.
>
>
>
> When the parallelism is set to 1, I get the proper results.
>
> When it runs in parallel, I get no output.
>
> Is it due to the fact that the parallel operator takes the MIN(watermark)
> of all the parallel sources,
>
> and only one of the streams advances the watermark?
>
>
>
> It seems wrong that the result is not deterministic and depends on the
> parallelism level.
>
> What am I doing wrong ?
>
>
>
>
