Thanks for your answers! Yes, it was based on watermarks. Fabian, the state does indeed grow quite a bit in my scenario; I've observed state sizes in the range of 5 GB. That doesn't seem to be an issue in itself. However, in my scenario I'm loading a lot of data from a historic store that is only partitioned by day. As such, a full day's worth of data is loaded into the system before the watermark advances. At that point the checkpoints stall indefinitely, with a couple of the tasks in the 'over' operator never acknowledging the checkpoint. Any thoughts on what would cause that, or how to address it?
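To make the buffering concrete, here is a simplified Python model of the behavior Fabian describes in his reply. It is a sketch, not Flink's actual OVER operator: the class name `OverWindowCount`, the millisecond timestamps, and the per-timestamp state layout are hypothetical, and late data and checkpointing are not modeled. Rows only leave the operator once the watermark passes their timestamp, so if the watermark does not move until a full day has been loaded, that whole day sits in state and is then flushed at once.

```python
from collections import defaultdict

DAY_MS = 24 * 60 * 60 * 1000
RANGE_MS = 14 * DAY_MS  # models RANGE INTERVAL '14' DAY PRECEDING


class OverWindowCount:
    """Hypothetical, simplified model of an event-time OVER window COUNT.

    Not Flink's implementation: it only illustrates why rows wait for the
    watermark and why 14 days of timestamps are kept in state per key.
    """

    def __init__(self, range_ms=RANGE_MS):
        self.range_ms = range_ms
        # key -> timestamp -> rows still waiting for the watermark
        self.pending = defaultdict(lambda: defaultdict(list))
        # key -> timestamp -> number of rows kept for the RANGE interval
        self.window = defaultdict(dict)

    def process(self, key, ts, row):
        # Nothing is emitted on arrival; the row is buffered in state.
        self.pending[key][ts].append(row)

    def on_watermark(self, watermark):
        out = []
        for key, by_ts in self.pending.items():
            # Only timestamps at or below the watermark are complete.
            for ts in sorted(t for t in by_ts if t <= watermark):
                rows = by_ts.pop(ts)
                state = self.window[key]
                state[ts] = len(rows)
                # Scan all stored timestamps and retract those that fell
                # out of the RANGE interval [ts - range_ms, ts].
                for old in [t for t in state if t < ts - self.range_ms]:
                    del state[old]
                count = sum(state.values())
                # All rows sharing this timestamp get the same aggregate.
                out.extend((key, count, ts) for _ in rows)
        return out


op = OverWindowCount()
for hour in range(24):  # one day of data, loaded before the watermark moves
    op.process("u1", hour * 60 * 60 * 1000, "click")
print(op.on_watermark(-1))                # → []  (nothing can be emitted yet)
print(len(op.on_watermark(DAY_MS - 1)))   # → 24  (the whole day is flushed at once)
```

Keying the retained state by timestamp mirrors Fabian's point that the operator scans all stored timestamps per key whenever the watermark moves; with a 14-day interval, both that scan and the state grow with the amount of history kept per user.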
On Wed, Jun 27, 2018 at 2:20 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi,
>
> The OVER window operator can only emit results when the watermark is
> advanced, due to SQL semantics which define that all records with the same
> timestamp need to be processed together.
> Can you check if the watermarks make sufficient progress?
>
> Btw. did you observe state size or IO issues? The OVER window operator
> also needs to store the whole window interval in state, i.e., 14 days in
> your case, in order to be able to retract the data from the aggregates
> after 14 days.
> Every time the watermark moves, the operator iterates over all timestamps
> (per key) to check which records need to be removed.
>
> Best, Fabian
>
> 2018-06-27 5:38 GMT+02:00 Rong Rong <walter...@gmail.com>:
>
>> Hi Greg.
>>
>> Based on a quick test I cannot reproduce the issue; it is emitting
>> messages correctly in the ITCase environment.
>> Can you share more information? Does the same problem happen if you use
>> proctime?
>> I am guessing this could be highly correlated with how you set the
>> watermark strategy of your input streams "user_things" and "user_stuff".
>>
>> --
>> Rong
>>
>> On Tue, Jun 26, 2018 at 6:37 PM Gregory Fee <g...@lyft.com> wrote:
>>
>>> Hello User Community!
>>>
>>> I am running some streaming SQL that involves a UNION ALL into an OVER
>>> window similar to the below:
>>>
>>> SELECT user_id, count(action) OVER (PARTITION BY user_id ORDER BY
>>> rowtime RANGE INTERVAL '14' DAY PRECEDING) action_count, rowtime
>>> FROM
>>> (SELECT rowtime, user_id, thing as action FROM user_things
>>> UNION ALL SELECT rowtime, user_id, stuff as action FROM user_stuff)
>>>
>>> The SQL generates three operators. There are two operators that process
>>> the 'from' part of the clause that feed into an 'over' operator. I notice
>>> that messages flow into the 'over' operator and just buffer there for a
>>> long time (hours in some cases). Eventually something happens and the data
>>> starts to flush through to the downstream operators. Can anyone help me
>>> understand what is causing that behavior? I want the data to flow through
>>> more consistently.
>>>
>>> Thanks!
>>>
>>> --
>>> *Gregory Fee*
>>> Engineer
>>> 425.830.4734 <+14258304734>
>>> [image: Lyft] <http://www.lyft.com>