Hi, both issues sound like the known problem with RocksDB merging state. Please take a look here
https://issues.apache.org/jira/browse/FLINK-5756 and here https://github.com/facebook/rocksdb/issues/1988

Best,
Stefan

> On 24.05.2017 at 14:33, Carst Tankink <ctank...@bol.com> wrote:
>
> Hi,
>
> We are seeing similar behaviour for large sliding windows. Let me put some
> details here and see if they match up enough with Chen’s:
>
> Technical specs:
> - Flink 1.2.1 on YARN
> - RocksDB backend, on HDFS. I’ve set the backend to
>   PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM since our Hadoop cluster
>   runs on spinning disks, but that doesn’t seem to help
>
> Pipeline:
> - Read from Kafka, extract ids
> - KeyBy id, count occurrences of each id using a fold. The window
>   size of this operator is 10 minutes with a slide of 1 minute
> - KeyBy id (again), compute mean, standard deviation using a fold.
>   The window size of this operator is 4 hours with a slide of 1 minute.
> - Post-process data, sink.
>
> What I observe is:
> - With a heap-based backend, the job runs really quick (couple of
>   minutes to process 7 days of Kafka data) but eventually goes OOM with a GC
>   overhead exceeded error.
> - With the RocksDB backend, checkpoints get stuck most of the time,
>   and the “count occurrences” step gets a lot of back pressure from the next
>   operator (on the large window)
>   o In those cases where the checkpoint does succeed, the state for the large
>     window is around 500-700MB; other states are within the KBs.
>   o Also in those cases, all time seems to be spent in the ‘alignment’ phase
>     for a single subtask of the count operator, with the other operators
>     aligning within milliseconds. The checkpoint duration itself is no more
>     than 2 seconds even for the larger states.
>
>
> At this point, I’m a bit at a loss to figure out what’s going on. My best
> guess is it has to do with the state access to the RocksDBFoldingState, but
> why this is so slow is beyond me.
>
> Hope this info helps in figuring out what is going on, and hopefully it is
> actually related to Chen’s case :)
>
>
> Thanks,
> Carst
>
> From: Stefan Richter <s.rich...@data-artisans.com>
> Date: Tuesday, May 23, 2017 at 21:35
> To: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: large sliding window perf question
>
> Hi,
>
> Which state backend and Flink version are you using? There was a problem with
> large merging states on RocksDB, caused by some inefficiencies in the merge
> operator of RocksDB. We provide a custom patch for this with all newer
> versions of Flink.
>
> Best,
> Stefan
>
> On 23.05.2017 at 21:24, Chen Qin <qinnc...@gmail.com> wrote:
>
> Hi there,
>
> I have seen a weird perf issue while running an event-time-based job with a
> large sliding window (24 hours offset every 10s).
>
> The pipeline looks simple:
> tail a Kafka topic, assign timestamps and watermarks, forward to a large
> sliding window (30days) that fires every 10 seconds, and print out.
>
> What I have seen first hand is checkpointing getting stuck and taking longer
> than the timeout, even though traffic volume is low (~300 TPS). Looking
> deeper, it seems back pressure kicks in, and the window operator consumes
> messages really slowly and throttles the sources.
>
> I also tried limiting the window time to minutes, and all issues are gone.
>
> Any suggestions on this? My workaround is that I implemented a processFunction
> that keeps a big value state and periodically evaluates it and emits
> downstream (emulating what a sliding window does).
>
> Thanks,
> Chen
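
For a sense of scale in the window configurations mentioned in this thread: a sliding window assigner places each element into size/slide overlapping windows, so every incoming record leads to that many per-window state updates. The sketch below only does this arithmetic in plain Python; it is not Flink code, the helper name windows_per_element is made up for illustration, and it assumes the slide divides the window size evenly.

```python
from datetime import timedelta


def windows_per_element(size: timedelta, slide: timedelta) -> int:
    """Number of overlapping sliding windows that one element falls into.

    Assumes the slide evenly divides the window size (true for every
    configuration quoted in this thread).
    """
    if slide <= timedelta(0):
        raise ValueError("slide must be positive")
    if size % slide != timedelta(0):
        raise ValueError("this sketch assumes slide divides size evenly")
    return size // slide


# Window configurations taken from the messages in this thread.
configs = {
    "10 min window, 1 min slide": (timedelta(minutes=10), timedelta(minutes=1)),
    "4 h window, 1 min slide": (timedelta(hours=4), timedelta(minutes=1)),
    "24 h window, 10 s slide": (timedelta(hours=24), timedelta(seconds=10)),
    "30 day window, 10 s slide": (timedelta(days=30), timedelta(seconds=10)),
}

for label, (size, slide) in configs.items():
    print(f"{label}: {windows_per_element(size, slide)} windows per element")
```

At roughly 300 records per second, the 30 day / 10 s configuration would mean on the order of 300 x 259200 window-state updates per second, which may help explain why shrinking the window to minutes made the symptoms disappear.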