Sorry for the late reply. There's not much you can do at the moment, as Flink needs to sync on the checkpoint barriers. There's something in the making for addressing the issue soon: https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
Did you try out using the FsStateBackend? If you are going to stick with rocks, I would recommend to understand what exactly causes the poor performance. I see the following areas: - serialization costs - disk / ssd speed - network speed (during checkpoint creation) (as Yu mentioned) - if you have asynchronous checkpoints enabled, they will also slow down the processing. On Sun, Feb 23, 2020 at 8:27 PM Chen Qin <c...@pinterest.com> wrote: > Just follow up on this thread, it accurately caused by key skew. Given > single subtask is single threaded 5% of slow processing cause entire job > back pressures on rocksdbstatebackend. > > Robert, > > What is blocking us enable multi threading in processor? I recall it has > something todo with barrier and record in order. Can you share more > insights on this? > > Chen > > On Feb 21, 2020, at 4:56 AM, Robert Metzger <rmetz...@apache.org> wrote: > >  > I would try the FsStateBackend in this scenario, as you have enough memory > available. > > On Thu, Jan 30, 2020 at 5:26 PM Ran Zhang <ranzh...@pinterest.com> wrote: > >> Hi Gordon, >> >> Thanks for your reply! Regarding state size - we are at 200-300gb but we >> have 120 parallelism which will make each task handle ~2 - 3 gb state. >> (when we submit the job we are setting tm memory to 15g.) In this scenario >> what will be the best fit for statebackend? >> >> Thanks, >> Ran >> >> On Wed, Jan 29, 2020 at 6:37 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org> >> wrote: >> >>> Hi Ran, >>> >>> On Thu, Jan 30, 2020 at 9:39 AM Ran Zhang <ranzh...@pinterest.com> >>> wrote: >>> >>>> Hi all, >>>> >>>> We have a Flink app that uses a KeyedProcessFunction, and in the >>>> function it requires a ValueState(of TreeSet) and the processElement method >>>> needs to access and update it. We tried to use RocksDB as our stateBackend >>>> but the performance is not good, and intuitively we think it was because of >>>> the serialization / deserialization on each processElement call. >>>> >>> >>> As you have already pointed out, serialization behaviour is a major >>> difference between the 2 state backends, and will directly impact >>> performance due to the extra runtime overhead in RocksDB. >>> If you plan to continue using the RocksDB state backend, make sure to >>> use MapState instead of ValueState where possible, since every access to >>> the ValueState in the RocksDB backend requires serializing / deserializing >>> the whole value. >>> For MapState, de-/serialization happens per K-V access. Whether or not >>> this makes sense would of course depend on your state access pattern. >>> >>> >>>> Then we tried to switch to use FsStateBackend (which keeps the >>>> in-flight data in the TaskManager’s memory according to doc), and it could >>>> resolve the performance issue. *So we want to understand better what >>>> are the tradeoffs in choosing between these 2 stateBackend.* Our >>>> checkpoint size is 200 - 300 GB in stable state. For now we know one >>>> benefits of RocksDB is it supports incremental checkpoint, but would love >>>> to know what else we are losing in choosing FsStateBackend. >>>> >>> >>> As of now, feature-wise both backends support asynchronous snapshotting, >>> state schema evolution, and access via the State Processor API. >>> In the end, the major factor for deciding between the two state backends >>> would be your expected state size. >>> That being said, it could be possible in the future that savepoint >>> formats for the backends are changed to be compatible, meaning that you >>> will be able to switch between different backends upon restore [1]. >>> >>> >>>> >>>> Thanks a lot! >>>> Ran Zhang >>>> >>> >>> Cheers, >>> Gordon >>> >>> [1] >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State >>> >>