Hi Piotrek,

That is correct: I was still on 1.10. I am upgrading to 1.11.

Regards,
Vijay

On Wed, Oct 14, 2020 at 6:12 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi Yadav,
>
> What Flink version are you using? The `getPartPrefix` and `getPartSuffix`
> methods were not public before 1.10.1/1.11.0, which might be causing
> this problem for you. Other than that, if you are already using Flink
> 1.10.1 (or newer), please double-check which class you are extending.
> The example provided by Ravi seems to be working for me.
>
> Piotrek
>
> On Tue, Oct 13, 2020 at 19:02, Vijayendra Yadav <contact....@gmail.com>
> wrote:
>
>> *Thanks Ravi. I got the following error:*
>> [ERROR] DynamicOutputFileConfig.scala:21: error: method getPartPrefix overrides nothing
>> [ERROR] override def getPartPrefix: String = if(partPrefixFunction == null) partPrefix else partPrefixFunction.apply()
>> [ERROR] ^
>> [ERROR] DynamicOutputFileConfig.scala:26: error: method getPartSuffix overrides nothing
>> [ERROR] override def getPartSuffix: String = if(partSuffixFunction == null) partSuffix else partSuffixFunction.apply()
>>
>> On Tue, Oct 13, 2020 at 7:29 AM Ravi Bhushan Ratnakar <
>> ravibhushanratna...@gmail.com> wrote:
>>
>>> Hi Vijayendra,
>>>
>>> OutputFileConfig provides a builder to create immutable objects with a
>>> given 'prefix' and 'suffix'. The argument you pass to
>>> '*withPartPrefix*' is evaluated only once, at the time
>>> '*withPartPrefix*' is called. So if you want a dynamic 'prefix' or
>>> 'suffix', you may try writing your own custom implementation of
>>> 'OutputFileConfig' that accepts a function definition for the
>>> 'prefix' or 'suffix'. I am attaching a sample implementation of this.
>>> Kindly make sure that the function definition you pass is
>>> serializable.
>>>
>>> Use it like this:
>>>
>>> val outputFileConfig: OutputFileConfig = new DynamicOutputFileConfig()
>>>   .withPartPrefixFunction(() => ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))
>>>   .withPartSuffixFunction(() => ".ext")
>>>
>>> Regards,
>>> Ravi
>>>
>>> On Tue, Oct 13, 2020 at 6:05 AM Vijayendra Yadav <contact....@gmail.com>
>>> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> I have tried to assign a dynamic prefix to the file name, which contains
>>>> datetime components.
>>>> *The problem is that the job always takes the datetime from when it first
>>>> starts and never refreshes it later.*
>>>> *How can I get the current datetime in the file name at sink time?*
>>>>
>>>> *.withPartPrefix(ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))*
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>>>
>>>> val config = OutputFileConfig
>>>>   .builder()
>>>>   .withPartPrefix("prefix")
>>>>   .withPartSuffix(".ext")
>>>>   .build()
>>>> val sink = StreamingFileSink
>>>>   .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
>>>>   .withBucketAssigner(new KeyBucketAssigner())
>>>>   .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>>   .withOutputFileConfig(config)
>>>>   .build()
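
The sample implementation Ravi mentions attaching does not appear in the text above. The following is only a minimal, Flink-independent sketch of the idea he describes: a prefix passed as a String is computed once, while a prefix passed as a function is recomputed on every call to the getter. `StaticFileConfig`, `DynamicFileConfig`, and `utcNow` are hypothetical names made up for illustration; they are not Flink classes and this is not Ravi's actual code.

```scala
import java.time.{ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatter

object DynamicPrefixSketch {

  // Hypothetical stand-in for a config holding a fixed prefix (NOT Flink's OutputFileConfig).
  class StaticFileConfig(prefix: String) extends Serializable {
    def getPartPrefix: String = prefix
  }

  // Hypothetical variant: stores a function and calls it on every getter invocation.
  class DynamicFileConfig(prefixFn: () => String) extends StaticFileConfig("") {
    override def getPartPrefix: String = prefixFn()
  }

  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")

  // Current UTC time formatted with the pattern used in the thread.
  def utcNow(): String = ZonedDateTime.now(ZoneId.of("UTC")).format(fmt)

  def main(args: Array[String]): Unit = {
    val eager    = new StaticFileConfig(utcNow())        // utcNow() runs once, here
    val deferred = new DynamicFileConfig(() => utcNow()) // utcNow() runs on each getter call

    val eagerFirst    = eager.getPartPrefix
    val deferredFirst = deferred.getPartPrefix
    Thread.sleep(20) // let the clock advance by a few milliseconds

    println(s"eager prefix unchanged:    ${eager.getPartPrefix == eagerFirst}")
    println(s"deferred prefix unchanged: ${deferred.getPartPrefix == deferredFirst}")
  }
}
```

Going by the thread, the analogous approach in Flink 1.10.1/1.11.0 or newer would be to extend `OutputFileConfig` and override `getPartPrefix` and `getPartSuffix` in the same way, keeping the supplied functions serializable as Ravi notes.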