Hi,

Yes Carst, that’s exactly what happens: 240 get+put calls.
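
To make that number concrete, here is a minimal sketch of the arithmetic in plain Java. It is not Flink's actual window assigner: the class name SlidingWindowFanout and the helper windowStarts are hypothetical, and it assumes non-negative timestamps and no window offset. It lists the start of every sliding window that contains a given element, so the size of the list is the number of per-window state updates that one element causes.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowFanout {

    // Start timestamps of every sliding window [start, start + size) that
    // contains timestampMs. Assumes timestampMs >= 0 and no window offset.
    public static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is aligned to the slide and not after the element.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the element.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long second = 1_000L;
        long minute = 60 * second;
        long hour = 60 * minute;
        long ts = 1_495_634_940_000L; // arbitrary example timestamp

        // 4 hour window, 1 minute slide: prints 240
        System.out.println(windowStarts(ts, 4 * hour, minute).size());
        // 10 minute window, 1 minute slide: prints 10
        System.out.println(windowStarts(ts, 10 * minute, minute).size());
        // 24 hour window, 10 second slide: prints 8640
        System.out.println(windowStarts(ts, 24 * hour, 10 * second).size());
    }
}
```

The last figure corresponds to a 24 hour window sliding every 10 seconds, as in Chen's original mail, which is why that job stalls even at low traffic.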

Best,
Aljoscha

> On 24. May 2017, at 15:49, Carst Tankink <ctank...@bol.com> wrote:
> 
> Hi,
>  
> Thanks Aljoscha!
> To complete my understanding: the problem here is that each element in the 
> sliding window(s) basically triggers 240 get+put calls instead of just 1, 
> right? I can see how that blows up :-) 
> I have a good idea of how to proceed, so I will try writing the custom 
> ProcessFunction next week.
>  
> Stefan, in our case we are already on Flink 1.2.1 which should have the 
> patched version of RocksDB, right? Because that patch did solve an issue we 
> had in a different Flink job (a Kafka Source -> HDFS/Bucketing Sink which was 
> stalling quite often under Flink 1.2.0) but did not solve this case, which 
> fits the “way too much RocksDB access” explanation better.
>  
>  
> Thanks again,
> Carst
>  
> From: Aljoscha Krettek <aljos...@apache.org>
> Date: Wednesday, May 24, 2017 at 16:13
> To: Stefan Richter <s.rich...@data-artisans.com>
> Cc: Carst Tankink <ctank...@bol.com>, "user@flink.apache.org" 
> <user@flink.apache.org>
> Subject: Re: large sliding window perf question
>  
> Hi, 
>  
> I’m afraid you’re running into a general shortcoming of the current sliding 
> windows implementation: every sliding window is treated as its own window 
> that has window contents and trigger state/timers. For example, if you have a 
> sliding window of size 4 hours with 1 minute slide this means each element is 
> in 240 windows and you basically amplify writing to RocksDB by 240. This gets 
> out of hand very quickly with larger differences between window size and 
> slide interval.
>  
> I’m also afraid there is no solution for this at the moment, so the 
> workaround Chen mentioned is the way to go for now.
>  
> Best,
> Aljoscha
> On 24. May 2017, at 14:07, Stefan Richter <s.rich...@data-artisans.com> wrote:
>  
> Hi, 
>  
> both issues sound like the known problem with RocksDB merging state. Please 
> take a look here
>  
> https://issues.apache.org/jira/browse/FLINK-5756
>  
> and here
>  
> https://github.com/facebook/rocksdb/issues/1988
>  
> Best,
> Stefan
>  
>  
> On 24.05.2017 at 14:33, Carst Tankink <ctank...@bol.com> wrote:
>  
> Hi,
>  
> We are seeing a similar behaviour for large sliding windows. Let me put some 
> details here and see if they match up enough with Chen’s:
>  
> Technical specs:
> - Flink 1.2.1 on YARN
> - RocksDB backend, on HDFS. I’ve set the backend to 
>   PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM since our Hadoop cluster 
>   runs on spinning disks, but that doesn’t seem to help.
>  
> Pipeline:
> - Read from Kafka, extract ids.
> - KeyBy id, count occurrences of each id using a fold. The window 
>   size of this operator is 10 minutes with a slide of 1 minute.
> - KeyBy id (again), compute mean and standard deviation using a fold. 
>   The window size of this operator is 4 hours with a slide of 1 minute.
> - Post-process data, sink.
>  
> What I observe is:
> - With a heap-based backend, the job runs really quickly (a couple of 
>   minutes to process 7 days of Kafka data) but eventually goes OOM with a GC 
>   overhead exceeded error.
> - With the RocksDB backend, checkpoints get stuck most of the time, 
>   and the “count occurrences” step gets a lot of back pressure from the next 
>   operator (on the large window).
>   o In the cases where the checkpoint does succeed, the state for the large 
>     window is around 500-700MB; other states are within the KBs.
>   o Also in those cases, all time seems to be spent in the ‘alignment’ phase 
>     for a single subtask of the count operator, with the other operators 
>     aligning within milliseconds. The checkpoint duration itself is no more 
>     than 2 seconds even for the larger states.
>  
>  
> At this point, I’m a bit at a loss to figure out what’s going on. My best 
> guess is it has to do with the state access to the RocksDBFoldingState, but 
> why this is so slow is beyond me.
>  
> Hope this info helps in figuring out what is going on, and hopefully it is 
> actually related to Chen’s case :)
>  
>  
> Thanks,
> Carst
>  
> From: Stefan Richter <s.rich...@data-artisans.com>
> Date: Tuesday, May 23, 2017 at 21:35
> To: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: large sliding window perf question
>  
> Hi,
>  
> Which state backend and Flink version are you using? There was a problem with 
> large merging states on RocksDB, caused by some inefficiencies in the merge 
> operator of RocksDB. We provide a custom patch for this with all newer 
> versions of Flink. 
>  
> Best,
> Stefan
>  
> On 23.05.2017 at 21:24, Chen Qin <qinnc...@gmail.com> wrote:
>  
> Hi there,
>  
> I have seen a weird perf issue while running an event-time-based job with a 
> large sliding window (24 hours offset every 10s).
>  
> The pipeline looks simple: 
> tail a Kafka topic, assign timestamps and watermarks, forward to a large 
> sliding window (30 days), fire every 10 seconds, and print out.
>  
> What I saw first hand was checkpointing getting stuck and taking longer than 
> the timeout, even though traffic volume is low (~300 TPS). Looking deeper, it 
> seems back pressure kicks in, and the window operator consumes messages really 
> slowly and throttles the sources.
>  
> I also tried limiting the window time to minutes, and all the issues went away.
>  
> Any suggestions on this? My workaround is that I implemented a ProcessFunction 
> that keeps a big value state and periodically evaluates and emits downstream 
> (emulating what a sliding window does).
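
One way to picture this kind of workaround, as a rough sketch only and not necessarily what Chen implemented: keep one partial aggregate per slide interval (a "pane"), touch exactly one pane per incoming element, and combine the panes that fall inside the window whenever a result is due. The code below is plain Java rather than a Flink ProcessFunction; in a real job the map would presumably live in keyed state and emit would be driven by timers. The names PaneCounter, add and emit are hypothetical, and the sketch assumes window ends are aligned to the slide and are emitted in increasing order.

```java
import java.util.TreeMap;

public class PaneCounter {
    private final long sizeMs;
    private final long slideMs;
    // Pane start timestamp -> number of elements seen in that slide-sized pane.
    private final TreeMap<Long, Long> panes = new TreeMap<>();

    public PaneCounter(long sizeMs, long slideMs) {
        this.sizeMs = sizeMs;
        this.slideMs = slideMs;
    }

    // One state update per element, regardless of how many windows overlap it.
    public void add(long timestampMs) {
        long paneStart = timestampMs - (timestampMs % slideMs);
        panes.merge(paneStart, 1L, Long::sum);
    }

    // Count for the window [windowEndMs - size, windowEndMs).
    // Panes that ended before the window start are dropped first.
    public long emit(long windowEndMs) {
        long windowStartMs = windowEndMs - sizeMs;
        panes.headMap(windowStartMs, false).clear();
        long total = 0;
        for (long count : panes.subMap(windowStartMs, true, windowEndMs, false).values()) {
            total += count;
        }
        return total;
    }

    public static void main(String[] args) {
        // Small example: 4 minute window, 1 minute slide.
        PaneCounter counter = new PaneCounter(240_000L, 60_000L);
        counter.add(0L);
        counter.add(30_000L);
        counter.add(90_000L);
        counter.add(250_000L);
        System.out.println(counter.emit(120_000L)); // prints 3
        System.out.println(counter.emit(300_000L)); // prints 2
    }
}
```

The trade-off is that each emission reads size/slide panes, but each element now causes a single write instead of one write per overlapping window.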
>  
> Thanks,
> Chen
>  
