Great! Please let us know if it solves the issue or not.

Best,
Piotrek

śr., 14 paź 2020 o 17:46 Vijayendra Yadav <contact....@gmail.com>
napisał(a):

> Hi Piotrek,
>
> That is correct I was still in 1.10, I am upgrading to 1.11.
>
> Regards,
> Vijay
>
> On Wed, Oct 14, 2020 at 6:12 AM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi Yadav,
>>
>> What Flink version are you using? `getPartPrefix` and `getPartSufix`
>> methods were not public before 1.10.1/1.11.0, which might be causing
>> this problem for you. Other than that, if you are already using Flink
>> 1.10.1 (or newer), maybe please double check what class are you extending?
>> The example provided by Ravi seems to be working for me.
>>
>> Piotrek
>>
>> wt., 13 paź 2020 o 19:02 Vijayendra Yadav <contact....@gmail.com>
>> napisał(a):
>>
>>> *Thanks Ravi. I got following Error:*
>>> [ERROR] DynamicOutputFileConfig.scala:21: error: method getPartPrefix
>>> overrides nothing
>>> [ERROR]   override def getPartPrefix: String = if(partPrefixFunction ==
>>> null) partPrefix else partPrefixFunction.apply()
>>> [ERROR]                ^
>>> [ERROR] DynamicOutputFileConfig.scala:26: error: method getPartSuffix
>>> overrides nothing
>>> [ERROR]   override def getPartSuffix: String = if(partSuffixFunction ==
>>> null) partSuffix else partSuffixFunction.apply()
>>>
>>>
>>>
>>> On Tue, Oct 13, 2020 at 7:29 AM Ravi Bhushan Ratnakar <
>>> ravibhushanratna...@gmail.com> wrote:
>>>
>>>> Hi Vijayendra,
>>>>
>>>> OutputFileConfig provides a builder method to create immutable objects 
>>>> with given 'prefix' and 'suffix'. The parameter which you are passing to 
>>>> '*withPartPrefix*' will only be evaluated at the time of calling this 
>>>> method '*withPartPrefix*'. So if you want to achieve a dynamic 'prefix' or 
>>>> 'suffix' then you may try to have your own custom implementation of 
>>>> 'OutputFileConfig' which could provide a way to set function definition 
>>>> for 'prefix' or 'suffix'. For the same, I am attaching you a sample 
>>>> implementation. Kindly make sure that the function definition which you 
>>>> are passing is serializable.
>>>>
>>>>
>>>> Use like this
>>>>
>>>> val outputFileConfig:OutputFileConfig = new DynamicOutputFileConfig()
>>>>   
>>>> .withPartPrefixFunction(()=>ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))
>>>>   .withPartSuffixFunction(()=> ".ext")
>>>>
>>>>
>>>> Regards,
>>>> Ravi
>>>>
>>>> On Tue, Oct 13, 2020 at 6:05 AM Vijayendra Yadav <contact....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I have tried to assign a dynamic prefix for file name, which contains
>>>>> datetime components.
>>>>> *The Problem is Job always takes initial datetime when job first
>>>>> starts and never refreshes later. *
>>>>> *How can I get dynamic current datetime in filename at sink time ?*
>>>>>
>>>>> *.withPartPrefix
>>>>> (ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))*
>>>>>
>>>>>
>>>>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>>>>
>>>>> val config = OutputFileConfig
>>>>>  .builder() .withPartPrefix("prefix")
>>>>>  .withPartSuffix(".ext")
>>>>>  .build()
>>>>>             val sink = StreamingFileSink
>>>>>  .forRowFormat(new Path(outputPath), new 
>>>>> SimpleStringEncoder[String]("UTF-8"))
>>>>>  .withBucketAssigner(new KeyBucketAssigner())
>>>>>  .withRollingPolicy(OnCheckpointRollingPolicy.build()) 
>>>>> .withOutputFileConfig(config)
>>>>>  .build()
>>>>>
>>>>>

Reply via email to