Hi Team,

I am trying to assign a dynamic prefix to the part file name, and the prefix
contains datetime components.
*The problem is that the job always uses the datetime from when it first
started and never refreshes it afterwards.*
*How can I get the current datetime in the file name at sink time?*

*.withPartPrefix(ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))*
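
To illustrate what I think is happening, here is a minimal sketch without Flink. It assumes the prefix is stored as a plain String once the config is built; FixedPrefixConfig and formatMillis are hypothetical names for this illustration, not Flink API.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object PrefixDemo {
  // Same pattern as in my job, pinned to UTC.
  val formatter: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS").withZone(ZoneOffset.UTC)

  // Hypothetical stand-in for a config object that keeps the prefix as a String.
  final case class FixedPrefixConfig(prefix: String)

  // Formats an epoch-millisecond timestamp with the pattern above.
  def formatMillis(millis: Long): String =
    formatter.format(Instant.ofEpochMilli(millis))

  def main(args: Array[String]): Unit = {
    // The argument is evaluated here, once; only the resulting String is stored.
    val config = FixedPrefixConfig(formatMillis(System.currentTimeMillis()))

    val first = config.prefix
    Thread.sleep(50)
    val second = config.prefix
    println(first == second) // the stored prefix is the same on every read

    // Formatting again at the point of use yields a fresh timestamp.
    println(formatMillis(System.currentTimeMillis()))
  }
}
```

If that is right, I assume the timestamp would have to be computed at write time rather than passed in as a constant when the job graph is built.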


https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html

val config = OutputFileConfig
  .builder()
  .withPartPrefix("prefix")
  .withPartSuffix(".ext")
  .build()

val sink = StreamingFileSink
  .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
  .withBucketAssigner(new KeyBucketAssigner())
  .withRollingPolicy(OnCheckpointRollingPolicy.build())
  .withOutputFileConfig(config)
  .build()
