Hi,

The state is checkpointed into subdirectories with unique file names, so 
keeping everything under one root directory is no problem. This all happens 
automatically.

As far as I know, there is no implementation that generates output paths for 
sinks like that. You could open a JIRA issue with a feature request, though.
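
For illustration only: if you end up writing this yourself, the path mapping 
is the easy part. The sketch below assumes each record carries the name of the 
topic it came from. TopicPaths and outputPathFor are hypothetical names, not 
Flink APIs, and the sink that would actually write to these paths is not shown.

```scala
// Hypothetical helper, not part of Flink: derives a per-topic output
// directory under a common base path.
object TopicPaths {
  // Joins the base path and the topic name with exactly one slash between
  // them and adds a trailing slash.
  def outputPathFor(basePath: String, topic: String): String = {
    require(topic.nonEmpty, "topic must not be empty")
    val base = basePath.stripSuffix("/")
    s"$base/$topic/"
  }
}

object TopicPathsExample extends App {
  // Topic names taken from the example paths in the question.
  val topics = Seq("topic1", "topic2", "topic1000")
  topics
    .map(topic => TopicPaths.outputPathFor("s3://some-bucket/data/", topic))
    .foreach(println)
}
```

A custom sink or bucketer could call such a function per record to pick the 
target directory.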

Best,
Stefan

> On 03.01.2018 at 16:06, Kyle Hamlin <hamlin...@gmail.com> wrote:
> 
> Hi Stefan,
> 
> In the past, I ran four separate Flink apps to sink data from four separate 
> Kafka topics to s3 without any transformations applied. For each Flink app, I 
> would set the checkpoint directory to 
> s3://some-bucket/checkpoints/topic-name. It appears that with Flink 1.4 I can 
> just use a regex to pick up all four topics, and so what you are telling me 
> is that even if my regex picks up 1000 Kafka topics the only checkpoint path 
> I need is s3://some-bucket/checkpoints/ and Flink will take care of the rest?
> 
> Additionally, I was wondering how this concept might extend to sinking the 
> data from this single Flink app that has picked up 1000 Kafka topics to 
> separate s3 paths? For instance:
> 
> s3://some-bucket/data/topic1/
> s3://some-bucket/data/topic2/
> .
> .
> .
> s3://some-bucket/data/topic1000/
> 
> Thanks very much for your help Stefan!
> 
> On Wed, Jan 3, 2018 at 10:51 AM Stefan Richter <s.rich...@data-artisans.com> 
> wrote:
> Hi,
> 
> first, let me ask why you want to have a different checkpoint directory per 
> topic. It is perfectly OK to have just a single checkpoint directory, so I 
> wonder what the intention is. Flink will already create proper subdirectories 
> and file names and can identify the right checkpoint data for each operator 
> instance.
> 
> Best,
> Stefan
> 
> 
>> On 31.12.2017 at 17:10, Kyle Hamlin <hamlin...@gmail.com> wrote:
>> 
>> Flink 1.4 added regex pattern matching for FlinkKafkaConsumers, which is a 
>> neat feature. I would like to use this feature, but I'm wondering how that 
>> impacts the FsStateBackend checkpointing mechanism. Before, I would subscribe 
>> to one topic and set a checkpoint path specific to that topic. For example, 
>> if the Kafka topic name was foo:
>> 
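>> import org.apache.flink.runtime.state.filesystem.FsStateBackend
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.setStateBackend(new FsStateBackend("s3://checkpoints/foo/"))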
>> 
>> How does one dynamically set these checkpoint paths? Is it even necessary to 
>> do so, or should I have one checkpoint path for all the possible topics the 
>> regex pattern could pick up?
> 
