Thanks, Fabian, for the prompt reply. I just started using Flink, and this is a great community. The watermark is only configured for a 10-second delay. Besides that, the same job runs locally in IntelliJ without issues.
Here is the code:

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.watermark.Watermark

    // T is the event type; created_at is its event time in epoch milliseconds.
    // slack is the allowed out-of-orderness in milliseconds.
    class EventTimestampExtractor(slack: Long = 0L) extends AssignerWithPeriodicWatermarks[T] {
      // Largest (created_at - slack) seen so far; emitted as the watermark.
      var currentMaxTimestamp: Long = _

      override def extractTimestamp(e: T, prevElementTimestamp: Long): Long = {
        val elemTs = e.created_at
        currentMaxTimestamp = Math.max(elemTs - slack, currentMaxTimestamp)
        elemTs
      }

      // Called periodically, once per auto-watermark interval.
      override def getCurrentWatermark(): Watermark = {
        new Watermark(currentMaxTimestamp)
      }
    }

    // 10000 ms = 10 s of slack
    events.assignTimestampsAndWatermarks(new EventTimestampExtractor(10000))

Are there any other things I should be aware of?

Thanks again for your kind help!
Fanbin

On Tue, Jul 23, 2019 at 2:49 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Fanbin,
>
> The delay is most likely caused by the watermark delay.
> A window is computed when the watermark passes the end of the window. If
> you configured the watermark to be 10 minutes before the current max
> timestamp (probably to account for out-of-order data), then the window will
> be computed with an approx. 10-minute delay.
>
> Best, Fabian
>
> On Tue, Jul 23, 2019 at 2:00 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>
>> Hi,
>> I have a Flink SQL streaming job defined by:
>>
>> SELECT
>>   user_id
>>   , hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts
>>   , count(name) as count
>> FROM event
>> WHERE name = 'signin'
>> GROUP BY
>>   user_id
>>   , hop(created_at, interval '30' second, interval '1' minute)
>>
>> There is a noticeable delay in the GROUP BY operator. For example, I only
>> see a record sent out 10 minutes after it was received. See the
>> attached picture.
>>
>> [image: image.png]
>>
>> I'm expecting to see the GROUP BY result after one minute, since the
>> sliding window size is 1 minute and the slide is 30 seconds.
>>
>> There is no such issue when I run the job locally in IntelliJ. However, I
>> run into the above issue when I run the job on EMR (Flink version 1.7).
>>
>> Can anybody give me a clue as to what could be wrong?
>> Thanks,
>>
>> Fanbin
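To make Fabian's explanation concrete (a window is computed once the watermark passes its end), here is a minimal pure-Scala sketch. It is not Flink code. It mirrors the extractor in the thread (watermark = max(created_at - slack)) and treats a window as ready once the watermark reaches end - 1, which is how Flink's event-time trigger compares against a window's last timestamp. Assumptions: events arrive in created_at order, one every 10 seconds (hypothetical data), and the watermark is updated on every event rather than once per auto-watermark interval. WatermarkDelaySketch, Window, assignHopWindows and firingEventTs are hypothetical names.

```scala
// Hypothetical pure-Scala model of the watermark/hop-window timing; not Flink code.
object WatermarkDelaySketch {
  // A hop window [start, end) in epoch milliseconds.
  final case class Window(start: Long, end: Long)

  // Windows containing ts, for a non-negative timestamp and zero offset:
  // start from the latest slide boundary <= ts and step back while start > ts - size.
  def assignHopWindows(ts: Long, slide: Long, size: Long): Seq[Window] = {
    val lastStart = ts - (ts % slide)
    (lastStart until (ts - size) by -slide).map(s => Window(s, s + size))
  }

  // created_at of the first event whose arrival moves the watermark,
  // max(created_at - slack), to at least the window's last timestamp (end - 1).
  def firingEventTs(window: Window, eventTs: Seq[Long], slack: Long): Option[Long] = {
    val watermarks = eventTs.scanLeft(Long.MinValue)((wm, ts) => math.max(wm, ts - slack)).tail
    eventTs.zip(watermarks).collectFirst { case (ts, wm) if wm >= window.end - 1 => ts }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical input: one event every 10 s for 12 minutes.
    val events = 0L to 720000L by 10000L
    val w = Window(0L, 60000L)
    println(firingEventTs(w, events, slack = 10000L))  // Some(70000)
    println(firingEventTs(w, events, slack = 600000L)) // Some(660000)
  }
}
```

Under these assumptions, with 10 s of slack the [0 s, 60 s) window fires when the event at 70 s arrives, about 10 s after the window ends; with 10 minutes of slack it would fire at 660 s. So the slack alone would account for roughly 10 seconds of delay, not 10 minutes.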