Hi,

first, let me ask why you want a different checkpoint directory per topic. It is perfectly fine to have just a single checkpoint directory, so I wonder what the intention is. Flink already creates the proper subdirectories and filenames under that directory and can identify the right checkpoint data for each operator instance.
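
To illustrate, here is a minimal sketch of a job that uses one checkpoint directory for every topic matched by a pattern. It assumes Flink 1.4 with the Kafka 0.11 connector on the classpath. The topic pattern "foo.*", the broker address, the group id, the intervals and the job name are placeholders I made up for the example, so adjust them to your setup:

```scala
import java.util.Properties
import java.util.regex.Pattern

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object SingleCheckpointDirJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // One checkpoint root for the whole job, independent of the topics read.
    env.enableCheckpointing(60000L) // interval in ms, placeholder value
    env.setStateBackend(new FsStateBackend("s3://checkpoints/"))

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // placeholder
    props.setProperty("group.id", "example-group")           // placeholder
    // Periodically look for new topics/partitions that match the pattern.
    props.setProperty("flink.partition-discovery.interval-millis", "30000")

    // Subscribe by regex instead of a fixed topic name (pattern is a placeholder).
    val consumer = new FlinkKafkaConsumer011[String](
      Pattern.compile("foo.*"), new SimpleStringSchema(), props)

    env.addSource(consumer).print()
    env.execute("single-checkpoint-dir-example")
  }
}
```

The consumer keeps its Kafka offsets in operator state, so they end up in the same checkpoint as everything else and nothing has to be configured per topic.
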
Best,
Stefan

> On 31.12.2017 at 17:10, Kyle Hamlin <hamlin...@gmail.com> wrote:
>
> Flink 1.4 added regex pattern matching for FlinkKafkaConsumers, which is a
> neat feature. I would like to use this feature, but I'm wondering how it
> impacts the FsStateBackend checkpointing mechanism. Before, I would subscribe
> to one topic and set a checkpoint path specific to that topic. For example,
> if the Kafka topic name was foo:
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStateBackend(new FsStateBackend("s3://checkpoints/foo/"))
>
> How does one dynamically set these checkpoint paths? Is it even necessary to
> do so, or should I have one checkpoint path for all the possible topics the
> regex pattern could pick up?