Hi, the state is checkpointed into subdirectories with unique file names, so keeping everything under one root directory is no problem. This all happens automatically.
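To make the point above concrete, here is a minimal sketch of a single job that subscribes to topics by pattern and uses one shared checkpoint root. It assumes Flink 1.4 with the Kafka 0.11 connector on the classpath; the bucket name comes from this thread, while the broker address, group id, topic pattern, checkpoint interval and job name are placeholders chosen for illustration only.

```scala
import java.util.Properties
import java.util.regex.Pattern

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object MultiTopicJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // One checkpoint root for the whole job; Flink creates the
    // subdirectories and file names underneath it on its own.
    env.setStateBackend(new FsStateBackend("s3://some-bucket/checkpoints/"))
    env.enableCheckpointing(60000L) // placeholder interval: 60 seconds

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // placeholder broker
    props.setProperty("group.id", "multi-topic-sink")        // placeholder group id
    // Assumption: discovery is enabled so that topics created after job
    // start and matching the pattern are also picked up.
    props.setProperty("flink.partition-discovery.interval-millis", "30000")

    // Subscribe by regex instead of a fixed topic name (placeholder pattern).
    val consumer = new FlinkKafkaConsumer011[String](
      Pattern.compile("topic[0-9]+"),
      new SimpleStringSchema(),
      props)

    env.addSource(consumer).print()
    env.execute("multi-topic-job")
  }
}
```

Nothing in this sketch is specific to any one topic: the checkpoint path stays the same regardless of how many topics the pattern matches.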
As far as I know, there is no implementation that generates output paths for sinks like that. You could open a JIRA issue with a feature request, though.

Best,
Stefan

> On 03.01.2018 at 16:06, Kyle Hamlin <hamlin...@gmail.com> wrote:
>
> Hi Stefan,
>
> In the past, I ran four separate Flink apps to sink data from four separate
> Kafka topics to s3 without any transformations applied. For each Flink app, I
> would set the checkpoint directory to
> s3://some-bucket/checkpoints/topic-name. It appears that with Flink 1.4 I can
> just use a regex to pick up all four topics, and so what you are telling me
> is that even if my regex picks up 1000 Kafka topics the only checkpoint path
> I need is s3://some-bucket/checkpoints/ and Flink will take care of the rest?
>
> Additionally, I was wondering how this concept might extend to sinking the
> data from this single Flink app that has picked up 1000 Kafka topics to
> separate s3 paths? For instance:
>
> s3://some-bucket/data/topic1/
> s3://some-bucket/data/topic2/
> .
> .
> .
> s3://some-bucket/data/topic1000/
>
> Thanks very much for your help Stefan!
>
> On Wed, Jan 3, 2018 at 10:51 AM Stefan Richter <s.rich...@data-artisans.com> wrote:
> Hi,
>
> first, let me ask why you want to have a different checkpoint directory per
> topic? It is perfectly ok to have just a single checkpoint directory, so I
> wonder what the intention is? Flink will already create proper subdirectories
> and filenames and can identify the right checkpoint data for each operator
> instance.
>
> Best,
> Stefan
>
>
>> On 31.12.2017 at 17:10, Kyle Hamlin <hamlin...@gmail.com> wrote:
>>
>> Flink 1.4 added regex pattern matching for FlinkKafkaConsumers, which is a
>> neat feature. I would like to use this feature, but I'm wondering how that
>> impacts the FsStateBackend checkpointing mechanism. Before, I would subscribe
>> to one topic and set a checkpoint path specific to that topic. For example, if
>> the Kafka topic name was foo:
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.setStateBackend(new FsStateBackend("s3://checkpoints/foo/"))
>>
>> How does one dynamically set these checkpoint paths? Is it even necessary to
>> do so, or should I have one checkpoint path for all the possible topics the
>> regex pattern could pick up?