Hi Raja,

I think you can in fact do this by implementing a custom Bucketer. Have a look at BasePathBucketer and extend it so that the path it returns includes the timestamp. You should probably clamp the timestamp (for example, round it down to the minute or the hour) so that you don't get a new path for every millisecond.
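
To make the clamping concrete, here is a minimal sketch that uses only the JDK. The class and method names (`ClampedTimestampBucket`, `clamp`, `subdirectory`) and the directory format are made up for illustration and are not part of Flink. Inside a custom Bucketer, assuming its method is `getBucketPath(Clock clock, Path basePath, T element)`, you would return something like `new Path(basePath, ClampedTimestampBucket.subdirectory(clock.currentTimeMillis(), 60 * 60 * 1000L))`.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not a Flink class): derives a bucket sub-directory name
// from a timestamp that has been rounded down to a fixed granularity.
final class ClampedTimestampBucket {

    // Assumed directory format; formatted in UTC so the result does not
    // depend on the default time zone of the machine running the job.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HHmm").withZone(ZoneOffset.UTC);

    private ClampedTimestampBucket() {}

    /** Rounds epochMillis down to the nearest multiple of granularityMillis. */
    static long clamp(long epochMillis, long granularityMillis) {
        if (granularityMillis <= 0) {
            throw new IllegalArgumentException("granularityMillis must be positive");
        }
        return epochMillis - Math.floorMod(epochMillis, granularityMillis);
    }

    /** Returns the sub-directory name for the clamped timestamp. */
    static String subdirectory(long epochMillis, long granularityMillis) {
        return FORMAT.format(Instant.ofEpochMilli(clamp(epochMillis, granularityMillis)));
    }
}
```

With a one-hour granularity, all elements processed within the same UTC hour map to the same sub-directory, and a new one only appears when the next hour starts. Note that this puts the timestamp into the bucket path rather than into the part-file name itself.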

Best,
Aljoscha

> On 1. Sep 2017, at 08:18, Piotr Nowojski <pi...@data-artisans.com> wrote:
>
> Hi,
>
> BucketingSink doesn’t support the feature that you are requesting, you can
> not specify a dynamically generated prefix/suffix.
>
> Piotrek
>
>> On Aug 31, 2017, at 7:12 PM, Raja.Aravapalli <raja.aravapa...@target.com> wrote:
>>
>> Hi,
>>
>> I have a flink application that is streaming data into HDFS and I am using
>> Bucketing Sink for that. And, I want to know if is it possible to rename the
>> part files that is being created in the base hdfs directory.
>>
>> Right now I am using the below code for including the timestamp into
>> part-file name, but the problem I am facing is the timestamp is not changing
>> for the new part file that is being rolled!
>>
>> BucketingSink<String> HdfsSink = new BucketingSink<String> (hdfsOutputPath);
>>
>> HdfsSink.setBucketer(new BasePathBucketer<String>());
>> HdfsSink.setBatchSize(1024 * 1024 * hdfsOutputBatchSizeInMB); // this means 'hdfsOutputBatchSizeInMB' MB
>> HdfsSink.setPartPrefix("PART-FILE-" + Long.toString(System.currentTimeMillis()));
>>
>> Can someone please suggest me, what code changes I can try so that I get a
>> new timestamp for every part file that is being rolled new?
>>
>> Thanks a lot.
>>
>> Regards,
>> Raja.