Hi Chen,

How do you update the ValueState during checkpointing? I’m asking because keyed 
state is always scoped to a key, and during checkpointing there is no key scope, 
since we are neither processing an incoming element nor firing a timer (the two 
cases where a key scope exists).
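
For reference, here is a rough (untested) sketch of those two places, i.e. where 
updating a keyed ValueState is possible. The "MyEvent" type and the state name are 
made up for illustration, and depending on the Flink version you may need the rich 
variant of ProcessFunction to get access to getRuntimeContext():

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    public class CountPerKey extends ProcessFunction<MyEvent, Long> {

        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(MyEvent event, Context ctx, Collector<Long> out)
                throws Exception {
            // Key scope here is the key of the incoming element.
            Long current = count.value();
            count.update(current == null ? 1L : current + 1L);
            // Register a timer so that onTimer() fires for this key later.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 10_000L);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out)
                throws Exception {
            // Key scope here is the key the timer was registered for.
            out.collect(count.value());
        }
    }

Outside of these two callbacks, for example in snapshotState(), there is no current 
key, so an update to keyed state has nothing to attach to.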

Best,
Aljoscha

> On 24. May 2017, at 21:05, Chen Qin <qinnc...@gmail.com> wrote:
> 
> Got it! Looks like a 30-day window with a 10-second trigger is way too many 
> windows: 30 days / 10 s is about 259,200 window instances per key (roughly a 
> quarter million, fired every 10 seconds), across around 150 keys.
> 
> Just to add some background: I tried three ways to implement this large 
> sliding window pipeline. All share the same configuration and use the RocksDB 
> state backend with checkpoints on S3:
> 1. The out-of-the-box sliding window, 30 days with a 10-second trigger.
> 2. A ProcessFunction with ListState.
> 3. A ProcessFunction with an in-memory cache that updates ValueState during 
> checkpointing and periodically filters and emits a list of events. The value 
> state, checkpointed as a blob, seems to complete quickly.
> The first two options show performance issues; the third one works fine so far.
> 
> Thanks,
> Chen
> 
> On Wed, May 24, 2017 at 8:24 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
> Yes Carst, I noticed your version is already 1.2.1, which is why I contacted 
> Aljoscha to take a look here, because he knows best about the expected 
> scalability of the sliding window implementation.
>  
>> On 24.05.2017, at 16:49, Carst Tankink <ctank...@bol.com> wrote:
>> 
>> Hi,
>>  
>> Thanks, Aljoscha!
>> To complete my understanding: the problem here is that each element in the 
>> sliding window(s) basically triggers 240 get+put calls instead of just 1, 
>> right? I can see how that blows up :-) 
>> I have a good idea of how to proceed, so I will try writing the custom 
>> ProcessFunction next week.
>>  
>> Stefan, in our case we are already on Flink 1.2.1, which should include the 
>> patched version of RocksDB, right? That patch did solve an issue we had in a 
>> different Flink job (a Kafka source -> HDFS/bucketing sink that was stalling 
>> quite often under Flink 1.2.0), but it did not solve this case, which fits 
>> the “way too much RocksDB access” explanation better.
>>  
>>  
>> Thanks again,
>> Carst
>>  
>> From: Aljoscha Krettek <aljos...@apache.org>
>> Date: Wednesday, May 24, 2017 at 16:13
>> To: Stefan Richter <s.rich...@data-artisans.com>
>> Cc: Carst Tankink <ctank...@bol.com>, "user@flink.apache.org" <user@flink.apache.org>
>> Subject: Re: large sliding window perf question
>>  
>> Hi, 
>>  
>> I’m afraid you’re running into a general shortcoming of the current sliding 
>> window implementation: every sliding window is treated as its own window with 
>> its own contents and trigger state/timers. For example, with a sliding window 
>> of size 4 hours and a 1-minute slide, each element is in 240 windows, so you 
>> basically amplify the writes to RocksDB by 240. This gets out of hand very 
>> quickly as the difference between window size and slide interval grows.
>>  
>> I’m also afraid there is no built-in solution for this at the moment, so the 
>> workaround Chen mentioned is the way to go for now.
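>>
>> Roughly, such a workaround could look like the sketch below (untested; MyEvent 
>> and MyAggregate are placeholder types, the 4 hour / 1 minute numbers are from 
>> Carst’s case, and I’m using processing-time timers for simplicity). The idea is 
>> to keep the raw events in one ListState per key and fire a timer once per slide 
>> interval that prunes expired events and emits a single aggregate:
>>
>>     import java.util.ArrayList;
>>     import java.util.List;
>>
>>     import org.apache.flink.api.common.state.ListState;
>>     import org.apache.flink.api.common.state.ListStateDescriptor;
>>     import org.apache.flink.configuration.Configuration;
>>     import org.apache.flink.streaming.api.functions.ProcessFunction;
>>     import org.apache.flink.util.Collector;
>>
>>     public class SlidingAggregate extends ProcessFunction<MyEvent, MyAggregate> {
>>
>>         private static final long WINDOW_SIZE = 4 * 60 * 60 * 1000L; // 4 hours
>>         private static final long SLIDE = 60 * 1000L;                // 1 minute
>>
>>         private transient ListState<MyEvent> buffer;
>>
>>         @Override
>>         public void open(Configuration parameters) {
>>             buffer = getRuntimeContext().getListState(
>>                     new ListStateDescriptor<>("buffer", MyEvent.class));
>>         }
>>
>>         @Override
>>         public void processElement(MyEvent event, Context ctx, Collector<MyAggregate> out)
>>                 throws Exception {
>>             // One state write per element, instead of one per overlapping window.
>>             buffer.add(event);
>>             // Timers for the same key and timestamp are deduplicated, so this
>>             // registers at most one timer per key and slide interval.
>>             long now = ctx.timerService().currentProcessingTime();
>>             ctx.timerService().registerProcessingTimeTimer(now - (now % SLIDE) + SLIDE);
>>         }
>>
>>         @Override
>>         public void onTimer(long timestamp, OnTimerContext ctx, Collector<MyAggregate> out)
>>                 throws Exception {
>>             List<MyEvent> retained = new ArrayList<>();
>>             MyAggregate aggregate = new MyAggregate(); // placeholder accumulator
>>             for (MyEvent event : buffer.get()) {
>>                 if (event.getTimestamp() > timestamp - WINDOW_SIZE) {
>>                     retained.add(event);
>>                     aggregate.add(event); // e.g. update count / mean / stddev
>>                 }
>>             }
>>             buffer.clear();
>>             for (MyEvent event : retained) {
>>                 buffer.add(event);
>>             }
>>             if (!retained.isEmpty()) {
>>                 out.collect(aggregate);
>>                 // Keep firing once per slide while there is still data for this key.
>>                 ctx.timerService().registerProcessingTimeTimer(timestamp + SLIDE);
>>             }
>>         }
>>     }
>>
>> This trades the per-window write amplification for re-reading the buffered 
>> elements once per slide, which for large windows with a small slide is usually 
>> the better deal.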
>>  
>> Best,
>> Aljoscha
>> On 24. May 2017, at 14:07, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>  
>> Hi, 
>>  
>> Both issues sound like the known problem with RocksDB merging state. Please 
>> take a look here:
>>  
>> https://issues.apache.org/jira/browse/FLINK-5756
>>  
>> and here
>>  
>> https://github.com/facebook/rocksdb/issues/1988
>>  
>> Best,
>> Stefan
>>  
>>  
>> On 24.05.2017, at 14:33, Carst Tankink <ctank...@bol.com> wrote:
>>  
>> Hi,
>>  
>> We are seeing a similar behaviour for large sliding windows. Let me put some 
>> details here and see if they match up enough with Chen’s:
>>  
>> Technical specs:
>> - Flink 1.2.1 on YARN
>> - RocksDB backend, on HDFS. I’ve set the backend to 
>>   PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM (see the snippet below) 
>>   since our Hadoop cluster runs on spinning disks, but that doesn’t seem to help.
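>>
>> For reference, this is roughly how that option is set (the checkpoint URI below 
>> is just a placeholder, not our real path):
>>
>>     import org.apache.flink.contrib.streaming.state.PredefinedOptions;
>>     import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>>     public class BackendSetup {
>>         public static void main(String[] args) throws Exception {
>>             StreamExecutionEnvironment env =
>>                     StreamExecutionEnvironment.getExecutionEnvironment();
>>             // Placeholder HDFS URI for the checkpoint directory.
>>             RocksDBStateBackend backend =
>>                     new RocksDBStateBackend("hdfs:///path/to/checkpoints");
>>             backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
>>             env.setStateBackend(backend);
>>             // ... build the pipeline here, then env.execute("job");
>>         }
>>     }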
>>  
>> Pipeline (rough code sketch further below):
>> - Read from Kafka, extract ids.
>> - KeyBy id, count occurrences of each id using a fold. The window size of 
>>   this operator is 10 minutes with a slide of 1 minute.
>> - KeyBy id (again), compute mean and standard deviation using a fold. The 
>>   window size of this operator is 4 hours with a slide of 1 minute.
>> - Post-process data, sink.
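>>
>> In code, the windowed part looks roughly like this. It is a simplified fragment: 
>> env, kafkaSource, sink, extractId/postProcess and the Count/Stats helper types 
>> stand in for our actual ones, and the Java lambdas may need explicit type hints:
>>
>>     import org.apache.flink.streaming.api.datastream.DataStream;
>>     import org.apache.flink.streaming.api.windowing.time.Time;
>>
>>     DataStream<String> ids = env
>>             .addSource(kafkaSource)             // FlinkKafkaConsumer for our topic
>>             .map(record -> extractId(record));  // extract the id
>>
>>     // Count occurrences per id: 10-minute window, 1-minute slide.
>>     DataStream<Count> counts = ids
>>             .keyBy(id -> id)
>>             .timeWindow(Time.minutes(10), Time.minutes(1))
>>             .fold(new Count(), (acc, id) -> acc.add(id));
>>
>>     // Mean and standard deviation per id: 4-hour window, 1-minute slide.
>>     DataStream<Stats> stats = counts
>>             .keyBy(c -> c.getId())
>>             .timeWindow(Time.hours(4), Time.minutes(1))
>>             .fold(new Stats(), (acc, c) -> acc.add(c.getValue()));
>>
>>     // Post-process and sink.
>>     stats.map(s -> postProcess(s)).addSink(sink);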
>>  
>> What I observe is:
>> - With a heap-based backend, the job runs really quickly (a couple of minutes 
>>   to process 7 days of Kafka data) but eventually goes OOM with a “GC 
>>   overhead limit exceeded” error.
>> - With the RocksDB backend, checkpoints get stuck most of the time, and the 
>>   “count occurrences” step gets a lot of back pressure from the next operator 
>>   (the one with the large window).
>>   - In the cases where the checkpoint does succeed, the state for the large 
>>     window is around 500-700 MB; the other states are within the KBs.
>>   - Also in those cases, almost all of the time seems to be spent in the 
>>     ‘alignment’ phase of a single subtask of the count operator, with the 
>>     other operators aligning within milliseconds. The checkpoint duration 
>>     itself is no more than 2 seconds, even for the larger states.
>>  
>>  
>> At this point, I’m a bit at a loss to figure out what’s going on. My best 
>> guess is that it has to do with the state accesses to the RocksDBFoldingState, 
>> but why this is so slow is beyond me.
>>  
>> Hope this info helps in figuring out what is going on, and hopefully it is 
>> actually related to Chen’s case :)
>>  
>>  
>> Thanks,
>> Carst
>>  
>> From: Stefan Richter <s.rich...@data-artisans.com>
>> Date: Tuesday, May 23, 2017 at 21:35
>> To: "user@flink.apache.org <mailto:user@flink.apache.org>" 
>> <user@flink.apache.org <mailto:user@flink.apache.org>>
>> Subject: Re: large sliding window perf question
>>  
>> Hi,
>>  
>> Which state backend and Flink version are you using? There was a problem 
>> with large merging states on RocksDB, caused by some inefficiencies in the 
>> merge operator of RocksDB. We provide a custom patch for this with all newer 
>> versions of Flink. 
>>  
>> Best,
>> Stefan
>>  
>> On 23.05.2017, at 21:24, Chen Qin <qinnc...@gmail.com> wrote:
>>  
>> Hi there,
>>  
>> I have seen a weird perf issue while running an event-time-based job with a 
>> large sliding window (24 hours, sliding every 10 s).
>>  
>> The pipeline looks simple: 
>> tail a Kafka topic, assign timestamps and watermarks, forward to a large 
>> sliding window (30 days) that fires every 10 seconds, and print the result.
>>  
>> What I have seen first-hand is checkpointing getting stuck and taking longer 
>> than the timeout, even though the traffic volume is low (~300 TPS). Looking 
>> deeper, it seems back pressure kicks in: the window operator consumes 
>> messages really slowly and throttles the sources.
>>  
>> I also tried limiting the window size to minutes, and all the issues went away.
>>  
>> Any suggestions on this? My workaround is a ProcessFunction that keeps a big 
>> value state and periodically evaluates and emits downstream (emulating what 
>> the sliding window does).
>>  
>> Thanks,
>> Chen
>>  
