Thanks Kostas, I’m narrowing in on a solution:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/filesystem_sink.html
says:

"You can also specify a custom bucketer by using setBucketer() on a
BucketingSink. If desired, the bucketer can use a property of the element or
tuple to determine the bucket directory."

    BucketingSink<String> sink = new BucketingSink<String>("/base/path");
    sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));

Therefore I’ve created a skeleton class:

    public class S3Bucketer implements Bucketer {
        private static final long serialVersionUID = 1L;

        // Not final: nothing assigns this field yet.
        private String formatString;

        public S3Bucketer() {
        }

        private void readObject(ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            in.defaultReadObject();
        }

        public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
            return true;
        }

        public Path getNextBucketPath(Path basePath) {
            return new Path(basePath + "/some-path-that-I-need-create-from-the-stream");
        }
    }

My question now is: how do I access the data stream from within the
S3Bucketer, so that I can generate a filename based on the data in the
data stream?

Thanks,

> On 16 Aug 2017, at 12:55, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
> In the second link for the BucketingSink, you can set your
> own Bucketer using the setBucketer method. You do not have to
> implement your own sink from scratch.
>
> Kostas
>
>> On Aug 16, 2017, at 1:39 PM, ant burton <apburto...@gmail.com> wrote:
>>
>> or rather
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>>
>>> On 16 Aug 2017, at 12:24, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>
>>> Hi Ant,
>>>
>>> I think you can do it by implementing your own Bucketer.
>>>
>>> Cheers,
>>> Kostas
>>>
>>>> On Aug 16, 2017, at 1:09 PM, ant burton <apburto...@gmail.com> wrote:
>>>>
>>>> Hello,
>>>>
>>>> Given
>>>>
>>>> // Set StreamExecutionEnvironment
>>>> final StreamExecutionEnvironment env =
>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>> // Use event time
>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>
>>>> // Add source (input stream)
>>>> DataStream<String> dataStream = StreamUtil.getDataStream(env, params);
>>>>
>>>> How can I construct the s3_filename from the content of an event? It
>>>> seems that whenever I attempt this I either have access to an event or
>>>> access to .addSink, but not both.
>>>>
>>>> dataStream.addSink(new BucketingSink<String>("s3a://flink/" + s3_filename));
>>>>
>>>> Thanks,