*Thanks Ravi. I got the following error:*

[ERROR] DynamicOutputFileConfig.scala:21: error: method getPartPrefix overrides nothing
[ERROR] override def getPartPrefix: String = if(partPrefixFunction == null) partPrefix else partPrefixFunction.apply()
[ERROR] ^
[ERROR] DynamicOutputFileConfig.scala:26: error: method getPartSuffix overrides nothing
[ERROR] override def getPartSuffix: String = if(partSuffixFunction == null) partSuffix else partSuffixFunction.apply()

On Tue, Oct 13, 2020 at 7:29 AM Ravi Bhushan Ratnakar <ravibhushanratna...@gmail.com> wrote:

> Hi Vijayendra,
>
> OutputFileConfig provides a builder method to create immutable objects with
> given 'prefix' and 'suffix'. The parameter which you are passing to
> '*withPartPrefix*' will only be evaluated at the time of calling this method
> '*withPartPrefix*'. So if you want to achieve a dynamic 'prefix' or 'suffix'
> then you may try to have your own custom implementation of 'OutputFileConfig'
> which could provide a way to set function definition for 'prefix' or
> 'suffix'. For the same, I am attaching you a sample implementation. Kindly
> make sure that the function definition which you are passing is serializable.
>
> Use like this
>
> val outputFileConfig: OutputFileConfig = new DynamicOutputFileConfig()
>   .withPartPrefixFunction(() => ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))
>   .withPartSuffixFunction(() => ".ext")
>
> Regards,
> Ravi
>
> On Tue, Oct 13, 2020 at 6:05 AM Vijayendra Yadav <contact....@gmail.com> wrote:
>
>> Hi Team,
>>
>> I have tried to assign a dynamic prefix for file name, which contains
>> datetime components.
>> *The Problem is Job always takes initial datetime when job first starts
>> and never refreshes later.*
>> *How can I get dynamic current datetime in filename at sink time ?*
>>
>> *.withPartPrefix(ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))*
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>
>> val config = OutputFileConfig
>>   .builder()
>>   .withPartPrefix("prefix")
>>   .withPartSuffix(".ext")
>>   .build()
>>
>> val sink = StreamingFileSink
>>   .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
>>   .withBucketAssigner(new KeyBucketAssigner())
>>   .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>   .withOutputFileConfig(config)
>>   .build()
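
Ravi's point about when the prefix is evaluated can be reproduced without Flink. The following is only a minimal sketch: StaticPrefix and DynamicPrefix are hypothetical stand-ins invented for illustration (they are not Flink classes and not the attached DynamicOutputFileConfig), and a counter replaces the clock so the output is deterministic. It shows that a String argument is computed once by the caller, while a stored function is re-evaluated on every read.

```scala
// Hypothetical stand-ins for illustration only; these are NOT Flink classes.
object PrefixEvaluationSketch {

  // Stores a plain String: the caller evaluates the argument once,
  // before the constructor runs, so the value never changes afterwards.
  final class StaticPrefix(prefix: String) extends Serializable {
    def getPartPrefix: String = prefix
  }

  // Stores a function: it is invoked again on every call to getPartPrefix.
  final class DynamicPrefix(prefixFunction: () => String) extends Serializable {
    def getPartPrefix: String = prefixFunction()
  }

  def main(args: Array[String]): Unit = {
    // A counter stands in for ZonedDateTime.now so the result is repeatable.
    var counter = 0
    def next(): String = { counter += 1; s"part-$counter" }

    val fixed = new StaticPrefix(next())           // next() runs here, once
    val dynamic = new DynamicPrefix(() => next())  // next() runs on each read

    println(fixed.getPartPrefix)   // part-1
    println(fixed.getPartPrefix)   // part-1 (unchanged)
    println(dynamic.getPartPrefix) // part-2
    println(dynamic.getPartPrefix) // part-3
  }
}
```

The same reasoning applies to the original .withPartPrefix(ZonedDateTime.now...) call: the timestamp is formatted once, when the config is built at job start, which matches the behaviour described in the first message.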