Hi,
there is already a mechanism for that. Flink currently keeps only the most recent successful checkpoint. We are working on making this configurable so that, for example, the last n successful checkpoints can be kept.
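To give a rough idea of what that will look like, keeping, say, the last three checkpoints should then be a single entry in flink-conf.yaml, probably something along these lines (the exact key name is not final yet, so treat this as a sketch):

    state.checkpoints.num-retained: 3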
Cheers,
Aljoscha

On Tue, 25 Oct 2016 at 06:47 Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Hi Gyula,
>
> Thanks a lot for your response, it was very clear. I understand that
> there is no problem of small files due to checkpointing not being
> incremental. I also understand that each worker will interpret a file://
> URL as local to its own file system, which works fine as long as all
> workers have a remote file system mounted at the same local path.
>
> Now I have to check whether Flink provides some expiration mechanism for
> old checkpoints. For S3 that is already available, and for HDFS I guess
> a script that periodically deletes older files with hdfs dfs -rmr should
> be easy enough. Is there any documented naming convention for checkpoint
> files that I could rely on to delete old checkpoints? E.g. some naming
> scheme that uses dates; it would be nicer if it were documented, because
> then it would be more stable.
>
> Thanks again for your help.
>
> Greetings,
>
> Juan
>
>
> On Mon, Oct 24, 2016 at 12:29 AM, Gyula Fóra <gyf...@apache.org> wrote:
>
> Hi Juan,
>
> Let me try to answer some of your questions :)
>
> We have been running Flink Streaming at King for quite some time now,
> with multiple jobs having several hundred gigabytes of KV state stored
> in RocksDB. I would say the RocksDB state backend is definitely the best
> choice at the moment for large deployments, as you can also keep the
> heap relatively small to save some time on GC. But you have to play
> around with the RocksDB configuration to get the most out of it,
> depending on your hardware.
>
> I am not aware of any caching/TTL functionality exposed in the Flink
> APIs currently. But if you are willing to dig a little deeper, you could
> implement a lower-level operator that uses timers, like the windowing
> mechanisms do, to clear state after some time.
>
> When you are selecting a checkpoint directory (URL), you need to make
> sure that it is accessible from all the task managers. HDFS is
> convenient but not strictly necessary. We, for instance, use Ceph, which
> is mounted as a regular disk from the OS's perspective, so we can use
> file:// and still save to distributed storage. As far as I know, using
> YARN doesn't give much benefit; I am not sure whether Flink exploits any
> data locality at the moment.
>
> When you are running the RocksDB state backend, there are two concepts
> you need to think about for checkpointing: your local RocksDB directory
> and the checkpoint directory. The local directory is where the RocksDB
> instances are created, and they live on the task managers' local
> disk/memory. When Flink takes a checkpoint, a backup of all K-V pairs is
> copied as one blob to HDFS or to the selected checkpoint directory. This
> means there is no data fragmentation in the checkpoints. Something
> similar applies to the FsStateBackend, but that keeps the local state
> strictly in memory.
>
> I think you should definitely give RocksDB + HDFS a try. It works
> extremely well for very large state sizes given some tuning, but it
> should also perform well out of the box :)
>
> Cheers,
> Gyula
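To make the two directories Gyula describes a bit more concrete, below is a minimal, untested sketch of how a job would wire this up. The checkpoint URI and the local DB path are just example values, and the RocksDB backend comes from the separate flink-statebackend-rocksdb dependency:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDbCheckpointSetup {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // take a checkpoint every 60 seconds
            env.enableCheckpointing(60_000);

            // the checkpoint URI must be reachable from every task manager;
            // HDFS here, but file:// on a mounted distributed file system
            // (like the Ceph setup above) works the same way (example path)
            RocksDBStateBackend backend =
                    new RocksDBStateBackend("hdfs:///flink/checkpoints");

            // local working directory for the RocksDB instances on each task
            // manager (example path; by default the task manager temp
            // directories are used)
            backend.setDbStoragePath("/data/flink/rocksdb");

            env.setStateBackend(backend);

            // ... define sources, transformations and sinks, then env.execute(...)
        }
    }

Separating the two is what lets the local RocksDB files live on fast local disks while the checkpoint blob goes to the shared, replicated storage.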
> On Sun, 23 Oct 2016 at 22:29, Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
> Hi all,
>
> I don't have much experience with Flink, so please forgive me if I ask
> some obvious questions. I was taking a look at the documentation on
> stateful transformations in Flink at
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/state.html.
> I'm mostly interested in Flink for stream processing, and I would like
> to know:
>
> - What is the biggest state that has been used in production
> deployments? I'm interested in how many GB of state, across all
> key-value pairs, have been used before in long-running streaming jobs
> deployed in production. Maybe some company has shared this in a
> conference talk or blog post. I guess the RocksDB backend is the best
> option for big states, to avoid being limited by the available memory.
>
> - Is there any pre-built functionality for state eviction? I'm thinking
> of LRU cache-like behavior, with eviction based on time or size, similar
> to the Guava cache (https://github.com/google/guava/wiki/CachesExplained).
> This is probably easy to implement anyway, by using the clear()
> primitive, but I wouldn't like to reinvent the wheel if this is already
> implemented somewhere.
>
> - When using file:// for the checkpointing URL, is the data replicated
> across the workers, or does a failure in a worker lead to losing the
> state stored in that worker? I guess with hdfs:// we get the replication
> of HDFS and we don't have that problem. I also guess S3 can be used for
> checkpointing state too; is there any remarkable performance impact of
> using S3 instead of HDFS for checkpointing? I guess some performance is
> lost compared to a setup running in YARN with collocated DataNodes and
> NodeManagers, but I wanted to know whether the impact is negligible, as
> checkpointing is performed at a relatively low frequency. I'm also
> interested in Flink running on EMR, where the impact of this should be
> even smaller, because access to S3 is faster from EMR than from an
> in-house YARN cluster outside the AWS cloud.
>
> - Is there any problem with the RocksDB backend on top of HDFS related
> to defragmentation? How is clear handled for long-running jobs? I'm
> thinking of a streaming job that has a state with a size of several
> hundred GBs, where each key-value pair is stored for a week and then
> deleted. How does clear() work, and how do you deal with the "small
> files problem" of HDFS (
> http://inquidia.com/news-and-info/working-small-files-hadoop-part-1) for
> the FsState and RocksDB backends on top of HDFS? I guess this wouldn't
> be a problem for S3, as it is an object store that has no problem with
> small files.
>
> Thanks a lot for your help!
>
> Greetings,
>
> Juan Rodriguez Hortala
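On the state-eviction question: nothing is built in at the moment, but a rough, untested sketch of the timer-based approach Gyula mentions could look like the following. It assumes a Flink version that has the ProcessFunction/timer API and that the function runs on a keyed stream; the timer handling is deliberately naive (one timer per element):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // TTL-style eviction on a keyed stream: remember when a key was last
    // seen and clear its state once it has been idle for ttlMillis.
    public class IdleStateEvictor<T> extends ProcessFunction<T, T> {

        private final long ttlMillis;
        private transient ValueState<Long> lastSeen;

        public IdleStateEvictor(long ttlMillis) {
            this.ttlMillis = ttlMillis;
        }

        @Override
        public void open(Configuration parameters) {
            lastSeen = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastSeen", Long.class));
        }

        @Override
        public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
            long now = ctx.timerService().currentProcessingTime();
            lastSeen.update(now);
            // naive: one cleanup timer per element; a real operator would
            // de-duplicate timers to keep the timer state small
            ctx.timerService().registerProcessingTimeTimer(now + ttlMillis);
            out.collect(value);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
            Long seen = lastSeen.value();
            if (seen != null && timestamp >= seen + ttlMillis) {
                // the key has been idle for the whole TTL: drop its state
                lastSeen.clear();
            }
        }
    }

You would apply it with something like stream.keyBy(...).process(new IdleStateEvictor<>(7 * 24 * 60 * 60 * 1000L)) to drop state for keys that have been idle for a week.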