I'm writing a custom S3 source to work around some back-pressure and checkpointing issues that my bootstrap logic runs into at scale. As part of that, I moved the logic that assigns timestamps and watermarks. Watermarks ended up being generated earlier in the pipeline, but a later operator was stripping off all of them, so the 'over' operator never saw the watermark advance.
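A minimal sketch of that failure mode, written as a toy Python model rather than Flink code. The `Record`, `Watermark`, `strip_watermarks` and `event_time_sink` names are hypothetical. An operator that forwards records but drops watermarks leaves a downstream event-time operator buffering forever.

```python
"""Toy model (not Flink code): an operator that forwards records but drops
watermark elements starves a downstream event-time operator."""
from dataclasses import dataclass
from typing import Iterable, List, Union


@dataclass(frozen=True)
class Record:
    timestamp: int
    value: str


@dataclass(frozen=True)
class Watermark:
    timestamp: int


Element = Union[Record, Watermark]


def forward_all(stream: Iterable[Element]) -> List[Element]:
    # A well-behaved operator passes watermarks through unchanged.
    return list(stream)


def strip_watermarks(stream: Iterable[Element]) -> List[Element]:
    # Hypothetical buggy operator: keeps records, silently drops watermarks.
    return [e for e in stream if isinstance(e, Record)]


def event_time_sink(stream: Iterable[Element]) -> List[str]:
    """Buffers records and emits them only once a watermark has passed their
    timestamp, similar in spirit to a rowtime OVER window."""
    buffered: List[Record] = []
    emitted: List[str] = []
    for e in stream:
        if isinstance(e, Record):
            buffered.append(e)
        else:
            ready = sorted((r for r in buffered if r.timestamp <= e.timestamp),
                           key=lambda r: r.timestamp)
            buffered = [r for r in buffered if r.timestamp > e.timestamp]
            emitted.extend(r.value for r in ready)
    return emitted


source = [Record(1, "a"), Record(2, "b"), Watermark(2), Record(3, "c"), Watermark(3)]
print(event_time_sink(forward_all(source)))       # ['a', 'b', 'c']
print(event_time_sink(strip_watermarks(source)))  # [] : nothing is ever released
```

The records still reach the sink in the second case; they simply never become eligible for output, which matches the "data buffers in the 'over' operator" symptom.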
On Wed, Jun 27, 2018 at 9:41 PM, Hequn Cheng <chenghe...@gmail.com> wrote:
> Hi Gregory,
>
> What was the cause of your problem? It would be great if you could share your experience; I think it will definitely help others.
>
> On Thu, Jun 28, 2018 at 11:30 AM, Gregory Fee <g...@lyft.com> wrote:
>> Yep, it was definitely a watermarking issue. I have that sorted out now. Thanks!
>>
>> On Wed, Jun 27, 2018 at 6:49 PM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>> Hi Gregory,
>>>
>>> Since you are using a rowtime OVER window, it is probably a watermark problem. The window only emits output when the watermark makes progress. You can use processing time (instead of rowtime) to verify this assumption. Also, make sure there is data in each of your source partitions: the watermark makes no progress if one of the source partitions has no data, because an operator's current event time is the minimum of its input streams' event times [1].
>>>
>>> Best, Hequn
>>>
>>> [1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html
>>>
>>> On Thu, Jun 28, 2018 at 1:58 AM, Gregory Fee <g...@lyft.com> wrote:
>>>> Thanks for your answers! Yes, it was based on watermarks.
>>>>
>>>> Fabian, the state does indeed grow quite a bit in my scenario; I've observed it in the range of 5 GB. That doesn't seem to be an issue in itself. However, I'm loading a lot of data from a historic store that is only partitioned by day, so a full day's worth of data is loaded into the system before the watermark advances. At that point the checkpoints stall indefinitely, with a couple of the tasks in the 'over' operator never acknowledging. Any thoughts on what would cause that, or how to address it?
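Hequn's rule that an operator's event time is the minimum over its inputs can be illustrated with a toy Python model. The class and partition names are hypothetical and this is not Flink's implementation. A partition that has produced no watermark keeps the operator at its initial value, and a slow partition holds everyone back.

```python
"""Toy model (not Flink's implementation): an operator's current event time
is the minimum of the latest watermarks seen on each of its inputs."""
from typing import Dict, Iterable


class MultiInputOperator:
    def __init__(self, inputs: Iterable[str]) -> None:
        # No watermark received yet on any input: start at "minus infinity".
        self.input_watermarks: Dict[str, float] = {name: float("-inf") for name in inputs}

    def on_watermark(self, input_name: str, ts: float) -> float:
        # Watermarks are monotonic per input, so a lower value is ignored.
        current = self.input_watermarks[input_name]
        self.input_watermarks[input_name] = max(current, ts)
        return self.current_event_time()

    def current_event_time(self) -> float:
        # The slowest input determines the operator's event time.
        return min(self.input_watermarks.values())


op = MultiInputOperator(["partition-0", "partition-1"])
op.on_watermark("partition-0", 100)
print(op.current_event_time())  # -inf: partition-1 has no data, so no watermark yet
op.on_watermark("partition-1", 40)
print(op.current_event_time())  # 40: held back by the slower partition
op.on_watermark("partition-1", 120)
print(op.current_event_time())  # 100
```

Taking the minimum is what makes event time safe to advance: the operator cannot claim time T has passed while any input might still deliver records older than T.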
>>>>
>>>> On Wed, Jun 27, 2018 at 2:20 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>> Hi,
>>>>>
>>>>> The OVER window operator can only emit results when the watermark advances, because SQL semantics define that all records with the same timestamp need to be processed together.
>>>>> Can you check whether the watermarks make sufficient progress?
>>>>>
>>>>> By the way, did you observe state size or IO issues? The OVER window operator also needs to store the whole window interval in state, i.e., 14 days in your case, in order to be able to retract the data from the aggregates after 14 days.
>>>>> Every time the watermark moves, the operator iterates over all timestamps (per key) to check which records need to be removed.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2018-06-27 5:38 GMT+02:00 Rong Rong <walter...@gmail.com>:
>>>>>> Hi Greg,
>>>>>>
>>>>>> Based on a quick test I cannot reproduce the issue; it is emitting messages correctly in the ITCase environment.
>>>>>> Can you share more information? Does the same problem happen if you use proctime?
>>>>>> I am guessing this could be highly correlated with how you set the watermark strategy of your input streams "user_things" and "user_stuff".
>>>>>>
>>>>>> --
>>>>>> Rong
>>>>>>
>>>>>> On Tue, Jun 26, 2018 at 6:37 PM Gregory Fee <g...@lyft.com> wrote:
>>>>>>> Hello User Community!
>>>>>>>
>>>>>>> I am running some streaming SQL that involves a UNION ALL into an OVER window, similar to the query below:
>>>>>>>
>>>>>>>   SELECT user_id,
>>>>>>>     count(action) OVER (PARTITION BY user_id ORDER BY rowtime
>>>>>>>       RANGE INTERVAL '14' DAY PRECEDING) action_count,
>>>>>>>     rowtime
>>>>>>>   FROM (
>>>>>>>     SELECT rowtime, user_id, thing AS action FROM user_things
>>>>>>>     UNION ALL
>>>>>>>     SELECT rowtime, user_id, stuff AS action FROM user_stuff)
>>>>>>>
>>>>>>> The SQL generates three operators.
>>>>>>> There are two operators that process the 'from' part of the clause that feed into an 'over' operator. I notice that messages flow into the 'over' operator and just buffer there for a long time (hours in some cases). Eventually something happens and the data starts to flush through to the downstream operators. Can anyone help me understand what is causing that behavior? I want the data to flow through more consistently.
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> --
>>>>>>> *Gregory Fee*
>>>>>>> Engineer
>>>>>>> 425.830.4734
>>>>>>> Lyft <http://www.lyft.com>
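The OVER window behavior Fabian describes, and why a day-partitioned bootstrap piles up in the 'over' operator, can be sketched with a toy Python model of the `count(action) OVER (... RANGE INTERVAL '14' DAY PRECEDING)` query. This is not Flink's operator. `RowtimeOverCount`, `on_row` and `on_watermark` are hypothetical names, rowtimes are assumed to be epoch milliseconds, and every `action` is assumed non-null so `count(action)` equals the row count.

```python
"""Toy model (not Flink's operator) of
  count(action) OVER (PARTITION BY user_id ORDER BY rowtime
                      RANGE INTERVAL '14' DAY PRECEDING)
Rows wait until the watermark passes their timestamp, rows with equal
timestamps are emitted together, and up to 14 days of rows per key stay in
state so they can be retracted later."""
from collections import defaultdict
from typing import Dict, List, Tuple

DAY = 24 * 60 * 60 * 1000  # milliseconds (assumed rowtime unit)
RANGE = 14 * DAY           # RANGE INTERVAL '14' DAY PRECEDING


class RowtimeOverCount:
    def __init__(self) -> None:
        # Rows not yet covered by the watermark: user_id -> {rowtime: [actions]}
        self.pending: Dict[str, Dict[int, List[str]]] = defaultdict(lambda: defaultdict(list))
        # Emitted rows still inside the 14-day range: user_id -> [rowtime, ...]
        self.window: Dict[str, List[int]] = defaultdict(list)

    def on_row(self, user_id: str, rowtime: int, action: str) -> None:
        # Only buffered; nothing is emitted until a watermark passes rowtime.
        self.pending[user_id][rowtime].append(action)

    def on_watermark(self, wm: int) -> List[Tuple[str, int, int]]:
        out: List[Tuple[str, int, int]] = []
        for user_id, by_ts in self.pending.items():
            for ts in sorted(t for t in by_ts if t <= wm):
                rows = by_ts.pop(ts)
                self.window[user_id].extend([ts] * len(rows))
                # Retract rows older than ts - 14 days (a scan over the key's state).
                self.window[user_id] = [t for t in self.window[user_id] if t >= ts - RANGE]
                count = len(self.window[user_id])
                # All rows sharing this timestamp get the same aggregate value.
                out.extend((user_id, count, ts) for _ in rows)
        return out


op = RowtimeOverCount()
op.on_row("u1", 0, "thing")
op.on_row("u1", 0, "stuff")
op.on_row("u1", 15 * DAY, "thing")
print(op.on_watermark(-1))        # [] : the watermark has not passed any row
print(op.on_watermark(0))         # [('u1', 2, 0), ('u1', 2, 0)]
print(op.on_watermark(15 * DAY))  # [('u1', 1, 1296000000)] : day-0 rows retracted
```

In this model, loading a whole day of history before the first watermark simply grows `pending`; all of it is released in one burst once the watermark moves, and each release also pays for the retraction scan over the 14-day state.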