Great! Please let us know whether it solves the issue.

Best,
Piotrek

On Wed, 14 Oct 2020 at 17:46, Vijayendra Yadav <contact....@gmail.com> wrote:

> Hi Piotrek,
>
> That is correct, I was still on 1.10. I am upgrading to 1.11.
>
> Regards,
> Vijay
>
> On Wed, Oct 14, 2020 at 6:12 AM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi Yadav,
>>
>> What Flink version are you using? The `getPartPrefix` and `getPartSuffix`
>> methods were not public before 1.10.1/1.11.0, which might be causing
>> this problem for you. Other than that, if you are already using Flink
>> 1.10.1 (or newer), please double-check which class you are extending.
>> The example provided by Ravi seems to be working for me.
>>
>> Piotrek
>>
>> On Tue, 13 Oct 2020 at 19:02, Vijayendra Yadav <contact....@gmail.com>
>> wrote:
>>
>>> *Thanks Ravi. I got the following error:*
>>> [ERROR] DynamicOutputFileConfig.scala:21: error: method getPartPrefix
>>> overrides nothing
>>> [ERROR] override def getPartPrefix: String = if(partPrefixFunction ==
>>> null) partPrefix else partPrefixFunction.apply()
>>> [ERROR] ^
>>> [ERROR] DynamicOutputFileConfig.scala:26: error: method getPartSuffix
>>> overrides nothing
>>> [ERROR] override def getPartSuffix: String = if(partSuffixFunction ==
>>> null) partSuffix else partSuffixFunction.apply()
>>>
>>> On Tue, Oct 13, 2020 at 7:29 AM Ravi Bhushan Ratnakar <
>>> ravibhushanratna...@gmail.com> wrote:
>>>
>>>> Hi Vijayendra,
>>>>
>>>> OutputFileConfig provides a builder method to create immutable objects
>>>> with a given 'prefix' and 'suffix'. The argument you pass to
>>>> '*withPartPrefix*' is evaluated only once, at the time
>>>> '*withPartPrefix*' is called. So if you want a dynamic 'prefix' or
>>>> 'suffix', you may try writing your own custom implementation of
>>>> 'OutputFileConfig' that accepts a function definition for the
>>>> 'prefix' or 'suffix'. I am attaching a sample implementation for this.
>>>> Kindly make sure that the function definition you pass is
>>>> serializable.
>>>>
>>>> Use it like this:
>>>>
>>>> val outputFileConfig:OutputFileConfig = new DynamicOutputFileConfig()
>>>>   .withPartPrefixFunction(()=>ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))
>>>>   .withPartSuffixFunction(()=> ".ext")
>>>>
>>>> Regards,
>>>> Ravi
>>>>
>>>> On Tue, Oct 13, 2020 at 6:05 AM Vijayendra Yadav <contact....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I have tried to assign a dynamic prefix to the file name, which
>>>>> contains datetime components.
>>>>> *The problem is that the job always takes the initial datetime from
>>>>> when the job first starts and never refreshes it later.*
>>>>> *How can I get the dynamic current datetime in the filename at sink time?*
>>>>>
>>>>> *.withPartPrefix(ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))*
>>>>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>>>>
>>>>> val config = OutputFileConfig
>>>>>   .builder()
>>>>>   .withPartPrefix("prefix")
>>>>>   .withPartSuffix(".ext")
>>>>>   .build()
>>>>> val sink = StreamingFileSink
>>>>>   .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
>>>>>   .withBucketAssigner(new KeyBucketAssigner())
>>>>>   .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>>>   .withOutputFileConfig(config)
>>>>>   .build()
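
The difference Ravi describes in this thread, between a prefix string evaluated once when the config is built and a prefix function evaluated on each access, can be sketched without Flink on the classpath. This is only an illustration, not the attached implementation: `FileConfig` and `DynamicFileConfig` are hypothetical stand-ins for `OutputFileConfig` and `DynamicOutputFileConfig`, and a counter replaces the clock so the behaviour is deterministic.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for Flink's OutputFileConfig (NOT the real class):
// it stores a fixed prefix and suffix and exposes them through getters.
class FileConfig(prefix: String, suffix: String) extends Serializable {
  def getPartPrefix: String = prefix
  def getPartSuffix: String = suffix
}

// Hypothetical dynamic variant: the getters call a function on every access
// instead of returning a value captured at construction time.
class DynamicFileConfig(prefixFn: () => String, suffixFn: () => String)
    extends FileConfig("", "") {
  override def getPartPrefix: String = prefixFn()
  override def getPartSuffix: String = suffixFn()
}

object PrefixDemo {
  def main(args: Array[String]): Unit = {
    // A counter stands in for ZonedDateTime.now so the output is repeatable.
    val clock = new AtomicInteger(0)
    def now(): String = s"t${clock.incrementAndGet()}"

    // Eager: now() runs once, right here, and its result is stored.
    val fixed = new FileConfig(now(), ".ext")
    // Deferred: now() runs each time the prefix is requested.
    val dynamic = new DynamicFileConfig(() => now(), () => ".ext")

    println(fixed.getPartPrefix)   // t1
    println(fixed.getPartPrefix)   // t1 (same value, never refreshed)
    println(dynamic.getPartPrefix) // t2
    println(dynamic.getPartPrefix) // t3 (re-evaluated on every call)
  }
}
```

With the real `OutputFileConfig`, the same override pattern only compiles when `getPartPrefix` and `getPartSuffix` are public, which per Piotrek's reply means Flink 1.10.1/1.11.0 or newer.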