Hi Fanbin,

> 2. I have parallelism = 32 and only one task has the record. Can you
> please elaborate more on why this would affect the watermark advancement?
Each parallel subtask of a source function usually generates its watermarks
independently, say wk1, wk2, ..., wkn. The downstream window operator’s
current event time is the minimum of its input streams’ event times, so
here wk_window = min(wk1, wk2, ..., wkn).
If some of the subtasks receive no data, their watermarks do not advance, so
wk_window does not advance either. More details here [1].
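
To make the min rule concrete, here is a small standalone sketch in plain
Java (no Flink dependency). The class name, the method name and the
NO_WATERMARK constant are made up for illustration and are not Flink APIs;
the sketch only mimics how an operator with 32 inputs would combine their
watermarks.

```java
import java.util.Arrays;

public class MinWatermarkSketch {
    // Assumption for this sketch: a subtask that has seen no record yet
    // reports the lowest possible watermark.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // The downstream operator's event time is the minimum over all of its
    // input watermarks: wk_window = min(wk1, wk2, ..., wkn).
    static long operatorWatermark(long[] inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(NO_WATERMARK);
    }

    public static void main(String[] args) {
        long[] wk = new long[32];
        Arrays.fill(wk, NO_WATERMARK);

        // Only subtask 0 receives records and advances its watermark.
        wk[0] = 1_563_926_400_000L;
        System.out.println("stuck: " + (operatorWatermark(wk) == NO_WATERMARK));

        // Once every subtask has advanced, the operator's watermark advances too.
        Arrays.fill(wk, 1_563_926_400_000L);
        System.out.println("advanced to: " + operatorWatermark(wk));
    }
}
```

With 31 of the 32 inputs idle, the minimum stays at the idle value no matter
how far subtask 0 gets, which matches the lagging watermark you see in the UI.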

In your case, you can set the parallelism of the source to 1 to solve the
problem. Also, keep the parallelism of assignTimestampsAndWatermarks the
same as that of the source.

> 4. data span time > window time. I don't quite understand why this
> matters.
For example, suppose you have a tumbling window with a size of 1 day, but
all of the data falls within 1 hour of that day. In this case, the event
time never reaches the end of the window, so the window does not fire.
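
The same example as a small standalone sketch in plain Java (again no Flink
dependency; the class and method names are hypothetical). It assumes
epoch-aligned tumbling windows and the usual event-time rule that a window
fires once the watermark reaches the window's last timestamp (end - 1).

```java
public class TumblingWindowSketch {
    static final long HOUR_MS = 60L * 60L * 1000L;
    static final long DAY_MS = 24L * HOUR_MS;

    // Exclusive end of the epoch-aligned tumbling window containing the timestamp.
    static long windowEnd(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs) + sizeMs;
    }

    // The window fires once the watermark reaches the window's last timestamp.
    static boolean fires(long watermarkMs, long windowEndMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long dayStart = 18_100L * DAY_MS;          // some arbitrary day
        long firstEvent = dayStart + 9 * HOUR_MS;  // all data falls between 09:00 ...
        long lastEvent = dayStart + 10 * HOUR_MS;  // ... and 10:00 of that day

        long end = windowEnd(firstEvent, DAY_MS);
        // The watermark can be at most the largest timestamp seen so far.
        System.out.println("fires with 1 hour of data: " + fires(lastEvent, end));
        // Only a record from the next day pushes the watermark past the window end.
        System.out.println("fires after next-day record: " + fires(end, end));
    }
}
```

So with only one hour of data the 1-day window stays open until later data
(or the end of the input) moves the watermark past the window end.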

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams

On Thu, Jul 25, 2019 at 12:17 AM Fanbin Bu <fanbin...@coinbase.com> wrote:

> Hequn,
>
> Thanks for the help. It is indeed a watermark problem. From the Flink UI, I
> can see the low watermark value for each operator, and the groupBy operator
> has a lagging watermark value. I checked the link from SO and confirmed
> that:
> 1. I do see records coming in for this operator
> 2. I have parallelism = 32 and only one task has the record. Can you
> please elaborate more on why this would affect the watermark advancement?
> 3. Event create time is in ms
> 4. data span time > window time. I don't quite understand why this
> matters.
>
> Thanks,
> Fanbin
>
> On Tue, Jul 23, 2019 at 7:17 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Fanbin,
>>
>> Fabian is right, it should be a watermark problem. Probably, some tasks
>> of the source don't have enough data to advance the watermark. Furthermore,
>> you could also monitor event time through Flink web interface.
>> I have answered a similar question on stackoverflow, see more details
>> here[1].
>>
>> Best, Hequn
>>
>> [1]
>> https://stackoverflow.com/questions/51691269/event-time-window-in-flink-does-not-trigger
>>
>> On Wed, Jul 24, 2019 at 4:38 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>
>>> If I use proctime, the groupBy happens without any delay.
>>>
>>> On Tue, Jul 23, 2019 at 10:16 AM Fanbin Bu <fanbin...@coinbase.com>
>>> wrote:
>>>
>>>> not sure whether this is related:
>>>>
>>>> public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
>>>>       AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
>>>>
>>>>    // match parallelism to input, otherwise dop=1 sources could lead to some strange
>>>>    // behaviour: the watermark will creep along very slowly because the elements
>>>>    // from the source go to each extraction operator round robin.
>>>>    final int inputParallelism = getTransformation().getParallelism();
>>>>    final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);
>>>>
>>>>    TimestampsAndPeriodicWatermarksOperator<T> operator =
>>>>          new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
>>>>
>>>>    return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
>>>>          .setParallelism(inputParallelism);
>>>> }
>>>>
>>>> parallelism is set to 32
>>>>
>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>
>>>> env.setParallelism(32)
>>>>
>>>> and the command for launching the job is
>>>>
>>>> flink run -m yarn-cluster -ys 8 -yn 4 -ytm 4096 -yjm 4096 $JAR $ARGS
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Jul 23, 2019 at 9:59 AM Fanbin Bu <fanbin...@coinbase.com>
>>>> wrote:
>>>>
>>>>> Thanks Fabian for the prompt reply. I just started using Flink and
>>>>> this is a great community.
>>>>> The watermark setting is only accounting for 10 sec delay. Besides
>>>>> that, the local job on IntelliJ is running fine without issues.
>>>>>
>>>>> Here is the code:
>>>>>
>>>>> class EventTimestampExtractor(slack: Long = 0L) extends AssignerWithPeriodicWatermarks[T] {
>>>>>
>>>>>   var currentMaxTimestamp: Long = _
>>>>>
>>>>>   override def extractTimestamp(e: T, prevElementTimestamp: Long) = {
>>>>>     val elemTs = e.created_at
>>>>>     currentMaxTimestamp = Math.max(elemTs - slack, currentMaxTimestamp)
>>>>>     elemTs
>>>>>   }
>>>>>
>>>>>   override def getCurrentWatermark(): Watermark = {
>>>>>       new Watermark(currentMaxTimestamp)
>>>>>   }
>>>>> }
>>>>>
>>>>> events.assignTimestampsAndWatermarks(new EventTimestampExtractor(10000))
>>>>>
>>>>> Are there any other things I should be aware of?
>>>>>
>>>>> Thanks again for your kind help!
>>>>>
>>>>> Fanbin
>>>>>
>>>>>
>>>>> On Tue, Jul 23, 2019 at 2:49 AM Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Fanbin,
>>>>>>
>>>>>> The delay is most likely caused by the watermark delay.
>>>>>> A window is computed when the watermark passes the end of the window.
>>>>>> If you configured the watermark to be 10 minutes before the current max
>>>>>> timestamp (probably to account for out of order data), then the window
>>>>>> will be computed with approx. 10 minute delay.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> Am Di., 23. Juli 2019 um 02:00 Uhr schrieb Fanbin Bu <
>>>>>> fanbin...@coinbase.com>:
>>>>>>
>>>>>>> Hi,
>>>>>>> I have a Flink sql streaming job defined by:
>>>>>>>
>>>>>>> SELECT
>>>>>>>   user_id
>>>>>>>   , hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts
>>>>>>>   , count(name) as count
>>>>>>> FROM event
>>>>>>> WHERE name = 'signin'
>>>>>>> GROUP BY
>>>>>>>   user_id
>>>>>>>   , hop(created_at, interval '30' second, interval '1' minute)
>>>>>>>
>>>>>>>
>>>>>>> There is a noticeable delay in the groupBy operator. For example, I
>>>>>>> only see the record sent out 10 min after it was received. See
>>>>>>> the attached pic.
>>>>>>>
>>>>>>> [image: image.png]
>>>>>>>
>>>>>>> I'm expecting to see the group by result after one minute since the
>>>>>>> sliding window size is 1 min and the slide is 30 sec.
>>>>>>>
>>>>>>> There is no such issue if I run the job locally in IntelliJ.
>>>>>>> However, I run into the above issue if I run the job on EMR (Flink
>>>>>>> version = 1.7).
>>>>>>>
>>>>>>> Can anybody give a clue of what could be wrong?
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Fanbin
>>>>>>>
>>>>>>
