Thanks! I'm working on a way to deliver the data in order (or closer to in
order) and deliver watermarks more often. I'll let you know my results.
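A rough sketch of that plan, assuming a day's batch can be sorted by event time before it is replayed. Once rows go out in timestamp order, a watermark of `ts - 1` is always safe just before a row with timestamp `ts`, because every remaining row is at least that late. The `step` parameter is hypothetical (not a Flink setting) and only throttles how often a watermark is emitted. `InOrderReplay` and `replay` are illustrative names in plain Java, with no Flink APIs involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical replay helper: a model of the idea, not a Flink API. */
final class InOrderReplay {

    /**
     * Sorts a batch by event time and interleaves watermarks, emitting one
     * whenever the safe watermark has advanced by at least {@code step}.
     * Output entries are "row@<ts>" or "wm@<watermark>".
     */
    static List<String> replay(List<Long> timestamps, long step) {
        List<String> out = new ArrayList<>();
        if (timestamps.isEmpty()) {
            return out;
        }
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted); // ascending event time

        // Treat "just before the first row" as the starting watermark (not emitted).
        long lastWatermark = sorted.get(0) - 1;
        for (long ts : sorted) {
            // Every row still to come has timestamp >= ts, so ts - 1 is a safe watermark.
            long safeWatermark = ts - 1;
            if (safeWatermark >= lastWatermark + step) {
                out.add("wm@" + safeWatermark);
                lastWatermark = safeWatermark;
            }
            out.add("row@" + ts);
        }
        // All rows have been delivered, so the largest timestamp is also safe.
        out.add("wm@" + sorted.get(sorted.size() - 1));
        return out;
    }

    public static void main(String[] args) {
        // [row@1, row@3, row@5, wm@6, row@7, wm@11, row@12, wm@19, row@20, wm@20]
        System.out.println(replay(List.of(5L, 1L, 3L, 12L, 7L, 20L), 5));
    }
}
```

Rows with equal timestamps never get separated by a watermark, which matters for an OVER window that must see all rows of a timestamp together.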

On Thu, Jun 28, 2018 at 5:36 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> In a nutshell, the Over operator works as follows:
> - When a row arrives, it is put into a MapState keyed on its timestamp, and
> a timer is registered to process it when the watermark passes that
> timestamp.
> - All the heavy computation is done in the onTimer() method. For each
> unique timestamp, the Over operator iterates once over all records in the
> MapState to retract expired rows from the aggregation result, purge them
> from the state, and accumulate the new rows into the result.
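To make the cost concrete, here is a minimal single-key model of the behaviour described above, written in plain Java rather than against Flink's internal classes. Assumptions: the aggregate is COUNT, `range` plays the role of the 14-day interval, a `HashMap` stands in for MapState (unordered), a `TreeSet` stands in for the event-time timers, and there are no late rows. `OverOperatorModel` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical single-key model of an OVER operator computing COUNT over a RANGE. */
final class OverOperatorModel {
    private final long range;                                    // window size, >= 0
    private final Map<Long, List<String>> state = new HashMap<>(); // stand-in for MapState
    private final TreeSet<Long> timers = new TreeSet<>();         // stand-in for event-time timers
    private long count = 0;                                       // running aggregate
    long stateEntriesVisited = 0;                                 // work done by the timers

    OverOperatorModel(long range) { this.range = range; }

    /** A row arrives: buffer it under its timestamp and register a timer. */
    void processElement(long ts, String row) {
        state.computeIfAbsent(ts, k -> new ArrayList<>()).add(row);
        timers.add(ts);
    }

    /** Fires every timer the watermark has passed, oldest first; returns emitted results. */
    List<String> advanceWatermark(long watermark) {
        List<String> emitted = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst(), emitted);
        }
        return emitted;
    }

    private void onTimer(long ts, List<String> emitted) {
        // One full pass over the unordered state per unique timestamp.
        Iterator<Map.Entry<Long, List<String>>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> e = it.next();
            stateEntriesVisited++;
            if (e.getKey() < ts - range) {        // fell out of the window
                count -= e.getValue().size();     // retract from the aggregate
                it.remove();                      // purge from state
            } else if (e.getKey() == ts) {        // the rows this timer is for
                count += e.getValue().size();     // accumulate
            }
        }
        for (String row : state.get(ts)) {
            emitted.add(row + "=" + count);
        }
    }

    public static void main(String[] args) {
        OverOperatorModel op = new OverOperatorModel(2);
        op.processElement(1, "a");
        op.processElement(2, "b");
        op.processElement(3, "c");
        op.processElement(4, "d");
        System.out.println(op.advanceWatermark(10));  // [a=1, b=2, c=3, d=3]
        System.out.println(op.stateEntriesVisited);   // 16
    }
}
```

All four timers fire inside a single `advanceWatermark` call and each one scans the whole map, so the work grows with (timestamps fired) × (entries in state). While that burst runs, the model processes no new input.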
>
> In Gregory's use case, many rows are added before the watermark advances.
> Once that happens, the operator becomes very busy and iterates many times
> over the state to retract and accumulate rows. During that time, the input
> stream cannot be consumed, hence checkpoints stall.
>
> The solution that seems to work is to increase the watermark interval.
> However, we could also think about improving the implementation to reduce
> the number of state iterations. A time-sorted state primitive would make
> that much easier.
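The benefit of time-sorted state can be illustrated with a hypothetical single-key model that keeps rows in a `java.util.TreeMap`. This is only an analogy in plain Java, not a Flink state API; it assumes a COUNT aggregate, a non-negative `range`, and no late rows, and `SortedOverModel` is an illustrative name. With sorted keys, the expired rows form a prefix that `headMap` can read and drop, and the rows for the current timestamp are a direct lookup instead of a scan.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical single-key OVER model (COUNT over a RANGE) backed by time-sorted state. */
final class SortedOverModel {
    private final long range;
    private final TreeMap<Long, List<String>> state = new TreeMap<>(); // sorted by timestamp
    private final TreeSet<Long> timers = new TreeSet<>();               // pending timestamps
    private long count = 0;
    long stateEntriesVisited = 0;

    SortedOverModel(long range) { this.range = range; }

    void processElement(long ts, String row) {
        state.computeIfAbsent(ts, k -> new ArrayList<>()).add(row);
        timers.add(ts);
    }

    List<String> advanceWatermark(long watermark) {
        List<String> emitted = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst(), emitted);
        }
        return emitted;
    }

    private void onTimer(long ts, List<String> emitted) {
        // Only the expired prefix is touched: keys strictly below ts - range.
        NavigableMap<Long, List<String>> expired = state.headMap(ts - range, false);
        for (List<String> rows : expired.values()) {
            stateEntriesVisited++;
            count -= rows.size();   // retract
        }
        expired.clear();            // the view is backed by the map, so this purges state
        List<String> current = state.get(ts);
        stateEntriesVisited++;      // one direct lookup
        count += current.size();    // accumulate
        for (String row : current) {
            emitted.add(row + "=" + count);
        }
    }

    public static void main(String[] args) {
        SortedOverModel op = new SortedOverModel(2);
        op.processElement(1, "a");
        op.processElement(2, "b");
        op.processElement(3, "c");
        op.processElement(4, "d");
        System.out.println(op.advanceWatermark(10));  // [a=1, b=2, c=3, d=3]
        System.out.println(op.stateEntriesVisited);   // 5
    }
}
```

On these four timestamps the sorted version visits 5 entries, whereas scanning all four entries once per timestamp would visit 16; the gap widens as more timestamps pile up before the watermark moves.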
>
> Best, Fabian
>
> 2018-06-28 6:41 GMT+02:00 Hequn Cheng <chenghe...@gmail.com>:
>
>> Hi Gregory,
>>
>> What was the cause of your problem? It would be great if you could share
>> your experience; I think it will definitely help others.
>>
>>
>> On Thu, Jun 28, 2018 at 11:30 AM, Gregory Fee <g...@lyft.com> wrote:
>>
>>> Yep, it was definitely a watermarking issue. I have that sorted out now.
>>> Thanks!
>>>
>>> On Wed, Jun 27, 2018 at 6:49 PM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>
>>>> Hi Gregory,
>>>>
>>>> As you are using a rowtime OVER window, it is probably a watermark
>>>> problem. The window only emits results when the watermark makes progress.
>>>> You can use processing time (instead of rowtime) to verify this
>>>> assumption. Also, make sure there is data in each of your source
>>>> partitions: the watermark makes no progress if one of the source
>>>> partitions has no data, because an operator's current event time is the
>>>> minimum of its input streams' event times[1].
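The last point can be shown with a small plain-Java model (not Flink code; `MinWatermarkTracker` is a hypothetical name). Each input remembers the highest watermark it has sent, and the operator's event time is the minimum across inputs, so a partition that never sends anything keeps the result at `Long.MIN_VALUE`.

```java
import java.util.Arrays;

/** Hypothetical model: an operator's event time is the minimum over its inputs. */
final class MinWatermarkTracker {
    private final long[] perInput;

    MinWatermarkTracker(int inputs) {
        perInput = new long[inputs];
        Arrays.fill(perInput, Long.MIN_VALUE); // no watermark received yet
    }

    /** Records a watermark from one input and returns the operator's event time. */
    long onWatermark(int input, long watermark) {
        perInput[input] = Math.max(perInput[input], watermark); // never moves backwards
        return current();
    }

    long current() {
        return Arrays.stream(perInput).min().getAsLong();
    }

    public static void main(String[] args) {
        MinWatermarkTracker t = new MinWatermarkTracker(3);
        System.out.println(t.onWatermark(0, 100)); // Long.MIN_VALUE: inputs 1 and 2 are silent
        System.out.println(t.onWatermark(1, 250)); // still Long.MIN_VALUE
        System.out.println(t.onWatermark(2, 180)); // 100
        System.out.println(t.onWatermark(0, 300)); // 180
    }
}
```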
>>>>
>>>> Best, Hequn
>>>>
>>>> [1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html
>>>>
>>>> On Thu, Jun 28, 2018 at 1:58 AM, Gregory Fee <g...@lyft.com> wrote:
>>>>
>>>>> Thanks for your answers! Yes, it was based on watermarks.
>>>>>
>>>>> Fabian, the state does indeed grow quite a bit in my scenario; I've
>>>>> observed it in the range of 5 GB. That doesn't seem to be an issue in
>>>>> itself. However, in my scenario I'm loading a lot of data from a historic
>>>>> store that is only partitioned by day. As such, a full day's worth of
>>>>> data is loaded into the system before the watermark advances. At that
>>>>> point the checkpoints stall indefinitely, with a couple of the tasks in
>>>>> the 'over' operator never acknowledging. Any thoughts on what would cause
>>>>> that, or how to address it?
>>>>>
>>>>> On Wed, Jun 27, 2018 at 2:20 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> The OVER window operator can only emit results when the watermark is
>>>>>> advanced, due to SQL semantics, which define that all records with the
>>>>>> same timestamp need to be processed together.
>>>>>> Can you check whether the watermarks make sufficient progress?
>>>>>>
>>>>>> By the way, did you observe state size or IO issues? The OVER window
>>>>>> operator also needs to store the whole window interval in state, i.e., 14
>>>>>> days in your case, in order to be able to retract the data from the
>>>>>> aggregates after 14 days.
>>>>>> Every time the watermark moves, the operator iterates over all
>>>>>> timestamps (per key) to check which records need to be removed.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> 2018-06-27 5:38 GMT+02:00 Rong Rong <walter...@gmail.com>:
>>>>>>
>>>>>>> Hi Greg.
>>>>>>>
>>>>>>> Based on a quick test, I cannot reproduce the issue; it emits
>>>>>>> messages correctly in the ITCase environment.
>>>>>>> Can you share more information? Does the same problem happen if you
>>>>>>> use proctime?
>>>>>>> I am guessing this could be closely related to how you set up the
>>>>>>> watermark strategy for your input streams "user_things" and
>>>>>>> "user_stuff".
>>>>>>>
>>>>>>> --
>>>>>>> Rong
>>>>>>>
>>>>>>> On Tue, Jun 26, 2018 at 6:37 PM Gregory Fee <g...@lyft.com> wrote:
>>>>>>>
>>>>>>>> Hello User Community!
>>>>>>>>
>>>>>>>> I am running some streaming SQL that involves a UNION ALL feeding into
>>>>>>>> an OVER window, similar to the following:
>>>>>>>>
>>>>>>>> SELECT user_id,
>>>>>>>>        count(action) OVER (PARTITION BY user_id ORDER BY rowtime
>>>>>>>>          RANGE INTERVAL '14' DAY PRECEDING) action_count,
>>>>>>>>        rowtime
>>>>>>>> FROM
>>>>>>>>     (SELECT rowtime, user_id, thing as action FROM user_things
>>>>>>>>      UNION ALL
>>>>>>>>      SELECT rowtime, user_id, stuff as action FROM user_stuff)
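For reference, the result this query is meant to produce can be written as a brute-force Java function (hypothetical names, quadratic, only for checking small inputs). Each row's `action_count` is the number of rows for the same `user_id` whose `rowtime` lies in `[rowtime - 14 days, rowtime]`. The sketch assumes `action` is never null (COUNT skips nulls) and that timestamps are epoch milliseconds. Rows that share a timestamp count each other, which is why the operator has to wait for the watermark before emitting anything for that timestamp.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical brute-force reference for the COUNT ... OVER query above. */
final class OverReference {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    static final class Row {
        final String userId;
        final long rowtime; // epoch millis

        Row(String userId, long rowtime) {
            this.userId = userId;
            this.rowtime = rowtime;
        }
    }

    /** action_count for every row, in input order. */
    static List<Long> actionCounts(List<Row> rows, long rangeMs) {
        List<Long> counts = new ArrayList<>();
        for (Row r : rows) {
            long c = 0;
            for (Row s : rows) {
                boolean samePartition = s.userId.equals(r.userId); // PARTITION BY user_id
                boolean inFrame = s.rowtime >= r.rowtime - rangeMs  // RANGE ... PRECEDING
                        && s.rowtime <= r.rowtime;                   // up to current row, peers included
                if (samePartition && inFrame) {
                    c++;
                }
            }
            counts.add(c);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("u1", 0),
                new Row("u1", 0),
                new Row("u1", 10 * DAY_MS),
                new Row("u2", 10 * DAY_MS),
                new Row("u1", 15 * DAY_MS));
        System.out.println(actionCounts(rows, 14 * DAY_MS)); // [2, 2, 3, 1, 2]
    }
}
```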
>>>>>>>>
>>>>>>>> The SQL generates three operators: two operators that process the
>>>>>>>> 'from' part of the clause feed into an 'over' operator. I notice that
>>>>>>>> messages flow into the 'over' operator and just buffer there for a long
>>>>>>>> time (hours in some cases). Eventually something happens and the data
>>>>>>>> starts to flush through to the downstream operators. Can anyone help
>>>>>>>> me understand what is causing that behavior? I want the data to flow
>>>>>>>> through more consistently.
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> *Gregory Fee*
>>>>>>>> Engineer
>>>>>>>> 425.830.4734 <+14258304734>
>>>>>>>> [image: Lyft] <http://www.lyft.com>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>

