Thank you for your help, it's greatly appreciated. My aim is to be able to "use a property of the element to determine the bucket directory".
With your suggestions, this is what I have so far. It's obviously wrong, but I hope I'm getting closer. Is it correct to still implement Bucketer and just change where it is imported from, or do I need to import BucketingSink?

    import org.apache.hadoop.fs.Path;
    import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer; // I think this is wrong
    import org.apache.flink.streaming.connectors.fs.Clock;

    // Bucketer is generic in the element type; the stream here carries Strings.
    public class S3Bucketer implements Bucketer<String> {

        @Override
        public Path getBucketPath(Clock clock, Path basePath, String element) {
            // Now that we have access to element, we can
            // generate an S3 filename path from it.
            String s3_filename_path = ""; // placeholder, still to be derived from element
            return new Path(s3_filename_path);
        }
    }

Apologies, my Java is limited at present.

Thanks,

> On 16 Aug 2017, at 16:06, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
> Hi Ant,
>
> I think you are implementing the wrong Bucketer.
> This seems to be the one for the RollingSink which is deprecated.
> Is this correct?
>
> You should implement the BucketingSink one, which is in the package:
>
> org.apache.flink.streaming.connectors.fs.bucketing
>
> That one requires the implementation of 1 method with signature:
>
> Path getBucketPath(Clock clock, Path basePath, T element);
>
> which from what I understand from you requirements gives you access
> to the element that you need.
>
> Cheers,
> Kostas
>
>> On Aug 16, 2017, at 3:31 PM, ant burton <apburto...@gmail.com> wrote:
>>
>> Thanks Kostas,
>>
>> I’m narrowing in on a solution:
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/filesystem_sink.html
>> says "You can also specify a custom bucketer by using setBucketer() on a
>> BucketingSink.
>> If desired, the bucketer can use a property of the element or
>> tuple to determine the bucket directory."
>>
>> BucketingSink<String> sink = new BucketingSink<String>("/base/path");
>> sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
>>
>> Therefore I’ve created a skeleton class:
>>
>> public class S3Bucketer implements Bucketer {
>>     private static final long serialVersionUID = 1L;
>>
>>     private final String formatString;
>>
>>     public S3Bucketer() {
>>     }
>>
>>     private void readObject(ObjectInputStream in) {
>>         in.defaultReadObject();
>>     }
>>
>>     public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
>>         return true;
>>     }
>>
>>     public Path getNextBucketPath(Path basePath) {
>>         return new Path(basePath + "/some-path-that-I-need-create-from-the-stream");
>>     }
>> }
>>
>> my question now is how do I access the data stream from within the
>> S3Bucketer so that I can generate a filename based on the data with the data
>> stream.
>>
>> Thanks,
>>
>>> On 16 Aug 2017, at 12:55, Kostas Kloudas <k.klou...@data-artisans.com>
>>> wrote:
>>>
>>> In the second link for the BucketingSink, you can set your
>>> own Bucketer using the setBucketer method. You do not have to
>>> implement your own sink from scratch.
>>>
>>> Kostas
>>>
>>>> On Aug 16, 2017, at 1:39 PM, ant burton <apburto...@gmail.com> wrote:
>>>>
>>>> or rather
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>>>>
>>>>> On 16 Aug 2017, at 12:24, Kostas Kloudas <k.klou...@data-artisans.com>
>>>>> wrote:
>>>>>
>>>>> Hi Ant,
>>>>>
>>>>> I think you can do it by implementing your own Bucketer.
>>>>>
>>>>> Cheers,
>>>>> Kostas
>>>>>
>>>>>> On Aug 16, 2017, at 1:09 PM, ant burton <apburto...@gmail.com> wrote:
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Given
>>>>>>
>>>>>> // Set StreamExecutionEnvironment
>>>>>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>
>>>>>> // Set checkpoints in ms
>>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>>
>>>>>> // Add source (input stream)
>>>>>> DataStream<String> dataStream = StreamUtil.getDataStream(env, params);
>>>>>>
>>>>>> How can I construct the s3_filename from the content of the an event, it
>>>>>> seems that whenever I attempt this I either have access to an event or
>>>>>> access to .addSink but not both.
>>>>>>
>>>>>> dataStream.addSink(new BucketingSink<String>("s3a://flink/" + s3_filename));
>>>>>>
>>>>>> Thanks,
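
For reference, the step of turning an element into a directory name can be kept in a plain helper that getBucketPath() calls. Below is a minimal sketch, not a definitive implementation. It assumes each element is a comma-separated record whose first field is the property to bucket by. The record format, the class name BucketNames and the method bucketDirFor are hypothetical and not part of Flink.

```java
// Hypothetical helper (not part of Flink): maps one stream element to a
// bucket directory name. Assumes the element is a comma-separated record
// whose first field is the property to bucket by.
final class BucketNames {

    private BucketNames() {
    }

    static String bucketDirFor(String element) {
        // Fall back to a fixed directory for null or blank records.
        if (element == null || element.trim().isEmpty()) {
            return "unknown";
        }
        // Take the first field; the limit of 2 avoids splitting the rest of the record.
        String key = element.split(",", 2)[0].trim();
        // Replace anything that is not a letter, digit, dot, underscore or hyphen,
        // so the result stays a single path segment.
        String safe = key.replaceAll("[^A-Za-z0-9._-]", "_");
        return safe.isEmpty() ? "unknown" : safe;
    }
}
```

Inside getBucketPath(Clock clock, Path basePath, String element), the result could then be returned as new Path(basePath, BucketNames.bucketDirFor(element)), so that each distinct key gets its own directory under the sink's base path.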