*Thanks Ravi. I got the following error:*
[ERROR] DynamicOutputFileConfig.scala:21: error: method getPartPrefix overrides nothing
[ERROR]   override def getPartPrefix: String = if(partPrefixFunction == null) partPrefix else partPrefixFunction.apply()
[ERROR]                ^
[ERROR] DynamicOutputFileConfig.scala:26: error: method getPartSuffix overrides nothing
[ERROR]   override def getPartSuffix: String = if(partSuffixFunction == null) partSuffix else partSuffixFunction.apply()



On Tue, Oct 13, 2020 at 7:29 AM Ravi Bhushan Ratnakar <
ravibhushanratna...@gmail.com> wrote:

> Hi Vijayendra,
>
> OutputFileConfig provides a builder that creates immutable objects with a
> given 'prefix' and 'suffix'. The argument you pass to 'withPartPrefix' is
> evaluated only once, at the time 'withPartPrefix' is called. To get a
> dynamic 'prefix' or 'suffix', you could try a custom implementation of
> 'OutputFileConfig' that accepts a function definition for the 'prefix' or
> 'suffix'. I am attaching a sample implementation for this. Please make sure
> that the function definition you pass is serializable.
>
>
> Use it like this:
>
> val outputFileConfig: OutputFileConfig = new DynamicOutputFileConfig()
>   .withPartPrefixFunction(() =>
>     ZonedDateTime.now
>       .withZoneSameInstant(ZoneId.of("UTC"))
>       .format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))
>   .withPartSuffixFunction(() => ".ext")
>
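
To illustrate the point above about when the prefix is evaluated, here is a minimal sketch in plain Scala. It does not use Flink at all: FixedPrefix, DeferredPrefix and isSerializable are hypothetical names made up for this illustration, and a counter stands in for the clock so the effect is easy to see. It only shows the difference between passing an already-computed value and passing a function, plus one rough way to check that an object passes Java serialization.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-ins for illustration only; these are NOT Flink classes.
object PrefixEvaluationSketch {

  // Receives an already-computed String, so the value is fixed at construction.
  final class FixedPrefix(prefix: String) extends Serializable {
    def current: String = prefix
  }

  // Receives a function, so the prefix is recomputed on every call.
  final class DeferredPrefix(prefixFn: () => String) extends Serializable {
    def current: String = prefixFn()
  }

  // Returns true if Java serialization accepts the object, false if it
  // raises NotSerializableException.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    // A counter replaces ZonedDateTime.now to make each evaluation visible.
    val counter = new AtomicInteger(0)
    def nextPrefix(): String = s"part-${counter.incrementAndGet()}"

    val fixed    = new FixedPrefix(nextPrefix())          // evaluated once, here
    val deferred = new DeferredPrefix(() => nextPrefix()) // evaluated per call

    println(fixed.current)    // part-1
    println(fixed.current)    // part-1 again: the value never refreshes
    println(deferred.current) // part-2
    println(deferred.current) // part-3
  }
}
```

Whether a sink actually calls such a function once per part file depends on how the Flink version in use reads the prefix, which this sketch does not cover.
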
>
> Regards,
> Ravi
>
> On Tue, Oct 13, 2020 at 6:05 AM Vijayendra Yadav <contact....@gmail.com>
> wrote:
>
>> Hi Team,
>>
>> I have tried to assign a dynamic prefix to the file name, which contains
>> datetime components.
>> *The problem is that the job always uses the datetime from when it first
>> started and never refreshes it later.*
>> *How can I get the current datetime in the file name at sink time?*
>>
>> *.withPartPrefix(ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))*
>>
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>
>> val config = OutputFileConfig
>>   .builder()
>>   .withPartPrefix("prefix")
>>   .withPartSuffix(".ext")
>>   .build()
>>
>> val sink = StreamingFileSink
>>   .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
>>   .withBucketAssigner(new KeyBucketAssigner())
>>   .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>   .withOutputFileConfig(config)
>>   .build()
>>
>>
