I meant not to use Flink's built-in windows at all, but to implement your logic in a KeyedProcessFunction.
So basically:

    myDataStream.keyBy(...).process(new MyKeyedProcessFunction)

instead of:

    myDataStream.keyBy(...).window(...).process(new MyWindowProcessFunction)

On Mon, Sep 2, 2019 at 15:29, Hanan Yehudai <hanan.yehu...@radcom.com> wrote:

> I'm not sure what you mean by using a process function and not a window
> process function, as the window operator takes a window process function.
>
> *From:* Fabian Hueske <fhue...@gmail.com>
> *Sent:* Monday, August 26, 2019 1:33 PM
> *To:* Hanan Yehudai <hanan.yehu...@radcom.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: tumbling event time window , parallel
>
> I would use a regular ProcessFunction, not a WindowProcessFunction.
>
> The final WM depends on how the records were partitioned at the watermark
> assigner (and on the assigner itself).
>
> AFAIK, the distribution of files to source reader tasks is not
> deterministic. Hence, the final WM changes from run to run.
>
> Fabian
>
> On Mon, Aug 26, 2019 at 12:16, Hanan Yehudai <hanan.yehu...@radcom.com> wrote:
>
> You said "You can use a custom ProcessFunction and compare the timestamp
> of each record with the current watermark."
>
> Does the window process function have all the events, even the ones that
> are dropped due to lateness?
> From what I understand, the "iterable" argument contains the records
> that were inserted into the window and NOT the ones dropped. Isn't that
> correct?
>
> Also, when looking at Flink's monitoring page, I see different values for
> the watermarks even after all my files were processed, which is
> something I would not expect.
> I would expect that eventually the WM will be the highest EVENT_TIME in
> my set of files.
>
> thanks
>
> *From:* Fabian Hueske <fhue...@gmail.com>
> *Sent:* Monday, August 26, 2019 12:38 PM
> *To:* Hanan Yehudai <hanan.yehu...@radcom.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: tumbling event time window , parallel
>
> Hi,
>
> The paths of the files to read are distributed across all reader / source
> tasks and each task reads the files in order of their modification
> timestamp.
>
> The watermark generator is not aware of any files and just looks at the
> stream of records produced by the source tasks.
>
> You need to choose the WM generator strategy such that you minimize the
> number of late records.
>
> I'd recommend first investigating how many late records you are dealing
> with.
>
> You can use a custom ProcessFunction and compare the timestamp of each
> record with the current watermark.
>
> AllowedLateness is also not a magical cure. It will just emit updates
> downstream, i.e., you need to remove the results that were updated by a
> more complete result.
>
> Best, Fabian
>
> On Mon, Aug 26, 2019 at 10:21, Hanan Yehudai <hanan.yehu...@radcom.com> wrote:
>
> The data source is generated by an application that monitors some sort of
> sessions, with the EVENT_TIME column being the session end time.
>
> It is possible that the files will have out-of-order data because of the
> async nature of the application writing the files.
> While the EVENT_TIME is monotonically increasing in general, some
> lateness is possible. However, I used *allowedLateness* on my stream
> and still got the inconsistencies.
>
> Although the real-life use case is generically reading files from a
> folder, the testing env has a set of files prepared in advance; these
> should be read and produce the result.
>
> You mentioned the "right" order of the files. Is it sorted by update time?
> When running in parallel, is it possible that 2 files will be read in
> parallel?
> And in case the latter one is smaller, will the latest timestamp
> be handled first?
>
> BTW, I tried to use a ContinuousEventTimeTrigger to make sure the window
> is calculated, and got the processing to trigger multiple times, so I'm
> not sure exactly how this type of trigger works.
>
> Thanks
>
> *From:* Fabian Hueske <fhue...@gmail.com>
> *Sent:* Monday, August 26, 2019 11:06 AM
> *To:* Hanan Yehudai <hanan.yehu...@radcom.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: tumbling event time window , parallel
>
> Hi,
>
> Can you share a few more details about the data source?
>
> Are you continuously ingesting files from a folder?
>
> You are correct that the parallelism should not affect the results, but
> there are a few things that can affect that:
>
> 1) non-deterministic keys
> 2) out-of-order data with inappropriate watermarks
>
> Note that watermark configuration for file ingests can be difficult and
> that you need to ensure that files are read in the "right" order.
>
> AFAIK, Flink's continuous file source uses the modification timestamp of
> files to determine the read order.
>
> Best, Fabian
>
> On Sun, Aug 25, 2019 at 19:32, Hanan Yehudai <hanan.yehu...@radcom.com> wrote:
>
> I have an issue with tumbling windows running in parallel.
>
> I run a job on a set of CSV files.
>
> When the parallelism is set to 1, I get the proper results.
> When it runs in parallel, I get no output.
>
> Is it due to the fact that the parallel streams take the MAX(watermark) from
> all the parallel sources, and only one of the streams advances the watermark?
>
> It seems wrong that the result is not deterministic and depends on the
> parallelism level.
>
> What am I doing wrong?
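
For readers following the thread, here is a small dependency-free sketch of the two ideas discussed above: an operator with several parallel inputs advances its event-time clock to the minimum (not the maximum) of the input watermarks, and a record can be counted as late by comparing its timestamp with the current watermark. This is a plain-Java model, not Flink API. The class and method names are hypothetical, and the watermark rule (highest timestamp seen so far, minus a fixed out-of-orderness bound, minus 1, recomputed for every record) is an assumption made for illustration. Real Flink jobs usually emit watermarks periodically, so actual late counts may be lower.

```java
import java.util.Arrays;

/** Hypothetical model of watermark handling; it does not use any Flink classes. */
final class LateRecordModel {

    /** The watermark an operator sees is the minimum over its parallel inputs. */
    static long operatorWatermark(long[] inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    /**
     * Counts records whose timestamp is at or below the watermark when they arrive.
     * Assumed rule: watermark = (max timestamp seen so far) - maxOutOfOrderness - 1,
     * computed from the records that arrived before the current one.
     */
    static int countLate(long[] timestampsInArrivalOrder, long maxOutOfOrderness) {
        long maxSeen = Long.MIN_VALUE;
        int late = 0;
        for (long ts : timestampsInArrivalOrder) {
            // No watermark exists before the first record has been seen.
            long watermark = (maxSeen == Long.MIN_VALUE)
                    ? Long.MIN_VALUE
                    : maxSeen - maxOutOfOrderness - 1;
            if (ts <= watermark) {
                late++;
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return late;
    }

    public static void main(String[] args) {
        long[] arrivals = {10L, 12L, 11L, 20L, 15L, 19L};
        System.out.println("late with bound 2: " + countLate(arrivals, 2L));
        System.out.println("late with bound 0: " + countLate(arrivals, 0L));
        System.out.println("operator watermark: "
                + operatorWatermark(new long[] {100L, 40L, 70L}));
    }
}
```

In this model the slowest input (watermark 40 in the sample) holds back the operator, which is consistent with the advice above to first measure how many records arrive behind the watermark before tuning the watermark strategy or allowed lateness.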