Hi Piotrek,

That is correct. I was still on 1.10; I am upgrading to 1.11.

Regards,
Vijay

On Wed, Oct 14, 2020 at 6:12 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi Yadav,
>
> What Flink version are you using? The `getPartPrefix` and `getPartSuffix`
> methods were not public before 1.10.1/1.11.0, which might be causing
> this problem for you. If you are already using Flink 1.10.1 (or newer),
> please double-check which class you are extending.
> The example provided by Ravi seems to be working for me.
>
> Piotrek
>
> On Tue, Oct 13, 2020 at 19:02, Vijayendra Yadav <contact....@gmail.com>
> wrote:
>
>> *Thanks Ravi. I got the following error:*
>> [ERROR] DynamicOutputFileConfig.scala:21: error: method getPartPrefix overrides nothing
>> [ERROR]   override def getPartPrefix: String = if(partPrefixFunction == null) partPrefix else partPrefixFunction.apply()
>> [ERROR]                ^
>> [ERROR] DynamicOutputFileConfig.scala:26: error: method getPartSuffix overrides nothing
>> [ERROR]   override def getPartSuffix: String = if(partSuffixFunction == null) partSuffix else partSuffixFunction.apply()
>>
>>
>>
>> On Tue, Oct 13, 2020 at 7:29 AM Ravi Bhushan Ratnakar <
>> ravibhushanratna...@gmail.com> wrote:
>>
>>> Hi Vijayendra,
>>>
>>> OutputFileConfig provides a builder for creating immutable objects with a
>>> given 'prefix' and 'suffix'. The argument you pass to 'withPartPrefix' is
>>> evaluated only once, at the time 'withPartPrefix' is called. So if you
>>> want a dynamic 'prefix' or 'suffix', you could write your own custom
>>> implementation of 'OutputFileConfig' that accepts a function for the
>>> 'prefix' or 'suffix'. I am attaching a sample implementation for this.
>>> Please make sure that the function you pass is serializable.
>>>
>>>
>>> Use it like this:
>>>
>>> val outputFileConfig: OutputFileConfig = new DynamicOutputFileConfig()
>>>   .withPartPrefixFunction(() => ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))
>>>   .withPartSuffixFunction(() => ".ext")
>>>
>>>
>>> Regards,
>>> Ravi
>>>
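
The sample implementation attached to the message above is not preserved in this archive. What follows is only a minimal sketch of what such a class might look like, reconstructed from the method names in the usage snippet and in the compiler errors quoted in this thread. It assumes Flink 1.10.1 / 1.11.0 or newer (where, per the thread, `getPartPrefix` and `getPartSuffix` are public), a public two-argument `OutputFileConfig` constructor, and the defaults "part" and "" for prefix and suffix. The constructor shape and the `null` defaults are assumptions, not the original code.

```scala
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig

// Sketch only, not the attachment from the thread.
// Requires the Flink streaming dependency on the classpath.
class DynamicOutputFileConfig(
    partPrefix: String,
    partSuffix: String,
    partPrefixFunction: () => String, // null means "use the static prefix"
    partSuffixFunction: () => String  // null means "use the static suffix"
) extends OutputFileConfig(partPrefix, partSuffix) {

  // No-arg constructor, matching `new DynamicOutputFileConfig()` in the thread.
  def this() = this("part", "", null, null)

  // Each with* method returns a new instance, so the object stays immutable.
  def withPartPrefixFunction(f: () => String): DynamicOutputFileConfig =
    new DynamicOutputFileConfig(partPrefix, partSuffix, f, partSuffixFunction)

  def withPartSuffixFunction(f: () => String): DynamicOutputFileConfig =
    new DynamicOutputFileConfig(partPrefix, partSuffix, partPrefixFunction, f)

  // The function, if set, is re-evaluated on every call to the getter.
  override def getPartPrefix: String =
    if (partPrefixFunction == null) partPrefix else partPrefixFunction()

  override def getPartSuffix: String =
    if (partSuffixFunction == null) partSuffix else partSuffixFunction()
}
```

The functions are stored as fields of a class that Flink serializes along with the sink, so anything they capture must itself be serializable. As far as I understand, the sink reads the prefix when it names a new part file, so the timestamp would reflect when the part file is opened rather than when each record arrives.
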
>>> On Tue, Oct 13, 2020 at 6:05 AM Vijayendra Yadav <contact....@gmail.com>
>>> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> I have tried to assign a dynamic prefix to the file name, containing
>>>> datetime components.
>>>> *The problem is that the job always uses the datetime from when it first
>>>> started and never refreshes it.*
>>>> *How can I get the current datetime in the file name at sink time?*
>>>>
>>>> *.withPartPrefix(ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))*
>>>>
>>>>
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>>>
>>>> val config = OutputFileConfig
>>>>   .builder()
>>>>   .withPartPrefix("prefix")
>>>>   .withPartSuffix(".ext")
>>>>   .build()
>>>>
>>>> val sink = StreamingFileSink
>>>>   .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
>>>>   .withBucketAssigner(new KeyBucketAssigner())
>>>>   .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>>   .withOutputFileConfig(config)
>>>>   .build()
>>>>
>>>>
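
The behaviour described in the original question comes down to when the argument is evaluated: a `String` argument is computed once, before the builder ever sees it. The Flink-free sketch below illustrates the difference with two hypothetical stand-in classes (`EagerConfig` and `DeferredConfig` are made up for illustration and are not Flink types). A counter replaces the clock so the result is deterministic.

```scala
object PrefixEvaluationDemo {
  // Hypothetical stand-in for a config that stores a plain String:
  // the argument is evaluated once, before the constructor runs.
  final class EagerConfig(val prefix: String)

  // Hypothetical stand-in that stores a function instead:
  // the function body runs again on every call to `prefix`.
  final class DeferredConfig(prefixFn: () => String) {
    def prefix: String = prefixFn()
  }

  def main(args: Array[String]): Unit = {
    var counter = 0
    // Stands in for "format the current time": returns a new value per call.
    def nextPrefix(): String = { counter += 1; s"part-$counter" }

    val eager = new EagerConfig(nextPrefix())          // nextPrefix() runs here, once
    val deferred = new DeferredConfig(() => nextPrefix()) // nothing runs yet

    println(eager.prefix)    // part-1
    println(eager.prefix)    // part-1 again: the value was fixed at construction
    println(deferred.prefix) // part-2
    println(deferred.prefix) // part-3: re-evaluated on each call
  }
}
```

Passing `ZonedDateTime.now...format(...)` to `withPartPrefix` corresponds to the `EagerConfig` case, which is why the file names keep the job's start time.
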
