Hi Fanbin,

Fabian is right, this is most likely a watermark problem. Probably some of the source tasks don't have enough data to advance the watermark; since the downstream watermark is the minimum over all parallel inputs, a single idle task can hold the window computation back. You could also monitor event-time progress (the current watermarks) through the Flink web interface. I answered a similar question on Stack Overflow; see more details here [1].
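For illustration, here is a minimal sketch (against the Flink 1.7 AssignerWithPeriodicWatermarks API) of a periodic assigner that falls back to processing time when a subtask has seen no records for a while, so an idle source partition does not hold the watermark back. The Event case class, the idleTimeout, and the 10 s out-of-orderness bound are assumptions for the sketch, not values taken from this thread:

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.watermark.Watermark

    // Hypothetical event type standing in for the event class used in the thread.
    case class Event(user_id: String, name: String, created_at: Long)

    // Bounded-out-of-orderness assigner that keeps advancing the watermark from
    // processing time once this subtask has been idle for idleTimeout ms.
    class IdleAwareExtractor(maxOutOfOrderness: Long = 10000L,
                             idleTimeout: Long = 60000L)
        extends AssignerWithPeriodicWatermarks[Event] {

      private var currentMaxTimestamp: Long = 0L
      private var lastSeenProcTime: Long = System.currentTimeMillis()

      override def extractTimestamp(e: Event, previousElementTimestamp: Long): Long = {
        currentMaxTimestamp = Math.max(e.created_at, currentMaxTimestamp)
        lastSeenProcTime = System.currentTimeMillis()
        e.created_at
      }

      override def getCurrentWatermark(): Watermark = {
        val now = System.currentTimeMillis()
        if (now - lastSeenProcTime > idleTimeout) {
          // No records on this subtask for a while: let processing time drive the
          // watermark so an empty partition does not stall downstream windows.
          new Watermark(now - idleTimeout - maxOutOfOrderness)
        } else {
          new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        }
      }
    }

    // events.assignTimestampsAndWatermarks(new IdleAwareExtractor())

Whether such a fallback is safe depends on whether an idle partition can still deliver old data later; the alternative is simply to make sure every parallel source task actually receives data.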
Best,
Hequn

[1] https://stackoverflow.com/questions/51691269/event-time-window-in-flink-does-not-trigger

On Wed, Jul 24, 2019 at 4:38 AM Fanbin Bu <fanbin...@coinbase.com> wrote:

> If I use proctime, the groupBy happens without any delay.
>
> On Tue, Jul 23, 2019 at 10:16 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>
>> Not sure whether this is related:
>>
>> public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
>>         AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
>>
>>     // match parallelism to input, otherwise dop=1 sources could lead to some strange
>>     // behaviour: the watermark will creep along very slowly because the elements
>>     // from the source go to each extraction operator round robin.
>>     final int inputParallelism = getTransformation().getParallelism();
>>     final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);
>>
>>     TimestampsAndPeriodicWatermarksOperator<T> operator =
>>         new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
>>
>>     return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
>>         .setParallelism(inputParallelism);
>> }
>>
>> Parallelism is set to 32:
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.setParallelism(32)
>>
>> and the command for launching the job is
>>
>> flink run -m yarn-cluster -ys 8 -yn 4 -ytm 4096 -yjm 4096 $JAR $ARGS
>>
>> On Tue, Jul 23, 2019 at 9:59 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>
>>> Thanks Fabian for the prompt reply. I just started using Flink and this is a great community.
>>> The watermark setting only accounts for a 10 sec delay. Besides that, the local job in IntelliJ runs fine without issues.
>>>
>>> Here is the code:
>>>
>>> class EventTimestampExtractor(slack: Long = 0L) extends AssignerWithPeriodicWatermarks[T] {
>>>
>>>   var currentMaxTimestamp: Long = _
>>>
>>>   override def extractTimestamp(e: T, prevElementTimestamp: Long) = {
>>>     val elemTs = e.created_at
>>>     currentMaxTimestamp = Math.max(elemTs - slack, currentMaxTimestamp)
>>>     elemTs
>>>   }
>>>
>>>   override def getCurrentWatermark(): Watermark = {
>>>     new Watermark(currentMaxTimestamp)
>>>   }
>>> }
>>>
>>> events.assignTimestampsAndWatermarks(new EventTimestampExtractor(10000))
>>>
>>> Are there any other things I should be aware of?
>>>
>>> Thanks again for your kind help!
>>>
>>> Fanbin
>>>
>>> On Tue, Jul 23, 2019 at 2:49 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Fanbin,
>>>>
>>>> The delay is most likely caused by the watermark delay.
>>>> A window is computed when the watermark passes the end of the window.
>>>> If you configured the watermark to be 10 minutes before the current max
>>>> timestamp (probably to account for out-of-order data), then the window
>>>> will be computed with an approx. 10 minute delay.
>>>>
>>>> Best,
>>>> Fabian
>>>>
>>>> On Tue, Jul 23, 2019 at 2:00 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a Flink SQL streaming job defined by:
>>>>>
>>>>> SELECT
>>>>>   user_id
>>>>>   , hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts
>>>>>   , count(name) as count
>>>>> FROM event
>>>>> WHERE name = 'signin'
>>>>> GROUP BY
>>>>>   user_id
>>>>>   , hop(created_at, interval '30' second, interval '1' minute)
>>>>>
>>>>> There is a noticeable delay in the groupBy operator. For example, I only
>>>>> see the record sent out 10 min after the record was received in; see the
>>>>> attached pic.
>>>>>
>>>>> [image: image.png]
>>>>>
>>>>> I'm expecting to see the group-by result after one minute, since the
>>>>> sliding window size is 1 min and the slide is 30 sec.
>>>>>
>>>>> There is no such issue if I run the job locally in IntelliJ. However,
>>>>> I ran into the above issue when I run the job on EMR (Flink version = 1.7).
>>>>>
>>>>> Can anybody give a clue of what could be wrong?
>>>>>
>>>>> Thanks,
>>>>> Fanbin
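For completeness, here is a minimal, self-contained sketch of how the query above could be wired up with the Flink 1.7 Scala Table API, reusing the hypothetical Event case class and IdleAwareExtractor sketched earlier. The key point is that created_at has to be registered as the rowtime attribute so the HOP window is driven by the watermark and fires only once the watermark passes the window end. Sample data, the job name, and the cnt alias are illustrative assumptions, not taken from the thread:

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    object SigninCounts {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // Tiny finite test source; when it finishes, Flink emits a final
        // Long.MaxValue watermark, so all pending windows are flushed.
        val events = env
          .fromElements(
            Event("u1", "signin", 1000L),
            Event("u1", "signin", 35000L),
            Event("u2", "signin", 70000L))
          .assignTimestampsAndWatermarks(new IdleAwareExtractor())

        val tEnv = TableEnvironment.getTableEnvironment(env)
        // 'created_at.rowtime declares the event-time attribute used by HOP().
        tEnv.registerDataStream("event", events, 'user_id, 'name, 'created_at.rowtime)

        val result = tEnv.sqlQuery(
          """
            |SELECT
            |  user_id,
            |  HOP_END(created_at, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) AS bucket_ts,
            |  COUNT(name) AS cnt
            |FROM event
            |WHERE name = 'signin'
            |GROUP BY user_id, HOP(created_at, INTERVAL '30' SECOND, INTERVAL '1' MINUTE)
          """.stripMargin)

        // Group-window aggregates produce an append-only stream.
        result.toAppendStream[Row].print()
        env.execute("signin counts")
      }
    }

With watermarks advancing on every subtask, each hopping window should emit roughly 30 seconds to one minute after its end, which is the behaviour Fanbin expected.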