not sure whether this is related:

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
        AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

    // match parallelism to input, otherwise dop=1 sources could lead to some
    // strange behaviour: the watermark will creep along very slowly because
    // the elements from the source go to each extraction operator round robin.
    final int inputParallelism = getTransformation().getParallelism();
    final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

    TimestampsAndPeriodicWatermarksOperator<T> operator =
            new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

    return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
            .setParallelism(inputParallelism);
}

parallelism is set to 32:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(32)

and the command for launching the job is:

flink run -m yarn-cluster -ys 8 -yn 4 -ytm 4096 -yjm 4096 $JAR $ARGS

On Tue, Jul 23, 2019 at 9:59 AM Fanbin Bu <fanbin...@coinbase.com> wrote:

> Thanks, Fabian, for the prompt reply. I just started using Flink and this
> is a great community.
> The watermark setting only accounts for a 10 sec delay. Besides that, the
> local job in IntelliJ runs fine without issues.
>
> Here is the code:
>
> class EventTimestampExtractor(slack: Long = 0L) extends
>     AssignerWithPeriodicWatermarks[T] {
>
>   var currentMaxTimestamp: Long = _
>
>   override def extractTimestamp(e: T, prevElementTimestamp: Long) = {
>     val elemTs = e.created_at
>     currentMaxTimestamp = Math.max(elemTs - slack, currentMaxTimestamp)
>     elemTs
>   }
>
>   override def getCurrentWatermark(): Watermark = {
>     new Watermark(currentMaxTimestamp)
>   }
> }
>
> events.assignTimestampsAndWatermarks(new EventTimestampExtractor(10000))
>
> Are there any other things I should be aware of?
>
> Thanks again for your kind help!
>
> Fanbin
>
>
> On Tue, Jul 23, 2019 at 2:49 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Fanbin,
>>
>> The delay is most likely caused by the watermark delay.
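[Editor's note: to make the watermark-delay mechanics concrete, here is a minimal, Flink-free sketch of the slack-based periodic watermark pattern used in the EventTimestampExtractor above. All class names and numbers are illustrative, not Flink's actual API.]

```java
// Simplified model of a periodic watermark assigner with a fixed slack:
// the watermark trails the maximum observed event timestamp by `slack`,
// and a window [start, end) may only fire once watermark >= end.
public class WatermarkSlackDemo {
    static long currentMax = Long.MIN_VALUE;

    // like extractTimestamp: track max(eventTime - slack)
    static long extract(long eventTs, long slack) {
        currentMax = Math.max(eventTs - slack, currentMax);
        return eventTs;
    }

    // like getCurrentWatermark: emit the tracked maximum
    static long watermark() {
        return currentMax;
    }

    public static void main(String[] args) {
        long slack = 10_000L;     // 10 s, as in the thread
        long windowEnd = 60_000L; // a 1-minute window [0, 60000)

        extract(59_999L, slack);  // last element inside the window
        // watermark = 49,999 < windowEnd: the window cannot fire yet
        System.out.println(watermark() >= windowEnd); // false

        extract(70_000L, slack);  // an element `slack` past the window end
        // watermark = 60,000 >= windowEnd: the window fires now, i.e.
        // results appear roughly `slack` after the window closes
        System.out.println(watermark() >= windowEnd); // true
    }
}
```

With slack = 10 s this only explains a ~10 s lag, not the 10 min observed, which is why the rest of the thread looks elsewhere for the cause.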
>> A window is computed when the watermark passes the end of the window. If
>> you configured the watermark to be 10 minutes before the current max
>> timestamp (probably to account for out-of-order data), then the window
>> will be computed with an approx. 10 minute delay.
>>
>> Best, Fabian
>>
>> On Tue, Jul 23, 2019 at 02:00, Fanbin Bu <fanbin...@coinbase.com> wrote:
>>
>>> Hi,
>>> I have a Flink SQL streaming job defined by:
>>>
>>> SELECT
>>>     user_id
>>>     , hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts
>>>     , count(name) as count
>>> FROM event
>>> WHERE name = 'signin'
>>> GROUP BY
>>>     user_id
>>>     , hop(created_at, interval '30' second, interval '1' minute)
>>>
>>> There is a noticeable delay in the GROUP BY operator. For example, I
>>> only see a record sent out 10 min after the record was received. See
>>> the attached pic.
>>>
>>> [image: image.png]
>>>
>>> I am expecting to see the GROUP BY result after one minute, since the
>>> sliding window size is 1 min and the slide is 30 sec.
>>>
>>> There is no such issue if I run the job locally in IntelliJ. However,
>>> I ran into the above issue when I run the job on EMR (Flink version = 1.7).
>>>
>>> Can anybody give a clue as to what could be wrong?
>>> Thanks,
>>>
>>> Fanbin
>>>
>>
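[Editor's note: the "watermark will creep along very slowly" behaviour mentioned in the code comment at the top of the thread comes from a downstream operator only being able to advance its event-time clock to the minimum watermark across all of its parallel input channels. A simplified sketch of that rule (class and method names are illustrative, not Flink's actual classes):]

```java
import java.util.Arrays;

// Sketch of how an operator combines watermarks from parallel inputs:
// its effective watermark is the MINIMUM across all input channels, so
// channels that have seen no data yet hold the whole pipeline back.
public class MinWatermarkDemo {
    static long combined(long[] perChannelWatermarks) {
        return Arrays.stream(perChannelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Suppose only 4 of 32 channels have seen data so far; the rest
        // are still at their initial watermark of Long.MIN_VALUE.
        long[] channels = new long[32];
        Arrays.fill(channels, Long.MIN_VALUE);
        channels[0] = 60_000L;
        channels[1] = 61_000L;
        channels[2] = 59_000L;
        channels[3] = 62_000L;

        // The combined watermark is stuck, so no window can fire downstream.
        System.out.println(combined(channels) == Long.MIN_VALUE); // true

        // Only once EVERY channel has advanced does the watermark move.
        Arrays.fill(channels, 60_000L);
        System.out.println(combined(channels)); // 60000
    }
}
```

This is one plausible reason a job behaves differently on a 32-slot EMR deployment than in a local IntelliJ run: with high parallelism, sparse or skewed input can leave some subtasks' watermarks far behind, delaying every windowed result behind them.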