Hi Aljoscha,

Thanks for your answer. At least by keeping only the latest one we don't have retention problems with the state backend, and for now I guess we could use manually triggered savepoints if we needed to store the history of the state.

Thanks,

Juan
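P.S. For reference, a savepoint can be triggered manually from the Flink CLI. A minimal example; the job ID and target directory are placeholders, and depending on the Flink version the target directory may instead have to be set in flink-conf.yaml rather than passed on the command line:

    bin/flink savepoint <jobId> hdfs:///flink/savepoints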
On Tue, Oct 25, 2016 at 6:58 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
>
> There is already a mechanism for that. Currently, Flink will only keep the most recent successful checkpoint. We are currently working on making that configurable so that, for example, the last n successful checkpoints can be kept.
>
> Cheers,
> Aljoscha
>
> On Tue, 25 Oct 2016 at 06:47 Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi Gyula,
>>
>> Thanks a lot for your response, it was very clear. I understand that there is no problem of small files because checkpointing is not incremental. I also understand that each worker will interpret a file:// URL as local to its own file system, which works fine as long as all workers have a remote file system mounted at the same local path.
>>
>> Now I have to check whether Flink provides an expiration mechanism for old checkpoints. For S3 that is already available, and for HDFS I guess a script that periodically deletes older files with hdfs dfs -rmr should be easy enough. Is there any documented naming convention for checkpoint files that I could rely on to delete old checkpoints? E.g. a naming scheme that uses dates; it would be nicer if it were documented, because then it would be more stable.
>>
>> Thanks again for your help.
>>
>> Greetings,
>>
>> Juan
>>
>> On Mon, Oct 24, 2016 at 12:29 AM, Gyula Fóra <gyf...@apache.org> wrote:
>>
>> Hi Juan,
>>
>> Let me try to answer some of your questions :)
>>
>> We have been running Flink Streaming at King for quite some time now, with multiple jobs holding several hundred gigabytes of KV state in RocksDB. I would say the RocksDB state backend is definitely the best choice at the moment for large deployments, as it also lets you keep the heap relatively small to save some time on GC. But you have to play around with the RocksDB configuration to get the most out of it, depending on your hardware.
>>
>> I am not aware of any caching/TTL functionality exposed in the Flink APIs currently. But if you are willing to dig a little deeper, you could implement a lower-level operator that uses timers, like the windowing mechanisms do, to clear state after some time.
>>
>> When you are selecting a checkpoint directory (URL), you need to make sure that it is accessible from all the task managers. HDFS is convenient but not strictly necessary. We, for instance, use Ceph, which is mounted as a regular disk from the OS's perspective, so we can use file:// and still save to distributed storage. As far as I know, using YARN doesn't give much benefit; I am not sure whether Flink exploits any data locality at this moment.
>>
>> When you are running the RocksDB state backend, there are two directories you need to think about for checkpointing: the local RocksDB directory and the checkpoint directory. The local directory is where the RocksDB instances are created; it lives on the task manager's local disk/memory. When Flink takes a checkpoint, a backup of all K-V pairs is copied as one blob to HDFS or to the selected checkpoint directory. This means there is no data fragmentation in the checkpoints. Something similar applies to the FsStateBackend, but that keeps the local state strictly in memory.
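To make the two directories described above concrete, here is a minimal Java sketch of wiring up the RocksDB backend; the class name, the paths, and the 60-second interval are illustrative assumptions, not something from this thread:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDbCheckpointSetup {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Checkpoint directory: must be reachable from every task manager,
            // e.g. hdfs://, or file:// pointing at a shared mount (as with Ceph).
            RocksDBStateBackend backend =
                    new RocksDBStateBackend("hdfs:///flink/checkpoints");

            // Local directory where the RocksDB instances themselves live,
            // ideally on fast local disk of each task manager.
            backend.setDbStoragePath("/mnt/local-disk/flink/rocksdb");

            env.setStateBackend(backend);
            env.enableCheckpointing(60_000); // take a checkpoint every 60 seconds

            // ... build and execute the topology here
        }
    }

With this split, the heavy read/write traffic stays on local disk, and the distributed file system only sees one bulk write per checkpoint, which is why there is no small-file fragmentation in the checkpoints.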
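On the caching/TTL point: a rough sketch of the timer-based eviction idea, written against the ProcessFunction API that was added to Flink after this thread (so the older custom-operator route it replaces is not shown); the class name, the String element type, and the one-week retention are illustrative assumptions:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // Keeps per-key state and clears it once the key has been idle for a week,
    // mimicking time-based eviction in a Guava cache. Apply after keyBy(...),
    // e.g. stream.keyBy(x -> x).process(new IdleStateEvictor()).
    public class IdleStateEvictor extends ProcessFunction<String, String> {

        private static final long RETENTION_MS = 7L * 24 * 60 * 60 * 1000; // one week

        private transient ValueState<Long> lastSeen;

        @Override
        public void open(Configuration parameters) {
            lastSeen = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastSeen", Long.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out)
                throws Exception {
            // Remember the last access and (re)arm an expiration timer for this key.
            long now = ctx.timerService().currentProcessingTime();
            lastSeen.update(now);
            ctx.timerService().registerProcessingTimeTimer(now + RETENTION_MS);
            out.collect(value);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
                throws Exception {
            // Timers from earlier accesses fire too, so only clear the state
            // if the key really has been idle for the full retention period.
            Long seen = lastSeen.value();
            if (seen != null && timestamp >= seen + RETENTION_MS) {
                lastSeen.clear();
            }
        }
    }

Much later Flink releases also added built-in state TTL, which removes the need for this hand-rolled version.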
>> I think you should definitely give RocksDB + HDFS a try. It works extremely well for very large state sizes given some tuning, but it should also perform well out of the box :)
>>
>> Cheers,
>> Gyula
>>
>> On Sun, Oct 23, 2016 at 22:29, Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote:
>>
>> Hi all,
>>
>> I don't have much experience with Flink, so please forgive me if I ask some obvious questions. I was taking a look at the documentation on stateful transformations in Flink at https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/state.html. I'm mostly interested in Flink for stream processing, and I would like to know:
>>
>> - What is the biggest state that has been used in production deployments? I'm interested in how many GB of state, across all key-value pairs, have been used in long-running streaming jobs deployed in production. Maybe some company has shared this in a conference talk or blog post. I guess the RocksDB backend is the best option for big states, to avoid being limited by the available memory.
>>
>> - Is there any pre-built functionality for state eviction? I'm thinking of LRU cache-like behavior, with eviction based on time or size, similar to the Guava cache (https://github.com/google/guava/wiki/CachesExplained). This is probably easy to implement anyway by using the clear() primitive, but I wouldn't like to reinvent the wheel if it is already implemented somewhere.
>>
>> - When using file:// for the checkpointing URL, is the data replicated across the workers, or does a failure in a worker lead to losing the state stored in that worker? I guess with hdfs:// we get the replication of HDFS and we don't have that problem. I also guess S3 can be used for checkpointing state too; is there any remarkable performance impact of using S3 instead of HDFS for checkpointing? I guess some performance is lost compared to a setup running on YARN with collocated DataNodes and NodeManagers, but I wanted to know whether the impact is negligible, as checkpointing is performed at a relatively low frequency. I'm also interested in Flink running on EMR, where the impact should be even smaller, because access to S3 is faster from EMR than from an in-house YARN cluster outside the AWS cloud.
>>
>> - Is there any problem with the RocksDB backend on top of HDFS related to fragmentation? How is clearing handled for long-running jobs? I'm thinking of a streaming job that has a state with a size of several hundred GBs, where each key-value pair is stored for a week and then deleted. How does clear() work, and how do you deal with the "small files problem" of HDFS (http://inquidia.com/news-and-info/working-small-files-hadoop-part-1) for the FsState and RocksDB backends on top of HDFS? I guess this wouldn't be a problem for S3, as it is an object store that has no problem with small files.
>>
>> Thanks a lot for your help!
>>
>> Greetings,
>>
>> Juan Rodriguez Hortala
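As a companion to the checkpoint-URL question above, a minimal sketch of how the scheme choice looks in code, assuming the FsStateBackend; the class name, bucket, paths, and interval are placeholders, and the same choice applies to the RocksDB backend's checkpoint directory:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointTargetExamples {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // hdfs:// gives HDFS replication, so a single worker failure
            // cannot lose checkpointed state.
            env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

            // s3://... works too; with a relatively long checkpoint interval,
            // the extra latency of S3 is usually amortized.
            // env.setStateBackend(new FsStateBackend("s3://my-bucket/flink/checkpoints"));

            // file:// is only safe if the path is a shared mount on every worker
            // (as with Ceph); a purely local disk loses state with the worker.
            // env.setStateBackend(new FsStateBackend("file:///mnt/shared/flink/checkpoints"));

            env.enableCheckpointing(300_000); // e.g. checkpoint every 5 minutes
        }
    }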