Hi Fanbin,

> 2. I have parallelism = 32 and only one task has the record. Can you please elaborate more on why this would affect the watermark advancement?

Each parallel subtask of a source function usually generates its watermarks independently, say wk1, wk2, ..., wkn. The downstream window operator's current event time is the minimum of its input streams' event times, i.e., wk_window = min(wk1, wk2, ..., wkn). If some of the source subtasks receive no data, their watermarks never move, so wk_window cannot advance. More details here[1].
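As a concrete illustration, here is a minimal sketch of pinning the source and the timestamp/watermark assigner to a parallelism of 1, so a single subtask sees every record and its watermark alone drives the downstream window. The `Event` case class, the placeholder `eventSource`, and the placeholder `tsAssigner` are assumptions for the sketch, not code from this thread:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

// Assumed event type, mirroring the columns used in the SQL further down the thread.
case class Event(user_id: String, name: String, created_at: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(32) // the rest of the job keeps its parallelism

// Placeholders standing in for the real source and watermark assigner.
val eventSource: SourceFunction[Event] = ???
val tsAssigner: AssignerWithPeriodicWatermarks[Event] = ???

val events: DataStream[Event] = env
  .addSource(eventSource)
  .setParallelism(1)                        // one subtask sees every record, so its watermark can advance
  .assignTimestampsAndWatermarks(tsAssigner)
  .setParallelism(1)                        // keep the assigner at the same parallelism as the source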
In your case, you can set the parallelism of the source to 1 to solve the problem, and also keep the parallelism of assignTimestampsAndWatermarks the same as the source's.

> 4. data span time > window time. I don't quite understand why this matters.

For example, if you have a tumbling window with a size of 1 day but all of the data arrives within the first hour of that day, the event time never reaches the end of the window, i.e., the window will not fire.

Best, Hequn

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams

On Thu, Jul 25, 2019 at 12:17 AM Fanbin Bu <fanbin...@coinbase.com> wrote:

> Hequn,
>
> Thanks for the help. It is indeed a watermark problem. From the Flink UI, I
> can see the low watermark value for each operator, and the groupBy operator
> shows a lagging watermark value. I checked the link from SO and confirmed
> that:
> 1. I do see records coming in for this operator
> 2. I have parallelism = 32 and only one task has the record. Can you
> please elaborate more on why this would affect the watermark advancement?
> 3. Event create time is in ms
> 4. data span time > window time. I don't quite understand why this
> matters.
>
> Thanks,
> Fanbin
>
> On Tue, Jul 23, 2019 at 7:17 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Fanbin,
>>
>> Fabian is right, it should be a watermark problem. Probably, some tasks
>> of the source don't have enough data to advance the watermark.
>> Furthermore, you can also monitor event time through the Flink web
>> interface. I have answered a similar question on Stack Overflow; see
>> more details here[1].
>>
>> Best, Hequn
>>
>> [1]
>> https://stackoverflow.com/questions/51691269/event-time-window-in-flink-does-not-trigger
>>
>> On Wed, Jul 24, 2019 at 4:38 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>
>>> If I use proctime, the groupBy happens without any delay.
>>>
>>> On Tue, Jul 23, 2019 at 10:16 AM Fanbin Bu <fanbin...@coinbase.com>
>>> wrote:
>>>
>>>> Not sure whether this is related:
>>>>
>>>> public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
>>>>         AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
>>>>
>>>>     // match parallelism to input, otherwise dop=1 sources could lead to some strange
>>>>     // behaviour: the watermark will creep along very slowly because the elements
>>>>     // from the source go to each extraction operator round robin.
>>>>     final int inputParallelism = getTransformation().getParallelism();
>>>>     final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);
>>>>
>>>>     TimestampsAndPeriodicWatermarksOperator<T> operator =
>>>>             new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
>>>>
>>>>     return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
>>>>             .setParallelism(inputParallelism);
>>>> }
>>>>
>>>> parallelism is set to 32:
>>>>
>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>
>>>> env.setParallelism(32)
>>>>
>>>> and the command for launching the job is
>>>>
>>>> flink run -m yarn-cluster -ys 8 -yn 4 -ytm 4096 -yjm 4096 $JAR $ARGS
>>>>
>>>> On Tue, Jul 23, 2019 at 9:59 AM Fanbin Bu <fanbin...@coinbase.com>
>>>> wrote:
>>>>
>>>>> Thanks Fabian for the prompt reply. I just started using Flink and
>>>>> this is a great community.
>>>>> The watermark setting only accounts for a 10-second delay. Besides
>>>>> that, the local job in IntelliJ runs fine without issues.
>>>>>
>>>>> Here is the code:
>>>>>
>>>>> class EventTimestampExtractor(slack: Long = 0L) extends
>>>>>     AssignerWithPeriodicWatermarks[T] {
>>>>>
>>>>>   var currentMaxTimestamp: Long = _
>>>>>
>>>>>   override def extractTimestamp(e: T, prevElementTimestamp: Long) = {
>>>>>     val elemTs = e.created_at
>>>>>     currentMaxTimestamp = Math.max(elemTs - slack, currentMaxTimestamp)
>>>>>     elemTs
>>>>>   }
>>>>>
>>>>>   override def getCurrentWatermark(): Watermark = {
>>>>>     new Watermark(currentMaxTimestamp)
>>>>>   }
>>>>> }
>>>>>
>>>>> events.assignTimestampsAndWatermarks(new EventTimestampExtractor(10000))
>>>>>
>>>>> Are there any other things I should be aware of?
>>>>>
>>>>> Thanks again for your kind help!
>>>>>
>>>>> Fanbin
>>>>>
>>>>> On Tue, Jul 23, 2019 at 2:49 AM Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Fanbin,
>>>>>>
>>>>>> The delay is most likely caused by the watermark delay.
>>>>>> A window is computed when the watermark passes the end of the window.
>>>>>> If you configured the watermark to be 10 minutes before the current
>>>>>> max timestamp (probably to account for out-of-order data), then the
>>>>>> window will be computed with an approximately 10-minute delay.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> On Tue, Jul 23, 2019 at 2:00 AM Fanbin Bu <fanbin...@coinbase.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have a Flink SQL streaming job defined by:
>>>>>>>
>>>>>>> SELECT
>>>>>>>     user_id
>>>>>>>     , hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts
>>>>>>>     , count(name) as count
>>>>>>> FROM event
>>>>>>> WHERE name = 'signin'
>>>>>>> GROUP BY
>>>>>>>     user_id
>>>>>>>     , hop(created_at, interval '30' second, interval '1' minute)
>>>>>>>
>>>>>>> There is a noticeable delay in the groupBy operator. For example, I
>>>>>>> only see the record sent out 10 minutes after the record was
>>>>>>> received; see the attached picture.
>>>>>>>
>>>>>>> [image: image.png]
>>>>>>>
>>>>>>> I am expecting to see the group-by result after one minute, since
>>>>>>> the sliding window size is 1 min and the slide is 30 sec.
>>>>>>>
>>>>>>> There is no such issue if I run the job locally in IntelliJ.
>>>>>>> However, I ran into the above issue when I run the job on EMR
>>>>>>> (Flink version = 1.7).
>>>>>>>
>>>>>>> Can anybody give a clue of what could be wrong?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Fanbin
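For comparison with the hand-rolled EventTimestampExtractor quoted above, here is a minimal sketch of the same 10-second slack expressed with Flink's built-in BoundedOutOfOrdernessTimestampExtractor (available in Flink 1.7). The `Event` case class and the commented-out usage line are assumptions for the sketch, not code from this thread:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Assumed event type, mirroring the columns used in the SQL above.
case class Event(user_id: String, name: String, created_at: Long)

// Tracks the largest created_at seen so far and emits a watermark that trails
// it by 10 seconds -- the same bookkeeping as EventTimestampExtractor(10000).
class EventTsAssigner
    extends BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
  override def extractTimestamp(e: Event): Long = e.created_at // must be epoch milliseconds
}

// Usage sketch: with this assigner, a 1-minute HOP window sliding every 30
// seconds should fire roughly 10 seconds after each window end, provided
// every source subtask actually receives data.
// val withTs = events.assignTimestampsAndWatermarks(new EventTsAssigner).setParallelism(1)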