Hi,
there is already a mechanism for that. Currently, Flink only keeps the most
recent successful checkpoint. We are working on making that configurable so
that, for example, the last n successful checkpoints can be kept.
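
Once that lands, I would expect it to be a simple flink-conf.yaml entry along
these lines (the key name is only a guess at this point, since the feature is
not in a release yet):

    # keep the last 3 successful checkpoints instead of only the latest one
    state.checkpoints.num-retained: 3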

Cheers,
Aljoscha

On Tue, 25 Oct 2016 at 06:47 Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Hi Gyula,
>
> Thanks a lot for your response, it was very clear. I understand that there
> is no problem of small files due to checkpointing not being incremental. I
> also understand that each worker will interpret a file:// URL as local to
> its own file system, which works fine if all workers have a remote file
> system mounted at the same local path.
>
> Now I have to check if Flink provides some expiration mechanism for old
> checkpoints. For S3 that is already available, and for HDFS I guess some
> script that periodically deletes older files with hdfs dfs -rmr should be
> easy enough. Is there any documentation about some naming convention for
> checkpoint files that I could rely on to delete old checkpoints? E.g. some
> naming scheme that uses dates; it would be nicer if it were documented,
> because then it would be more stable.
>
> Thanks again for your help.
>
> Greetings,
>
> Juan
>
>
> On Mon, Oct 24, 2016 at 12:29 AM, Gyula Fóra <gyf...@apache.org> wrote:
>
> Hi Juan,
>
> Let me try to answer some of your questions :)
>
> We have been running Flink Streaming at King for quite some time now, with
> multiple jobs having several hundred gigabytes of KV state stored in
> RocksDB. I would say the RocksDB state backend is definitely the best choice
> at the moment for large deployments, as you can also keep the heap
> relatively small to save some time on GC. But you have to play around with
> the RocksDB configuration to get the most out of it, depending on your
> hardware.
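>
> For reference, wiring it in looks roughly like this (needs the
> flink-statebackend-rocksdb dependency; the checkpoint URI is just a
> placeholder, adjust it to your cluster):
>
>     import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
>     public class RocksDbJob {
>         public static void main(String[] args) throws Exception {
>             StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>             // working state stays in RocksDB on the task managers,
>             // checkpoints are copied to the shared directory below
>             env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
>             env.enableCheckpointing(60_000); // checkpoint every minute
>             // ... build the topology and call env.execute() ...
>         }
>     }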
>
> I am not aware of any caching/TTL functionality exposed in the Flink APIs
> currently. But if you are willing to dig a little deeper, you could
> implement a lower-level operator that uses timers, like the windowing
> mechanisms do, to clear state after some time.
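>
> To sketch the idea (this assumes a Flink version that exposes the
> ProcessFunction/timer API; on older versions you would have to write a
> custom operator instead), keyed state could be dropped after a period of
> inactivity roughly like this:
>
>     import org.apache.flink.api.common.state.ValueState;
>     import org.apache.flink.api.common.state.ValueStateDescriptor;
>     import org.apache.flink.api.java.tuple.Tuple2;
>     import org.apache.flink.configuration.Configuration;
>     import org.apache.flink.streaming.api.functions.ProcessFunction;
>     import org.apache.flink.util.Collector;
>
>     public class TtlCountFunction
>             extends ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
>
>         private static final long TTL_MS = 7L * 24 * 60 * 60 * 1000; // evict keys idle for a week
>
>         private transient ValueState<Long> sum;        // the per-key state we care about
>         private transient ValueState<Long> lastUpdate; // processing time of the last update
>
>         @Override
>         public void open(Configuration parameters) {
>             sum = getRuntimeContext().getState(
>                 new ValueStateDescriptor<>("sum", Long.class));
>             lastUpdate = getRuntimeContext().getState(
>                 new ValueStateDescriptor<>("lastUpdate", Long.class));
>         }
>
>         @Override
>         public void processElement(Tuple2<String, Long> in, Context ctx,
>                                    Collector<Tuple2<String, Long>> out) throws Exception {
>             Long current = sum.value();
>             sum.update(current == null ? in.f1 : current + in.f1);
>
>             long now = ctx.timerService().currentProcessingTime();
>             lastUpdate.update(now);
>             // arm a timer TTL_MS after this update; stale timers are ignored in onTimer()
>             ctx.timerService().registerProcessingTimeTimer(now + TTL_MS);
>
>             out.collect(Tuple2.of(in.f0, sum.value()));
>         }
>
>         @Override
>         public void onTimer(long timestamp, OnTimerContext ctx,
>                             Collector<Tuple2<String, Long>> out) throws Exception {
>             Long last = lastUpdate.value();
>             // only clear if no newer update has re-armed the timer in the meantime
>             if (last != null && timestamp >= last + TTL_MS) {
>                 sum.clear();
>                 lastUpdate.clear();
>             }
>         }
>     }
>
> You would apply it on a keyed stream, e.g.
> stream.keyBy(0).process(new TtlCountFunction()). The class name and the
> one-week TTL are just placeholders for illustration.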
>
> When you are selecting a checkpoint directory (URL), you need to make sure
> that it is accessible from all the task managers. HDFS is convenient but is
> not strictly necessary. We, for instance, use Ceph, which is mounted as a
> regular disk from the OS's perspective, so we can use file:// and still save
> to the distributed storage. As far as I know, using YARN doesn't give much
> benefit; I am not sure if Flink exploits any data locality at this moment.
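>
> In flink-conf.yaml that kind of setup looks roughly like this (the mount
> point is made up, and the key names can differ between versions):
>
>     state.backend: filesystem
>     state.backend.fs.checkpointdir: file:///mnt/ceph/flink/checkpoints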
>
> When you are running the RocksDB state backend, there are two concepts you
> need to think about for checkpointing: your local RocksDB directory, and the
> checkpoint directory. The local directory is where the RocksDB instances are
> created, and they live on the task managers' local disk/memory. When Flink
> takes a checkpoint, a backup of all K-V pairs is copied as one blob to HDFS
> or to the selected checkpoint directory. This means there is no data
> fragmentation in the checkpoints. The same applies to the FsStateBackend,
> but that keeps the local state strictly in memory.
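>
> To make the distinction concrete, continuing the snippet above (the paths
> are made up):
>
>     import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>
>     RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
>     // local working directory for the RocksDB instances on each task manager
>     backend.setDbStoragePath("/local/ssd/flink/rocksdb");
>     env.setStateBackend(backend);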
>
> I think you should definitely give RocksDB + HDFS a try. It works
> extremely well for very large state sizes given some tuning, but it should
> also perform well out of the box :)
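>
> If you want a starting point for the tuning, the RocksDB backend ships with
> some predefined option profiles (assuming your version has them), e.g.:
>
>     import org.apache.flink.contrib.streaming.state.PredefinedOptions;
>
>     // pick a profile matching your hardware; finer-grained tuning is possible via setOptions()
>     backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);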
>
> Cheers,
> Gyula
>
> Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote
> (on Sun, 23 Oct 2016 at 22:29):
>
> Hi all,
>
> I don't have much experience with Flink, so please forgive me if I ask some
> obvious questions. I was taking a look at the documentation on stateful
> transformations in Flink at
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/state.html.
> I'm mostly interested in Flink for stream processing, and I would like to
> know:
>
> - What is the biggest state that has been used in production deployments?
> I'm interested in how many GB of state, among all key-value pairs, have
> been used before in long-running streaming jobs deployed in production.
> Maybe some company has shared this in some conference or blog post. I guess
> for that the RocksDB backend is the best option for big states, to avoid
> being limited by the available memory.
>
> - Is there any pre built functionality for state eviction? I'm thinking of
> LRU cache-like behavior, with eviction based on time or size, similar to
> Guava cache (https://github.com/google/guava/wiki/CachesExplained). This
> is probably easy to implement anyway, by using the clear() primitive, but I
> wouldn't like to reinvent the wheel if this is already implemented
> somewhere.
>
> - When using file:// for the checkpointing URL, is the data replicated in
> the workers, or does a failure in a worker lead to losing the state stored
> in that worker? I guess with hdfs:// we get the replication of HDFS, so we
> don't have that problem. I also guess S3 can be used for checkpointing
> state too; is there any remarkable performance impact of using S3 instead
> of HDFS for checkpointing? I guess some performance is lost compared to a
> setup running on YARN with collocated DataNodes and NodeManagers, but I
> wanted to know if the impact is negligible, as checkpointing is performed
> at a relatively low frequency. I'm also interested in Flink running on EMR,
> where the impact of this should be even smaller, because access to S3 is
> faster from EMR than from an in-house YARN cluster outside the AWS cloud.
>
> - Is there any problem with the RocksDB backend on top of HDFS related to
> defragmentation? How is clear() handled for long-running jobs? I'm thinking
> of a streaming job that has a state with a size of several hundred GBs,
> where each key-value pair is stored for a week and then deleted. How does
> clear() work, and how do you deal with the "small files problem" of HDFS (
> http://inquidia.com/news-and-info/working-small-files-hadoop-part-1) for
> the FsState and RocksDB backends on top of HDFS? I guess this wouldn't be a
> problem for S3, as it is an object store that has no problem with small
> files.
>
> Thanks a lot for your help!
>
> Greetings,
>
> Juan Rodriguez Hortala
>
>
>
