Yes Carst, I noticed that your version is already 1.2.1, which is why I asked
Aljoscha to take a look here: he knows the expected scalability of the sliding
window implementation best.
> On 24.05.2017, at 16:49, Carst Tankink <ctank...@bol.com> wrote:
>
> Hi,
>
> Thanks Aljoscha!
> To complete my understanding: the problem here is that each element in the
> sliding window(s) basically triggers 240 get+put calls instead of just 1,
> right? I can see how that blows up :-)
> I have a good idea on how to proceed next, so I will be trying out writing
> the custom ProcessFunction next (week).
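>
> Roughly what I have in mind is the sketch below (untested, written against the
> ProcessFunction API, so exact class names may differ a bit per Flink version;
> it assumes event-time timestamps are assigned upstream). The input is the
> (id, count) tuples from the first window, the state is one small aggregate per
> minute bucket, and an event-time timer fires once per minute to emit mean and
> stddev over the last 4 hours. The id is not re-emitted, to keep it short.
>
> import java.util.HashMap;
> import java.util.Iterator;
> import java.util.Map;
>
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.util.Collector;
>
> // Keeps one {count, sum, sum of squares} aggregate per (key, minute) instead of
> // 240 overlapping window panes, and emits (mean, stddev) once per minute.
> public class SlidingStatsFunction
>         extends ProcessFunction<Tuple2<String, Long>, Tuple2<Double, Double>> {
>
>     private static final long MINUTE = 60_000L;
>     private static final long WINDOW = 4 * 60 * MINUTE;
>
>     // minute bucket start -> {count, sum, sum of squares} for the current key
>     private transient ValueState<HashMap<Long, double[]>> buckets;
>
>     @Override
>     public void open(Configuration parameters) {
>         buckets = getRuntimeContext().getState(new ValueStateDescriptor<>(
>             "buckets", TypeInformation.of(new TypeHint<HashMap<Long, double[]>>() {})));
>     }
>
>     @Override
>     public void processElement(Tuple2<String, Long> value, Context ctx,
>                                Collector<Tuple2<Double, Double>> out) throws Exception {
>         HashMap<Long, double[]> map = buckets.value();
>         if (map == null) {
>             map = new HashMap<>();
>         }
>         long bucket = ctx.timestamp() - (ctx.timestamp() % MINUTE);
>         double[] agg = map.get(bucket);
>         if (agg == null) {
>             agg = new double[3];
>             map.put(bucket, agg);
>         }
>         double c = value.f1;
>         agg[0] += 1;
>         agg[1] += c;
>         agg[2] += c * c;
>         buckets.update(map);
>
>         // fire at the next minute boundary; duplicate registrations are de-duplicated
>         ctx.timerService().registerEventTimeTimer(bucket + MINUTE);
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx,
>                         Collector<Tuple2<Double, Double>> out) throws Exception {
>         HashMap<Long, double[]> map = buckets.value();
>         if (map == null) {
>             return;
>         }
>         double n = 0, sum = 0, sumSq = 0;
>         Iterator<Map.Entry<Long, double[]>> it = map.entrySet().iterator();
>         while (it.hasNext()) {
>             Map.Entry<Long, double[]> e = it.next();
>             if (e.getKey() <= timestamp - WINDOW) {
>                 it.remove();   // evict buckets that fell out of the 4 hour window
>             } else {
>                 n += e.getValue()[0];
>                 sum += e.getValue()[1];
>                 sumSq += e.getValue()[2];
>             }
>         }
>         buckets.update(map);
>         if (n > 0) {
>             double mean = sum / n;
>             double stddev = Math.sqrt(Math.max(0.0, sumSq / n - mean * mean));
>             out.collect(Tuple2.of(mean, stddev));
>             ctx.timerService().registerEventTimeTimer(timestamp + MINUTE);
>         }
>     }
> }
>
> That way RocksDB should see one state read/write per element plus one per
> timer, instead of roughly 240 per element.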
>
> Stefan, in our case we are already on Flink 1.2.1 which should have the
> patched version of RocksDB, right? Because that patch did solve an issue we
> had in a different Flink job (a Kafka Source -> HDFS/Bucketing Sink which was
> stalling quite often under Flink 1.2.0) but did not solve this case, which
> fits the “way too much RocksDB access” explanation better.
>
>
> Thanks again,
> Carst
>
> From: Aljoscha Krettek <aljos...@apache.org>
> Date: Wednesday, May 24, 2017 at 16:13
> To: Stefan Richter <s.rich...@data-artisans.com>
> Cc: Carst Tankink <ctank...@bol.com>, "user@flink.apache.org"
> <user@flink.apache.org>
> Subject: Re: large sliding window perf question
>
> Hi,
>
> I’m afraid you’re running into a general shortcoming of the current sliding
> windows implementation: every sliding window is treated as its own window
> that has window contents and trigger state/timers. For example, if you have a
> sliding window of size 4 hours with 1 minute slide this means each element is
> in 240 windows and you basically amplify writing to RocksDB by 240. This gets
> out of hand very quickly with larger differences between window size and
> slide interval.
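>
> To make the numbers concrete (just an illustration):
>
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> // With SlidingEventTimeWindows.of(Time.hours(4), Time.minutes(1)) each element
> // is assigned to (window size / slide) overlapping windows:
> long windowsPerElement =
>     Time.hours(4).toMilliseconds() / Time.minutes(1).toMilliseconds();  // = 240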
>
> I’m also afraid there is no solution for this at the moment, so the workaround
> Chen mentioned is the way to go right now.
>
> Best,
> Aljoscha
> On 24. May 2017, at 14:07, Stefan Richter <s.rich...@data-artisans.com> wrote:
>
> Hi,
>
> both issues sound like the known problem with RocksDB merging state. Please
> take a look here
>
> https://issues.apache.org/jira/browse/FLINK-5756
>
> and here
>
> https://github.com/facebook/rocksdb/issues/1988
>
> Best,
> Stefan
>
>
> On 24.05.2017, at 14:33, Carst Tankink <ctank...@bol.com> wrote:
>
> Hi,
>
> We are seeing a similar behaviour for large sliding windows. Let me put some
> details here and see if they match up enough with Chen’s:
>
> Technical specs:
> - Flink 1.2.1 on YARN
> - RocksDB backend, on HDFS. I’ve set the backend to
> PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM since our Hadoop cluster
> runs on spinning disks, but that doesn’t seem to help (configured roughly as in
> the sketch below)
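>
> For reference, the backend is set up roughly like this (the checkpoint path is
> just a placeholder):
>
> import org.apache.flink.contrib.streaming.state.PredefinedOptions;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class BackendSetup {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // checkpoint URI below is a placeholder, not our actual path
>         RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
>         backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
>         env.setStateBackend(backend);
>         // ... rest of the job definition ...
>     }
> }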
>
> Pipeline (see the code sketch after this list):
> - Read from Kafka, extract ids
> - KeyBy id, count occurrences of each id using a fold. The window
> size of this operator is 10 minutes with a slide of 1 minute
> - KeyBy id (again), compute mean, standard deviation using a fold.
> The window size of this operator is 4 hours with a slide of 1 minute.
> - Post-process data, sink.
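>
> In code, the shape of the job is roughly this (sketch, not our exact code;
> idCounts stands for the (id, 1L) tuples extracted from Kafka, and the second
> fold drops the id to keep the example short):
>
> import org.apache.flink.api.common.functions.FoldFunction;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> // idCounts: DataStream<Tuple2<String, Long>> of (id, 1L), read from Kafka with
> // timestamps/watermarks already assigned (omitted here)
>
> // stage 1: count occurrences per id, 10 minute window sliding by 1 minute
> DataStream<Tuple2<String, Long>> counts = idCounts
>     .keyBy(0)
>     .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
>     .fold(Tuple2.of("", 0L),
>         new FoldFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
>             @Override
>             public Tuple2<String, Long> fold(Tuple2<String, Long> acc, Tuple2<String, Long> in) {
>                 return Tuple2.of(in.f0, acc.f1 + in.f1);
>             }
>         });
>
> // stage 2: mean and stddev per id, 4 hour window sliding by 1 minute;
> // the accumulator is {n, sum, sum of squares}, the id is dropped for brevity
> DataStream<double[]> stats = counts
>     .keyBy(0)
>     .window(SlidingEventTimeWindows.of(Time.hours(4), Time.minutes(1)))
>     .fold(new double[3],
>         new FoldFunction<Tuple2<String, Long>, double[]>() {
>             @Override
>             public double[] fold(double[] acc, Tuple2<String, Long> in) {
>                 double c = in.f1;
>                 return new double[] { acc[0] + 1, acc[1] + c, acc[2] + c * c };
>             }
>         });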
>
> What I observe is:
> - With a heap-based backend, the job runs really quickly (a couple of
> minutes to process 7 days of Kafka data) but eventually goes OOM with a
> “GC overhead limit exceeded” error.
> - With the RocksDB backend, checkpoints get stuck most of the time,
> and the “count occurrences” step gets a lot of back pressure from the next
> operator (on the large window)
> o In the cases where the checkpoint does succeed, the state for the large
> window is around 500-700 MB; the other states are within the KBs.
> o Also in those cases, almost all of the time seems to be spent in the
> 'alignment' phase for a single subtask of the count operator, with the other
> operators aligning within milliseconds. The checkpoint duration itself is no
> more than 2 seconds even for the larger states.
>
>
> At this point, I’m a bit at a loss to figure out what’s going on. My best
> guess is that it has to do with the state access to the RocksDBFoldingState,
> but why this is so slow is beyond me.
>
> Hope this info helps in figuring out what is going on, and hopefully it is
> actually related to Chen’s case :)
>
>
> Thanks,
> Carst
>
> From: Stefan Richter <s.rich...@data-artisans.com>
> Date: Tuesday, May 23, 2017 at 21:35
> To: "user@flink.apache.org <mailto:user@flink.apache.org>"
> <user@flink.apache.org <mailto:user@flink.apache.org>>
> Subject: Re: large sliding window perf question
>
> Hi,
>
> Which state backend and Flink version are you using? There was a problem with
> large merging states on RocksDB, caused by some inefficiencies in the merge
> operator of RocksDB. We provide a custom patch for this with all newer
> versions of Flink.
>
> Best,
> Stefan
>
> On 23.05.2017, at 21:24, Chen Qin <qinnc...@gmail.com> wrote:
>
> Hi there,
>
> I have seen a weird perf issue while running an event-time-based job with a
> large sliding window (24 hours, sliding every 10s).
>
> The pipeline looks simple:
> tail a Kafka topic, assign timestamps and watermarks, forward to a large sliding
> window (30 days) that fires every 10 seconds, and print the output.
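>
> The event-time part looks roughly like this (sketch; MyEvent, its fields and
> MyAggregateFunction are placeholders, and the Kafka consumer setup is left out):
>
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> // events: DataStream<MyEvent> read from the Kafka source (MyEvent is a
> // placeholder POJO with long eventTime and String id fields)
> DataStream<MyEvent> withTimestamps = events.assignTimestampsAndWatermarks(
>     new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
>         @Override
>         public long extractTimestamp(MyEvent e) {
>             return e.eventTime;
>         }
>     });
>
> withTimestamps
>     .keyBy("id")
>     .window(SlidingEventTimeWindows.of(Time.days(30), Time.seconds(10)))
>     .apply(new MyAggregateFunction())   // placeholder window function
>     .print();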
>
> What I have seen first hand was checkpointing getting stuck, taking longer than
> the timeout even though the traffic volume is low (~300 TPS). Looking deeper, it
> seems back pressure kicks in: the window operator consumes messages really
> slowly and throttles the sources.
>
> I also tried limiting the window time to minutes and all the issues went away.
>
> Any suggestions on this? My workaround is that I implemented a ProcessFunction
> that keeps a big value state, periodically evaluates it, and emits downstream
> (emulating what the sliding window does).
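>
> For reference, the pattern is roughly this (heavily simplified sketch against
> the ProcessFunction API, not the actual job; the Long state stands in for the
> big value state, and eviction of data older than the window is left out):
>
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.util.Collector;
>
> // Applied on a keyed stream: keeps one (possibly large) value state per key and
> // emits it every 10 seconds, instead of one window pane per slide.
> public class PeriodicEmitFunction extends ProcessFunction<Long, Long> {
>
>     private transient ValueState<Long> aggregate;   // stand-in for the big value state
>
>     @Override
>     public void open(Configuration parameters) {
>         aggregate = getRuntimeContext().getState(
>             new ValueStateDescriptor<>("aggregate", Long.class));
>     }
>
>     @Override
>     public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
>         Long current = aggregate.value();
>         aggregate.update(current == null ? value : current + value);
>         // schedule the next 10s evaluation; same-timestamp timers are de-duplicated
>         long next = ctx.timerService().currentProcessingTime() / 10_000 * 10_000 + 10_000;
>         ctx.timerService().registerProcessingTimeTimer(next);
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
>         Long current = aggregate.value();
>         if (current != null) {
>             out.collect(current);   // periodically evaluate and emit downstream
>             ctx.timerService().registerProcessingTimeTimer(timestamp + 10_000);
>         }
>     }
> }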
>
> Thanks,
> Chen
>