Thanks! I'm working on a way to deliver the data in order (or closer to in order) and deliver watermarks more often. I'll let you know my results.
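Fabian's explanation in the quoted thread suggests why delivering watermarks more often should help. In his description, rows are buffered in a MapState keyed by timestamp, one event-time timer is registered per timestamp, and each onTimer() call makes a full pass over the state.

Below is a toy, single-key Python model of that description, using a COUNT aggregate. It makes several simplifying assumptions:
- The class and method names are hypothetical.
- It is not Flink's implementation, and it ignores checkpointing entirely.
- It uses a one-day range instead of 14 days to keep the numbers small.

The model feeds the same 48 hourly rows in two ways: with one watermark at the end, or with a watermark after every row.

```python
from collections import defaultdict

HOUR = 60 * 60 * 1000  # event time in milliseconds
DAY = 24 * HOUR


class OverCountModel:
    """Toy single-key model of a timer-driven OVER operator (COUNT aggregate).

    Hypothetical names; this only mimics the behavior described in the
    thread and is not Flink's API or its exact implementation.
    """

    def __init__(self, preceding_ms):
        self.preceding_ms = preceding_ms
        self.state = defaultdict(list)  # stands in for MapState: ts -> rows
        self.timers = set()             # pending event-time timers
        self.watermark = None
        self.count = 0                  # running aggregate
        self.state_scans = 0            # state entries visited by timers
        self.output = []                # (ts, row, count) results

    def on_element(self, ts, row):
        if self.watermark is not None and ts <= self.watermark:
            return                      # late row: dropped in this model
        self.state[ts].append(row)
        self.timers.add(ts)             # process once the watermark passes ts

    def on_watermark(self, wm):
        """Advance the watermark and fire due timers; return how many fired."""
        self.watermark = wm
        due = sorted(t for t in self.timers if t <= wm)
        for ts in due:
            self.timers.discard(ts)
            self._on_timer(ts)
        return len(due)

    def _on_timer(self, ts):
        limit = ts - self.preceding_ms
        # One pass over the whole state per unique timestamp: retract and
        # purge rows that have dropped out of the RANGE.
        for t in list(self.state):
            self.state_scans += 1
            if t < limit:
                self.count -= len(self.state.pop(t))
        # Rows sharing a timestamp are accumulated and emitted together.
        rows = self.state[ts]
        self.count += len(rows)
        self.output.extend((ts, row, self.count) for row in rows)


def run(bursty, n=48, preceding_ms=DAY):
    """Feed n hourly rows; emit one watermark at the end (bursty) or per row."""
    op = OverCountModel(preceding_ms)
    fired_per_watermark = []
    for i in range(n):
        op.on_element(i * HOUR, f"row{i}")
        if not bursty:
            fired_per_watermark.append(op.on_watermark(i * HOUR))
    if bursty:
        fired_per_watermark.append(op.on_watermark((n - 1) * HOUR))
    return op.output, max(fired_per_watermark), op.state_scans
```

Both runs produce the same results. In the bursty run, however, all 48 timers fire within a single watermark advance, and most of them scan a state that still holds rows not yet processed. In Fabian's description, that burst is what blocks input consumption and stalls checkpoints. Advancing the watermark more often spreads the timer work over many small steps.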
On Thu, Jun 28, 2018 at 5:36 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> In a nutshell, the Over operator works as follows:
> - When a row arrives, it is put into a MapState keyed on its timestamp, and
>   a timer is registered to process it when the watermark passes that
>   timestamp.
> - All the heavy computation is done in the onTimer() method. For each
>   unique timestamp, the Over operator iterates once over all records in the
>   MapState to retract and purge expired rows from the aggregation result
>   and to accumulate the new rows into it.
>
> In Gregory's use case, many rows are added before the watermark advances.
> Once that happens, the operator becomes very busy and iterates many times
> over the state to retract and accumulate rows. During that time, the input
> stream cannot be consumed, hence checkpoints stall.
>
> The solution that seems to work is to increase the watermark interval.
> However, we could also think about improving the implementation to reduce
> the number of state iterations. A time-sorted state primitive would make
> that much easier.
>
> Best, Fabian
>
> 2018-06-28 6:41 GMT+02:00 Hequn Cheng <chenghe...@gmail.com>:
>
>> Hi Gregory,
>>
>> What was the cause of your problem? It would be great if you could share
>> your experience, which I think will definitely help others.
>>
>>
>> On Thu, Jun 28, 2018 at 11:30 AM, Gregory Fee <g...@lyft.com> wrote:
>>
>>> Yep, it was definitely a watermarking issue. I have that sorted out now.
>>> Thanks!
>>>
>>> On Wed, Jun 27, 2018 at 6:49 PM, Hequn Cheng <chenghe...@gmail.com>
>>> wrote:
>>>
>>>> Hi Gregory,
>>>>
>>>> As you are using a rowtime OVER window, it is probably a watermark
>>>> problem. The window only emits output when the watermark makes
>>>> progress. You can use processing time (instead of rowtime) to verify
>>>> this assumption. Also, make sure there is data in each of your source
>>>> partitions; the watermark makes no progress if one of the source
>>>> partitions has no data.
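Hequn's point that the watermark cannot advance while a source partition has no data follows from how an operator derives its event time: it takes the minimum over the watermarks of all its inputs.

Below is a minimal Python sketch of that rule. It assumes:
- `MinWatermarkTracker` is a hypothetical helper, not a Flink API.
- Negative infinity stands in for "no watermark received yet".

```python
NO_WATERMARK = float("-inf")  # stands in for "no watermark received yet"


class MinWatermarkTracker:
    """Hypothetical helper (not a Flink API): tracks one operator's event time."""

    def __init__(self, num_inputs):
        self.input_wms = [NO_WATERMARK] * num_inputs
        self.current = NO_WATERMARK

    def on_watermark(self, input_index, wm):
        """Record a watermark from one input; return the operator's watermark."""
        # In this sketch, a smaller value from the same input is ignored.
        self.input_wms[input_index] = max(self.input_wms[input_index], wm)
        # The operator's event time is the minimum over all inputs, and it
        # never moves backwards.
        self.current = max(self.current, min(self.input_wms))
        return self.current


if __name__ == "__main__":
    tracker = MinWatermarkTracker(num_inputs=3)
    print(tracker.on_watermark(0, 1_000))  # -inf: inputs 1 and 2 are silent
    print(tracker.on_watermark(1, 5_000))  # -inf: input 2 is still silent
    print(tracker.on_watermark(2, 3_000))  # 1000 = min(1000, 5000, 3000)
```

Suppose input 2 corresponds to a source partition that never receives data. The first two calls then show the operator's watermark stuck at its initial value, which matches the stalled OVER window output described in the thread.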
>>>> An operator’s current event time is the minimum of its input streams’
>>>> event times [1].
>>>>
>>>> Best, Hequn
>>>>
>>>> [1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html
>>>>
>>>> On Thu, Jun 28, 2018 at 1:58 AM, Gregory Fee <g...@lyft.com> wrote:
>>>>
>>>>> Thanks for your answers! Yes, it was based on watermarks.
>>>>>
>>>>> Fabian, the state does indeed grow quite a bit in my scenario. I've
>>>>> observed it in the range of 5 GB. That doesn't seem to be an issue in
>>>>> itself. However, in my scenario I'm loading a lot of data from a
>>>>> historic store that is only partitioned by day. As such, a full day's
>>>>> worth of data is loaded into the system before the watermark advances.
>>>>> At that point, the checkpoints stall indefinitely, with a couple of the
>>>>> tasks in the 'over' operator never acknowledging. Any thoughts on what
>>>>> would cause that? Or how to address it?
>>>>>
>>>>> On Wed, Jun 27, 2018 at 2:20 AM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> The OVER window operator can only emit results when the watermark
>>>>>> advances, due to SQL semantics, which define that all records with the
>>>>>> same timestamp need to be processed together.
>>>>>> Can you check if the watermarks make sufficient progress?
>>>>>>
>>>>>> Btw., did you observe state size or IO issues? The OVER window
>>>>>> operator also needs to store the whole window interval in state, i.e.,
>>>>>> 14 days in your case, in order to be able to retract the data from the
>>>>>> aggregates after 14 days.
>>>>>> Every time the watermark moves, the operator iterates over all
>>>>>> timestamps (per key) to check which records need to be removed.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> 2018-06-27 5:38 GMT+02:00 Rong Rong <walter...@gmail.com>:
>>>>>>
>>>>>>> Hi Greg.
>>>>>>>
>>>>>>> Based on a quick test, I cannot reproduce the issue; it is emitting
>>>>>>> messages correctly in the ITCase environment.
>>>>>>> Can you share more information? Does the same problem happen if you
>>>>>>> use proctime?
>>>>>>> I am guessing this could be highly correlated with how you set the
>>>>>>> watermark strategy for your input streams "user_things" and
>>>>>>> "user_stuff".
>>>>>>>
>>>>>>> --
>>>>>>> Rong
>>>>>>>
>>>>>>> On Tue, Jun 26, 2018 at 6:37 PM Gregory Fee <g...@lyft.com> wrote:
>>>>>>>
>>>>>>>> Hello User Community!
>>>>>>>>
>>>>>>>> I am running some streaming SQL that involves a UNION ALL into an
>>>>>>>> OVER window, similar to the one below:
>>>>>>>>
>>>>>>>> SELECT user_id, count(action) OVER (PARTITION BY user_id ORDER BY
>>>>>>>>   rowtime RANGE INTERVAL '14' DAY PRECEDING) action_count, rowtime
>>>>>>>> FROM
>>>>>>>>   (SELECT rowtime, user_id, thing as action FROM user_things
>>>>>>>>    UNION ALL SELECT rowtime, user_id, stuff as action FROM user_stuff)
>>>>>>>>
>>>>>>>> The SQL generates three operators. There are two operators that
>>>>>>>> process the 'from' part of the clause and feed into an 'over'
>>>>>>>> operator. I notice that messages flow into the 'over' operator and
>>>>>>>> just buffer there for a long time (hours in some cases). Eventually
>>>>>>>> something happens and the data starts to flush through to the
>>>>>>>> downstream operators. Can anyone help me understand what is causing
>>>>>>>> that behavior? I want the data to flow through more consistently.
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> --
>>>>>>>> *Gregory Fee*
>>>>>>>> Engineer
>>>>>>>> 425.830.4734 <+14258304734>
>>>>>>>> [image: Lyft] <http://www.lyft.com>
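When checking what the streaming job emits, it can help to have a batch reference for what the query in Gregory's original message computes. Below is a hypothetical brute-force Python evaluation. It is quadratic, so it is only meant for small test data.

It assumes the standard SQL reading of `RANGE INTERVAL '14' DAY PRECEDING`:
- The lower bound (rowtime minus 14 days) is inclusive.
- All rows that share the current rowtime are counted as peers.

The names `Row`, `over_count` and `DAY_MS` are illustrative only.

```python
from collections import namedtuple

DAY_MS = 24 * 60 * 60 * 1000  # rowtime in milliseconds

Row = namedtuple("Row", ["rowtime", "user_id", "action"])


def over_count(user_things, user_stuff, preceding_ms=14 * DAY_MS):
    """Reference (non-streaming) evaluation of the OVER query for COUNT.

    For every row, count the rows of the same user whose rowtime lies in
    [rowtime - preceding_ms, rowtime]; rows with an equal rowtime are peers
    and are all included, as in RANGE framing.
    """
    unioned = list(user_things) + list(user_stuff)  # UNION ALL keeps duplicates
    result = []
    for r in unioned:
        count = sum(
            1
            for o in unioned
            if o.user_id == r.user_id
            and r.rowtime - preceding_ms <= o.rowtime <= r.rowtime
            and o.action is not None  # COUNT(action) skips NULLs
        )
        result.append((r.user_id, count, r.rowtime))
    # A stream has no fixed output order; sort for a deterministic comparison.
    return sorted(result, key=lambda x: (x[2], x[0]))
```

Assuming no rows are dropped as late, a streaming run should eventually produce the same `(user_id, action_count, rowtime)` rows. However, each row only appears once the watermark passes its rowtime.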