Hi Chen,

How do you update the ValueState during checkpointing? I'm asking because keyed state should always be scoped to a key, and during checkpointing there is no key scope: we are not processing an incoming element and we are not firing a timer, which are the two cases where we do have a key scope.
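For reference, a minimal sketch of those two keyed call sites. It uses the newer KeyedProcessFunction API (on Flink 1.2 this would be a RichProcessFunction), and the types, state name, and timer interval are illustrative only:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ScopedCount extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        // Key scope here is the key of the incoming element.
        Long current = count.value();
        count.update(current == null ? 1L : current + 1);
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 10_000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        // Key scope here is the key of the firing timer.
        out.collect(Tuple2.of(ctx.getCurrentKey(), count.value()));
    }
}

Outside of these two methods there is no current key, which is why writing keyed state from a checkpoint hook is problematic.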
Best,
Aljoscha

> On 24. May 2017, at 21:05, Chen Qin <qinnc...@gmail.com> wrote:
>
> Got it! Looks like a 30-day window with a 10-second trigger is way too many windows (a quarter million every 10 seconds per key, around 150 keys).
>
> Just to add some background, I tried three ways to implement this large sliding window pipeline. All share the same configuration and use the RocksDB state backend with remote storage on S3:
> - the out-of-the-box sliding window, 30 days with a 10 s trigger
> - a ProcessFunction with list state
> - a ProcessFunction with an in-memory cache that updates a ValueState during checkpointing and periodically filters and emits a list of events. Checkpointing the value state as a blob seems to complete quickly.
> The first two options show the perf issue; the third one works fine so far.
>
> Thanks,
> Chen
>
> On Wed, May 24, 2017 at 8:24 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
> Yes Carst, I noticed your version is already 1.2.1, which is why I contacted Aljoscha to take a look here, because he knows best about the expected scalability of the sliding window implementation.
>
>> Am 24.05.2017 um 16:49 schrieb Carst Tankink <ctank...@bol.com>:
>>
>> Hi,
>>
>> Thanks Aljoscha!
>> To complete my understanding: the problem here is that each element in the sliding window(s) basically triggers 240 get+put calls instead of just 1, right? I can see how that blows up :-)
>> I have a good idea of how to proceed, so I will be trying out writing the custom ProcessFunction next (week).
>>
>> Stefan, in our case we are already on Flink 1.2.1, which should have the patched version of RocksDB, right? That patch did solve an issue we had in a different Flink job (a Kafka Source -> HDFS/Bucketing Sink, which was stalling quite often under Flink 1.2.0) but did not solve this case, which fits the "way too much RocksDB access" explanation better.
>>
>> Thanks again,
>> Carst
>>
>> From: Aljoscha Krettek <aljos...@apache.org>
>> Date: Wednesday, May 24, 2017 at 16:13
>> To: Stefan Richter <s.rich...@data-artisans.com>
>> Cc: Carst Tankink <ctank...@bol.com>, "user@flink.apache.org" <user@flink.apache.org>
>> Subject: Re: large sliding window perf question
>>
>> Hi,
>>
>> I'm afraid you're running into a general shortcoming of the current sliding window implementation: every sliding window is treated as its own window with its own window contents and trigger state/timers. For example, if you have a sliding window of size 4 hours with a 1-minute slide, each element is in 240 windows and you basically amplify writing to RocksDB by 240. This gets out of hand very quickly with larger differences between window size and slide interval.
>>
>> I'm also afraid there is no solution for this right now, so the workaround Chen mentioned is the way to go.
>>
>> Best,
>> Aljoscha
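A quick back-of-the-envelope check of that fan-out, as a sketch (the figures follow directly from window size divided by slide; only the class and variable names are made up):

import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowFanout {
    public static void main(String[] args) {
        // Every element is assigned to (window size / slide) overlapping windows.
        long carst = Time.hours(4).toMilliseconds() / Time.minutes(1).toMilliseconds();
        long chen = Time.days(30).toMilliseconds() / Time.seconds(10).toMilliseconds();
        System.out.println("4 h window, 1 min slide -> " + carst + " windows per element");  // 240
        System.out.println("30 d window, 10 s slide -> " + chen + " windows per element");   // 259,200
    }
}

The second figure matches the "quarter million every 10 seconds per key" Chen mentions above.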
>> On 24. May 2017, at 14:07, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>
>> Hi,
>>
>> Both issues sound like the known problem with RocksDB merging state. Please take a look here
>>
>> https://issues.apache.org/jira/browse/FLINK-5756
>>
>> and here
>>
>> https://github.com/facebook/rocksdb/issues/1988
>>
>> Best,
>> Stefan
>>
>> Am 24.05.2017 um 14:33 schrieb Carst Tankink <ctank...@bol.com>:
>>
>> Hi,
>>
>> We are seeing similar behaviour for large sliding windows. Let me put some details here and see if they match up enough with Chen's:
>>
>> Technical specs:
>> - Flink 1.2.1 on YARN
>> - RocksDB backend on HDFS. I've set the backend to PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM since our Hadoop cluster runs on spinning disks, but that doesn't seem to help.
>>
>> Pipeline:
>> - Read from Kafka, extract ids
>> - KeyBy id, count occurrences of each id using a fold. The window size of this operator is 10 minutes with a slide of 1 minute.
>> - KeyBy id (again), compute mean and standard deviation using a fold. The window size of this operator is 4 hours with a slide of 1 minute.
>> - Post-process data, sink.
>>
>> What I observe is:
>> - With a heap-based backend, the job runs really quickly (a couple of minutes to process 7 days of Kafka data) but eventually goes OOM with a "GC overhead limit exceeded" error.
>> - With the RocksDB backend, checkpoints get stuck most of the time, and the "count occurrences" step gets a lot of back pressure from the next operator (the one with the large window).
>>   - In the cases where the checkpoint does succeed, the state for the large window is around 500-700 MB; the other states are within the KBs.
>>   - Also in those cases, all time seems to be spent in the 'alignment' phase for a single subtask of the count operator, with the other operators aligning within milliseconds. The checkpoint duration itself is no more than 2 seconds, even for the larger states.
>>
>> At this point, I'm a bit at a loss to figure out what's going on. My best guess is that it has to do with the state access to the RocksDBFoldingState, but why this is so slow is beyond me.
>>
>> Hope this info helps in figuring out what is going on, and hopefully it is actually related to Chen's case :)
>>
>> Thanks,
>> Carst
>>
>> From: Stefan Richter <s.rich...@data-artisans.com>
>> Date: Tuesday, May 23, 2017 at 21:35
>> To: "user@flink.apache.org" <user@flink.apache.org>
>> Subject: Re: large sliding window perf question
>>
>> Hi,
>>
>> Which state backend and Flink version are you using? There was a problem with large merging states on RocksDB, caused by some inefficiencies in the merge operator of RocksDB. We provide a custom patch for this with all newer versions of Flink.
>>
>> Best,
>> Stefan
>>
>> Am 23.05.2017 um 21:24 schrieb Chen Qin <qinnc...@gmail.com>:
>>
>> Hi there,
>>
>> I have seen a weird perf issue while running an event-time-based job with a large sliding window (24 hours, offset every 10s).
>>
>> The pipeline looks simple: tail a Kafka topic, assign timestamps and watermarks, forward to a large sliding window (30 days) that fires every 10 seconds, and print out.
>>
>> What I saw first-hand was checkpointing getting stuck and taking longer than the timeout, despite the traffic volume being low (~300 TPS). Looking deeper, it seems back pressure kicks in and the window operator consumes messages really slowly and throttles the sources.
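For context, a minimal sketch of the kind of job Chen describes, with the 30-day window firing every 10 seconds; the source, record format, key extraction, and aggregation are placeholders, not his actual code:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class LargeSlidingWindowJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Placeholder source standing in for the Kafka topic; records are "timestamp,key".
        env.socketTextStream("localhost", 9999)
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
                    @Override
                    public long extractAscendingTimestamp(String record) {
                        return Long.parseLong(record.split(",")[0]);
                    }
                })
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String record) {
                        return record.split(",")[1]; // ~150 distinct keys in Chen's setup
                    }
                })
                // 30-day window firing every 10 seconds: 259,200 overlapping windows per element.
                .window(SlidingEventTimeWindows.of(Time.days(30), Time.seconds(10)))
                .reduce((a, b) -> b) // placeholder aggregation
                .print();

        env.execute("large sliding window");
    }
}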
>> I also tried to limit the window time to minutes, and all the issues are gone.
>>
>> Any suggestions on this? My workaround is that I implemented a ProcessFunction that keeps a big value state, periodically evaluates it, and emits downstream (emulating what the sliding window does).
>>
>> Thanks,
>> Chen
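For completeness, a minimal sketch of that kind of workaround, replacing the windowed version sketched earlier. It is not Chen's actual implementation (his third variant caches in memory and writes a ValueState during checkpointing); this version keeps the buffered events in keyed ListState and only touches it from processElement/onTimer, the two keyed call sites Aljoscha mentions. It assumes a newer Flink API (KeyedProcessFunction, ListState#update), and the record type and aggregation are illustrative:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits, every 10 seconds per key, the number of events seen in the last 30 days.
// One state entry per key instead of one window per (key, slide).
public class EmulatedSlidingWindow
        extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final long SLIDE_MS = 10_000L;                  // fire every 10 s
    private static final long SIZE_MS = 30L * 24 * 60 * 60 * 1000; // keep 30 days

    private transient ListState<Tuple2<String, Long>> buffer;      // f1 = event timestamp

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(new ListStateDescriptor<>(
                "buffer", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
    }

    @Override
    public void processElement(Tuple2<String, Long> event, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        buffer.add(event);
        // Align the next firing to the slide boundary; duplicate registrations
        // for the same key and timestamp collapse into a single timer.
        long now = ctx.timerService().currentProcessingTime();
        ctx.timerService().registerProcessingTimeTimer(now - (now % SLIDE_MS) + SLIDE_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        // Evict everything older than the window size, then emit the aggregate.
        long cutoff = timestamp - SIZE_MS;
        List<Tuple2<String, Long>> kept = new ArrayList<>();
        for (Tuple2<String, Long> e : buffer.get()) {
            if (e.f1 >= cutoff) {
                kept.add(e);
            }
        }
        buffer.update(kept);
        out.collect(Tuple2.of(ctx.getCurrentKey(), (long) kept.size()));
        if (!kept.isEmpty()) {
            ctx.timerService().registerProcessingTimeTimer(timestamp + SLIDE_MS);
        }
    }
}

Chen's third variant goes one step further and keeps the buffer in an in-memory cache, materializing it into a ValueState only at checkpoint time, which is where Aljoscha's question about the missing key scope during checkpointing comes in.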