If I use proctime, the groupBy happens without any delay.
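For context, here is a minimal sketch of how a processing-time HOP window could be wired up in the Flink 1.7 Scala Table API. It is not code from this thread: the Event case class, the field names, and the table name are assumed, since the actual schema is never shown. Because a proctime attribute is driven by the wall clock rather than by watermarks, the window fires as soon as its end time passes, which matches the behaviour described above.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Assumed event type and field names; the real schema is not shown in the thread.
case class Event(user_id: String, name: String, created_at: Long)

object ProctimeHopSketch {
  def main(args: Array[String]): Unit = {
    val env  = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val events: DataStream[Event] = env.fromElements(
      Event("u1", "signin", System.currentTimeMillis()))

    // 'proctime.proctime appends a processing-time attribute: HOP windows over
    // it are driven by the wall clock, so results appear as soon as a slide
    // elapses and no watermark is involved.
    tEnv.registerDataStream("event", events,
      'user_id, 'name, 'created_at, 'proctime.proctime)

    val result = tEnv.sqlQuery(
      """SELECT user_id,
        |       hop_end(proctime, interval '30' second, interval '1' minute) AS bucket_ts,
        |       count(name) AS cnt
        |FROM event
        |WHERE name = 'signin'
        |GROUP BY user_id, hop(proctime, interval '30' second, interval '1' minute)
        |""".stripMargin)

    result.toAppendStream[Row].print()
    env.execute("proctime-hop-sketch")
  }
}

The query in the original post below groups on created_at as an event-time attribute instead, which is where the watermark delay discussed in the thread comes in.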
On Tue, Jul 23, 2019 at 10:16 AM Fanbin Bu <fanbin...@coinbase.com> wrote:

> not sure whether this is related:
>
> public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
>         AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
>
>     // match parallelism to input, otherwise dop=1 sources could lead to some strange
>     // behaviour: the watermark will creep along very slowly because the elements
>     // from the source go to each extraction operator round robin.
>     final int inputParallelism = getTransformation().getParallelism();
>     final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);
>
>     TimestampsAndPeriodicWatermarksOperator<T> operator =
>             new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
>
>     return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
>             .setParallelism(inputParallelism);
> }
>
> parallelism is set to 32:
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setParallelism(32)
>
> and the command for launching the job is:
>
> flink run -m yarn-cluster -ys 8 -yn 4 -ytm 4096 -yjm 4096 $JAR $ARGS
>
> On Tue, Jul 23, 2019 at 9:59 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>
>> Thanks Fabian for the prompt reply. I just started using Flink and this
>> is a great community.
>> The watermark setting only accounts for a 10-second delay. Besides that,
>> the local job in IntelliJ runs fine without issues.
>>
>> Here is the code:
>>
>> class EventTimestampExtractor(slack: Long = 0L) extends AssignerWithPeriodicWatermarks[T] {
>>
>>   var currentMaxTimestamp: Long = _
>>
>>   override def extractTimestamp(e: T, prevElementTimestamp: Long) = {
>>     val elemTs = e.created_at
>>     currentMaxTimestamp = Math.max(elemTs - slack, currentMaxTimestamp)
>>     elemTs
>>   }
>>
>>   override def getCurrentWatermark(): Watermark = {
>>     new Watermark(currentMaxTimestamp)
>>   }
>> }
>>
>> events.assignTimestampsAndWatermarks(new EventTimestampExtractor(10000))
>>
>> Are there any other things I should be aware of?
>>
>> Thanks again for your kind help!
>>
>> Fanbin
>>
>> On Tue, Jul 23, 2019 at 2:49 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Fanbin,
>>>
>>> The delay is most likely caused by the watermark delay.
>>> A window is computed when the watermark passes the end of the window.
>>> If you configured the watermark to be 10 minutes before the current max
>>> timestamp (probably to account for out-of-order data), then the window
>>> will be computed with an approx. 10-minute delay.
>>>
>>> Best, Fabian
>>>
>>> On Tue, Jul 23, 2019 at 2:00 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>>
>>>> Hi,
>>>> I have a Flink SQL streaming job defined by:
>>>>
>>>> SELECT
>>>>     user_id
>>>>     , hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts
>>>>     , count(name) as count
>>>> FROM event
>>>> WHERE name = 'signin'
>>>> GROUP BY
>>>>     user_id
>>>>     , hop(created_at, interval '30' second, interval '1' minute)
>>>>
>>>> There is a noticeable delay in the groupBy operator. For example, I only
>>>> see the record sent out 10 minutes after the record was received. See
>>>> the attached pic.
>>>>
>>>> [image: image.png]
>>>>
>>>> I'm expecting to see the group-by result after one minute, since the
>>>> sliding window size is 1 min and the slide is 30 sec.
>>>>
>>>> There is no such issue if I run the job locally in IntelliJ. However, I
>>>> ran into the above issue when I run the job on EMR (Flink version = 1.7).
>>>>
>>>> Can anybody give a clue of what could be wrong?
>>>> Thanks,
>>>>
>>>> Fanbin
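For reference, here is a minimal, self-contained sketch (not from the thread; the Event case class and created_at field are assumed names) of the same watermarking pattern using Flink's built-in BoundedOutOfOrdernessTimestampExtractor. It emits watermarks that trail the maximum observed timestamp by a fixed bound, essentially the same "max minus slack" logic as the EventTimestampExtractor quoted above with slack = 10000. As Fabian notes, an event-time window can only fire once the watermark passes the window end.

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Assumed event type; the real schema is not shown in the thread.
case class Event(user_id: String, name: String, created_at: Long)

object EventTimeWatermarkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val events: DataStream[Event] = env.fromElements(
      Event("u1", "signin", 1000L),
      Event("u1", "signin", 95000L))

    // Emits watermark = max(created_at) - 10s. A 1-minute window ending at
    // time T can only fire once an element with created_at of at least
    // T + 10s has been seen, and only once that holds on every parallel
    // instance of this operator.
    val withTimestamps: DataStream[Event] = events.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
        override def extractTimestamp(e: Event): Long = e.created_at
      })

    withTimestamps.print()
    env.execute("event-time-watermark-sketch")
  }
}

One related detail: a downstream operator's watermark is the minimum of the watermarks from all of its parallel inputs, so if some of the 32 parallel extractor instances receive little or no data, the watermark may advance very slowly; that is the behaviour the round-robin comment in the quoted assignTimestampsAndWatermarks source hints at.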