Thanks Kostas,

I’m narrowing in on a solution:
 says "You can also specify a custom bucketer by using setBucketer() on a 
BucketingSink. If desired, the bucketer can use a property of the element or 
tuple to determine the bucket directory.”

BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
Therefore I’ve created a skeleton class:

public class S3Bucketer implements Bucketer {
        private static final long serialVersionUID = 1L;

        private final String formatString;

        public S3Bucketer() {

        private void readObject(ObjectInputStream in) {

        public boolean shouldStartNewBucket(Path basePath, Path 
currentBucketPath) {
                return true;

        public Path getNextBucketPath(Path basePath) {
                return new Path(basePath + 

my question now is how do I access the data stream from within the S3Bucketer 
so that I can generate a filename based on the data with the data stream.


> On 16 Aug 2017, at 12:55, Kostas Kloudas <> wrote:
> In the second link for the BucketingSink, you can set your 
> own Bucketer using the setBucketer method. You do not have to 
> implement your own sink from scratch.
> Kostas
>> On Aug 16, 2017, at 1:39 PM, ant burton < 
>> <>> wrote:
>> or rather 
>> <>
>>> On 16 Aug 2017, at 12:24, Kostas Kloudas < 
>>> <>> wrote:
>>> Hi Ant,
>>> I think you can do it by implementing your own Bucketer.
>>> Cheers,
>>> Kostas
>>> .
>>>> On Aug 16, 2017, at 1:09 PM, ant burton < 
>>>> <>> wrote:
>>>> Hello,
>>>> Given 
>>>>       // Set StreamExecutionEnvironment
>>>>       final StreamExecutionEnvironment env = 
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>       // Set checkpoints in ms
>>>>       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>       // Add source (input stream)
>>>>       DataStream<String> dataStream = StreamUtil.getDataStream(env, 
>>>> params);
>>>> How can I construct the s3_filename from the content of the an event, it 
>>>> seems that whenever I attempt this I either have access to an event or 
>>>> access to .addSink but not both.
>>>>    dataStream.addSink(new BucketingSink<String>("s3a://flink/ 
>>>> <s3a://flink/>" + s3_filename));
>>>> Thanks,

Reply via email to