Sorry for my late answer, Bowen. I think this only works if you implement your own WindowAssigner. With the built-in sliding windows this is not possible, since all windows have the same offset.
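For illustration, a rough and untested sketch of such an assigner could look like the following. It mirrors the built-in SlidingEventTimeWindows, but derives the offset from the item key instead of using a fixed value; `HasItemId`/`getItemId()` are placeholders for your own event type:

```
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/** Placeholder for your own event type; only the item id is needed here. */
interface HasItemId {
    String getItemId();
}

/**
 * Sliding event-time windows whose alignment is shifted per item, so that
 * the windows of different items are not all triggered at the same instant.
 */
public class PerItemOffsetSlidingWindows extends WindowAssigner<Object, TimeWindow> {

    private final long size;   // window size in ms, e.g. 24 * 60 * 60 * 1000
    private final long slide;  // slide in ms, e.g. 60 * 60 * 1000

    public PerItemOffsetSlidingWindows(long size, long slide) {
        this.size = size;
        this.slide = slide;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        // Derive a stable offset in [0, slide) from the item id, so every item
        // gets its own window alignment within the slide interval.
        long offset = (((HasItemId) element).getItemId().hashCode() & 0x7fffffff) % slide;
        // Largest window start <= timestamp that is congruent to offset modulo slide.
        long lastStart = timestamp - (((timestamp - offset) % slide) + slide) % slide;

        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}
```

You would then use it roughly as `stream.keyBy(...).window(new PerItemOffsetSlidingWindows(Time.days(1).toMilliseconds(), Time.hours(1).toMilliseconds()))`. Note that shifting the alignment per item also shifts which events each window covers, so results are no longer aligned to wall-clock hours, which is exactly what spreads out the firing load.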
Cheers,
Till

On Fri, Aug 25, 2017 at 9:44 AM, Bowen Li <bowen...@offerupnow.com> wrote:

> Hi Till,
> What I mean is: can the sliding windows for different items have
> different start times?
>
> Here's an example of what we want:
>
> - for item A: its first event arrives at 2017/8/24-01:*12:24*, so the 1st
>   window should be 2017/8/24-01:*12:24* - 2017/8/25-01:*12:23*, the 2nd
>   window would be 2017/8/24-02:*12:24* - 2017/8/25-02:*12:23*, and so on
> - for item B: its first event arrives at 2017/8/24-01:*10:20*, so the 1st
>   window should be 2017/8/24-01:*10:20* - 2017/8/25-01:*10:19*, the 2nd
>   window would be 2017/8/24-02:*10:20* - 2017/8/25-02:*10:19*, and so on.
>
> But we observed that what Flink does is: for both A and B, their own
> unique time offsets within the hour (*12:24 and 10:20*) are eliminated by
> Flink, and the windows are unified to be 2017/8/24-01:*00:00* -
> 2017/8/25-01:*00:00*, 2017/8/24-02:*00:00* - 2017/8/25-02:*00:00*, and
> so on.
>
> Unifying the starting time of windows for all items brings us trouble.
> It means 20 million windows are triggered and fired at the same time,
> and the downstream Kinesis sink cannot handle that amount of output. We
> actually want windows for different items to be triggered and fired at
> different times within an hour, so we can even out the amount of output
> to the downstream Kinesis sink, as my ASCII charts demonstrated.
>
> Does my example make sense?
>
> Thanks,
> Bowen
>
> On Fri, Aug 25, 2017 at 12:01 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Bowen,
>>
>> Having a sliding window of one day with a slide of one hour basically
>> means that each window is closed after 24 hours and the next closing
>> happens one hour later. Only when a window is closed/triggered is the
>> window function computed, which generates the window output. That's why
>> you see the spikes in your load; it's basically caused by the program
>> semantics.
>>
>> What do you mean by burning down the underlying KPL? If KPL has a max
>> throughput, then the FlinkKinesisProducer should ideally respect that.
>>
>> Nice ASCII art btw :-)
>>
>> Cheers,
>> Till
>>
>> On Fri, Aug 25, 2017 at 6:20 AM, Bowen Li <bowen...@offerupnow.com>
>> wrote:
>>
>>> Hi Till,
>>>
>>> Thank you very much for looking into it! According to our
>>> investigation, this is indeed a Kinesis issue. Flink
>>> (FlinkKinesisProducer) uses the KPL (Kinesis Producer Library), but
>>> hasn't tuned it yet. I have identified a bunch of issues, opened the
>>> following Flink tickets, and am working on them:
>>>
>>> - [FLINK-7367][kinesis connector] Parameterize more configs for
>>>   FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections,
>>>   RequestTimeout, etc.)
>>> - [FLINK-7366][kinesis connector] Upgrade kinesis producer library
>>>   in flink-connector-kinesis
>>> - [FLINK-7508] Switch FlinkKinesisProducer to use KPL's ThreadedPool
>>>   ThreadingMode rather than Per_Request mode
>>>
>>> I do have a question about Flink performance. We are using a 1-day
>>> sliding window with a 1-hour slide to count some features of items
>>> based on event time. We have about 20 million items. We observed that
>>> Flink only emits results at a fixed time within each hour (e.g. 1am,
>>> 2am, 3am, or 1:15am, 2:15am, 3:15am with a 15-min offset). That means
>>> 20 million windows/records are generated at the same time every hour,
>>> which burns down the FlinkKinesisProducer and the underlying KPL, but
>>> nothing is generated in the rest of that hour.
>>> The pattern is like this:
>>>
>>> load
>>> |
>>> |      /\          /\
>>> |     /  \        /  \
>>> |_/_ \_______/__\_
>>> time
>>>
>>> Is there any way to even out the number of generated windows/records
>>> within an hour? Can we have evenly distributed generated load like this?
>>>
>>> load
>>> |
>>> |
>>> | ------------------------
>>> |_______________
>>> time
>>>
>>> Thanks,
>>> Bowen
>>>
>>> On Tue, Aug 22, 2017 at 2:56 AM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Bowen,
>>>>
>>>> Sorry for my late answer. I dug through some of the logs and it seems
>>>> that you have the following problem:
>>>>
>>>> 1. Once in a while the Kinesis producer fails with a
>>>>    UserRecordFailedException saying "Expired while waiting in
>>>>    HttpClient queue. Record has reached expiration". This seems to be
>>>>    a problem on the Kinesis side. It triggers the task failure and the
>>>>    cancellation of all other tasks as well.
>>>>
>>>> 2. Somehow Flink does not manage to cancel all tasks within a period
>>>>    of 180 seconds. This value is configurable via
>>>>    task.cancellation.timeout (unit: ms) in the Flink configuration. It
>>>>    looks a bit like you have a lot of logging going on, because the
>>>>    code is waiting, for example, on Category.java:204 and other log4j
>>>>    methods. This could, however, also cover the true issue. What you
>>>>    could do is try out a different logging backend such as logback
>>>>    [1], for example.
>>>>
>>>> 3. The failing cancellation is a fatal error which leads to the
>>>>    termination of the TaskManager. The YarnResourceManager is notified
>>>>    of this and restarts the container. This goes on until it reaches
>>>>    the maximum number of failed containers. This value can be
>>>>    configured via yarn.maximum-failed-containers. Per default it is
>>>>    the number of initial containers you requested. If you set this
>>>>    value to -1, it will never fail and will always restart failed
>>>>    containers. Once the maximum is reached, Flink terminates the Yarn
>>>>    application.
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#using-logback-instead-of-log4j
>>>>
>>>> In order to further debug the problem: which version of Flink are you
>>>> using, and maybe you could provide us with the debug-level logs of the
>>>> TaskManagers.
>>>>
>>>> Cheers,
>>>> Till
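(As a side note, the two settings mentioned above would go into flink-conf.yaml; the values below are only illustrative, not recommendations:)

```
# flink-conf.yaml -- illustrative values only
task.cancellation.timeout: 300000      # give tasks up to 5 minutes (in ms) to cancel
yarn.maximum-failed-containers: -1     # keep restarting failed containers indefinitely
```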
>>>> On Fri, Aug 11, 2017 at 5:37 AM, Bowen Li <bowen...@offerupnow.com>
>>>> wrote:
>>>>
>>>>> Hi Till,
>>>>> Any idea why it happened? I've tried different configurations for
>>>>> our Flink cluster, but the cluster always fails after 4 or 5 hours.
>>>>>
>>>>> According to the log, it looks like the total number of slots becomes
>>>>> 0 at the end, and YarnClusterClient shuts down the application master
>>>>> as a result. Why are the slots not released? Or are they actually
>>>>> crashed and thus no longer available?
>>>>>
>>>>> I'm trying to deploy the first Flink cluster within our company, and
>>>>> this issue is slowing us down from proving that Flink actually works
>>>>> for us. We'd appreciate your help on it!
>>>>>
>>>>> Thanks,
>>>>> Bowen
>>>>>
>>>>> On Wed, Aug 9, 2017 at 1:33 PM, Bowen Li <bowen...@offerupnow.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Till,
>>>>>> Thanks for taking this issue.
>>>>>>
>>>>>> We are not comfortable sending logs to an email list which is this
>>>>>> open. I'll send the logs to you.
>>>>>>
>>>>>> Thanks,
>>>>>> Bowen
>>>>>>
>>>>>> On Wed, Aug 9, 2017 at 2:46 AM, Till Rohrmann <trohrm...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Bowen,
>>>>>>>
>>>>>>> If I'm not mistaken, then Flink's current Yarn implementation does
>>>>>>> not actively release containers. The `YarnFlinkResourceManager` is
>>>>>>> started with a fixed number of containers it always tries to
>>>>>>> acquire. If a container should die, it will request a new one.
>>>>>>>
>>>>>>> In case of a failure, all slots should be freed and then be subject
>>>>>>> to rescheduling of the new tasks. Thus, it is not necessarily the
>>>>>>> case that 12 new slots will be used unless the old slots are no
>>>>>>> longer available (failure of a TM). Therefore, what you are
>>>>>>> describing sounds like a bug. Could you share the logs with us?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Wed, Aug 9, 2017 at 9:32 AM, Bowen Li <bowen...@offerupnow.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi guys,
>>>>>>>> I was running a Flink job (parallelism 12) on an EMR cluster
>>>>>>>> with 48 YARN slots. When the job started, I could see from the
>>>>>>>> Flink UI that the job took 12 slots, and 36 slots were left
>>>>>>>> available.
>>>>>>>>
>>>>>>>> I would expect that when the job fails, it would restart from
>>>>>>>> checkpointing by taking another 12 slots and freeing the original
>>>>>>>> 12 slots. *Well, I observed that the job took new slots but never
>>>>>>>> freed the original slots. The Flink job ended up killed by YARN
>>>>>>>> because there were no available slots anymore.*
>>>>>>>>
>>>>>>>> Here's the command I used to run the Flink job:
>>>>>>>>
>>>>>>>> ```
>>>>>>>> flink run -m yarn-cluster -yn 6 -ys 8 -ytm 40000 xxx.jar
>>>>>>>> ```
>>>>>>>>
>>>>>>>> Does anyone know what's going wrong?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Bowen