Hi,

Yes Carst, that's exactly what happens: 240 get+put calls.
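In case it helps with writing the ProcessFunction: below is a very rough, untested sketch of the kind of workaround Chen described, against the Flink 1.2 API. The class name, state names and the (id, count) input type are just placeholders, and it only computes a mean to keep it short, but the idea is the important part: keep one buffer of raw elements per key in a single ListState, register one event-time timer per slide interval, and on each timer re-evaluate and prune the buffer, instead of letting the window operator copy every element into 240 overlapping window states.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Input: (id, count) with event timestamps assigned upstream.
// Output: (id, mean of the counts seen in the last 4 hours), emitted once per minute.
public class SlidingMeanFunction extends ProcessFunction<Tuple2<String, Long>, Tuple2<String, Double>> {

    private static final long SIZE = 4 * 60 * 60 * 1000L; // 4 hour "window"
    private static final long SLIDE = 60 * 1000L;          // 1 minute "slide"

    // One buffer per key instead of one window state per (key, window) pair.
    private transient ListState<Tuple2<Long, Long>> buffer; // (timestamp, count)
    private transient ValueState<String> currentId;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(new ListStateDescriptor<>(
            "buffer", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})));
        currentId = getRuntimeContext().getState(
            new ValueStateDescriptor<>("id", String.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx,
                               Collector<Tuple2<String, Double>> out) throws Exception {
        currentId.update(value.f0);
        buffer.add(Tuple2.of(ctx.timestamp(), value.f1));
        // One timer per slide interval; duplicate registrations for the same
        // timestamp are deduplicated by Flink.
        long nextFire = ctx.timestamp() - (ctx.timestamp() % SLIDE) + SLIDE;
        ctx.timerService().registerEventTimeTimer(nextFire);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Double>> out) throws Exception {
        long cutoff = timestamp - SIZE;
        List<Tuple2<Long, Long>> kept = new ArrayList<>();
        long sum = 0;
        Iterable<Tuple2<Long, Long>> elements = buffer.get();
        if (elements != null) {
            for (Tuple2<Long, Long> e : elements) {
                if (e.f0 >= cutoff) {
                    kept.add(e);
                    sum += e.f1;
                }
            }
        }
        // Rewrite the buffer with only the elements still inside the 4 hour range.
        buffer.clear();
        for (Tuple2<Long, Long> e : kept) {
            buffer.add(e);
        }
        if (!kept.isEmpty()) {
            out.collect(Tuple2.of(currentId.value(), sum / (double) kept.size()));
            // Keep firing once per slide as long as there is data for this key.
            ctx.timerService().registerEventTimeTimer(timestamp + SLIDE);
        }
    }
}

You would plug it in after the keyBy, e.g. stream.keyBy(0).process(new SlidingMeanFunction()), replacing the 4-hour window and fold. Please treat it only as a starting point.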
Best,
Aljoscha

> On 24. May 2017, at 15:49, Carst Tankink <ctank...@bol.com> wrote:
>
> Hi,
>
> Thanks Aljoscha!
> To complete my understanding: the problem here is that each element in the sliding window(s) basically triggers 240 get+put calls instead of just 1, right? I can see how that blows up :-)
> I have a good idea of how to proceed, so I will try writing the custom ProcessFunction next (week).
>
> Stefan, in our case we are already on Flink 1.2.1, which should have the patched version of RocksDB, right? I ask because that patch did solve an issue we had in a different Flink job (a Kafka Source -> HDFS/Bucketing Sink which was stalling quite often under Flink 1.2.0) but did not solve this case, which fits the "way too much RocksDB access" explanation better.
>
> Thanks again,
> Carst
>
> From: Aljoscha Krettek <aljos...@apache.org>
> Date: Wednesday, May 24, 2017 at 16:13
> To: Stefan Richter <s.rich...@data-artisans.com>
> Cc: Carst Tankink <ctank...@bol.com>, "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: large sliding window perf question
>
> Hi,
>
> I'm afraid you're running into a general shortcoming of the current sliding windows implementation: every sliding window is treated as its own window that has window contents and trigger state/timers. For example, if you have a sliding window of size 4 hours with a 1 minute slide, each element is in 240 windows and you basically amplify writing to RocksDB by 240. This gets out of hand very quickly with larger differences between window size and slide interval.
>
> I'm also afraid there is no solution for this right now, so the workaround Chen mentioned is the way to go.
>
> Best,
> Aljoscha
>
> On 24. May 2017, at 14:07, Stefan Richter <s.rich...@data-artisans.com> wrote:
>
> Hi,
>
> Both issues sound like the known problem with RocksDB merging state. Please take a look here
>
> https://issues.apache.org/jira/browse/FLINK-5756
>
> and here
>
> https://github.com/facebook/rocksdb/issues/1988
>
> Best,
> Stefan
>
> On 24.05.2017 at 14:33, Carst Tankink <ctank...@bol.com> wrote:
>
> Hi,
>
> We are seeing similar behaviour for large sliding windows. Let me put some details here and see if they match up enough with Chen's:
>
> Technical specs:
> - Flink 1.2.1 on YARN
> - RocksDB backend, on HDFS. I've set the backend to PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM since our Hadoop cluster runs on spinning disks, but that doesn't seem to help.
>
> Pipeline:
> - Read from Kafka, extract ids
> - KeyBy id, count occurrences of each id using a fold. The window size of this operator is 10 minutes with a slide of 1 minute.
> - KeyBy id (again), compute mean and standard deviation using a fold. The window size of this operator is 4 hours with a slide of 1 minute.
> - Post-process data, sink.
>
> What I observe is:
> - With a heap-based backend, the job runs really quickly (a couple of minutes to process 7 days of Kafka data) but eventually goes OOM with a GC overhead exceeded error.
> - With the RocksDB backend, checkpoints get stuck most of the time, and the "count occurrences" step gets a lot of back pressure from the next operator (on the large window).
>   o In the cases where the checkpoint does succeed, the state for the large window is around 500-700 MB, while the other states are within the KBs.
>   o Also in those cases, all the time seems to be spent in the 'alignment' phase for a single subtask of the count operator, with the other operators aligning within milliseconds. The checkpoint duration itself is no more than 2 seconds, even for the larger states.
>
> At this point, I'm a bit at a loss to figure out what's going on. My best guess is that it has to do with the state access to the RocksDBFoldingState, but why this is so slow is beyond me.
>
> Hope this info helps in figuring out what is going on, and hopefully it is actually related to Chen's case :)
>
> Thanks,
> Carst
>
> From: Stefan Richter <s.rich...@data-artisans.com>
> Date: Tuesday, May 23, 2017 at 21:35
> To: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: large sliding window perf question
>
> Hi,
>
> Which state backend and Flink version are you using? There was a problem with large merging states on RocksDB, caused by some inefficiencies in the merge operator of RocksDB. We provide a custom patch for this with all newer versions of Flink.
>
> Best,
> Stefan
>
> On 23.05.2017 at 21:24, Chen Qin <qinnc...@gmail.com> wrote:
>
> Hi there,
>
> I have seen a weird perf issue while running an event-time job with a large sliding window (24 hours, sliding every 10s).
>
> The pipeline looks simple:
> tail a Kafka topic, assign timestamps and watermarks, forward to a large sliding window (30 days) that fires every 10 seconds, and print out.
>
> What I have seen first hand was checkpointing getting stuck, taking longer than the timeout, even though the traffic volume is low (~300 TPS). Looking deeper, it seems back pressure kicks in and the window operator consumes messages really slowly and throttles the sources.
>
> I also tried limiting the window time to minutes, and then all the issues are gone.
>
> Any suggestions on this? My workaround is that I implemented a ProcessFunction and keep a big value state, periodically evaluating it and emitting downstream (emulating what the sliding window does).
>
> Thanks,
> Chen
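P.S. For anyone finding this thread later: the RocksDB tuning Carst mentioned is set on the state backend roughly like this (untested sketch against the Flink 1.2 API; the checkpoint URI is only a placeholder). As noted above, it did not help in this case, but this is what the configuration looks like:

import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // "hdfs:///flink/checkpoints" is only a placeholder checkpoint location.
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
        env.setStateBackend(backend);

        // ... build and execute the actual job here ...
    }
}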