Thank you for your help; it’s greatly appreciated.

My aim is to be able to “use a property of the element to determine the bucket 
directory”.

With your suggestions, this is what I have so far. It’s obviously wrong, but I 
hope I’m getting closer.

Is it correct to still implement Bucketer and just change where it is imported 
from, or do I need to import BucketingSink?

import org.apache.hadoop.fs.Path;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer; // I think this is wrong
import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.connectors.fs.Clock;

public class S3Bucketer implements Bucketer<String> {
    @Override
    public Path getBucketPath(Clock clock, Path basePath, String element) {
        // Now that we have access to element, we can
        // generate an S3 filename path from it
        String s3_filename_path = ""; // placeholder, still to be built from element

        return new Path(s3_filename_path);
    }
}
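
For illustration, here is a minimal sketch of how the directory part might be derived from the element. It assumes each element is a comma-separated record whose first field names the bucket, which is purely an assumption and not something established in this thread. BucketPathSketch and bucketDirFor are hypothetical names, and no Flink classes are used, so it can be tried on its own.

```java
public class BucketPathSketch {

    // Hypothetical helper: derives a bucket directory name from one element.
    // Assumption (not from the thread): the element is a comma-separated
    // record and its first field identifies the bucket.
    static String bucketDirFor(String element) {
        // Take everything before the first comma as the key.
        String key = element.split(",", 2)[0].trim();
        // Replace anything other than letters, digits, '_' and '-' so the
        // key is safe to use as a single directory name.
        String safe = key.replaceAll("[^A-Za-z0-9_-]", "_");
        // Fall back to a fixed directory when the key is empty.
        return safe.isEmpty() ? "unknown" : safe;
    }

    public static void main(String[] args) {
        System.out.println(bucketDirFor("customerA,2017-08-16,click"));
    }
}
```

Inside getBucketPath the result could then be combined with the base path, for example with return new Path(basePath, bucketDirFor(element)); so that each element lands in a directory under the sink's base path.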

Apologies, my Java is limited at present.

Thanks,

> On 16 Aug 2017, at 16:06, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> 
> Hi Ant,
> 
> I think you are implementing the wrong Bucketer. 
> This seems to be the one for the RollingSink which is deprecated. 
> Is this correct?
> 
> You should implement the BucketingSink one, which is in the package:
> 
> org.apache.flink.streaming.connectors.fs.bucketing
> 
> That one requires the implementation of 1 method with signature:
> 
> Path getBucketPath(Clock clock, Path basePath, T element);
> 
> which, from what I understand of your requirements, gives you access 
> to the element that you need.
> 
> Cheers,
> Kostas
> 
>> On Aug 16, 2017, at 3:31 PM, ant burton <apburto...@gmail.com> wrote:
>> 
>> 
>> Thanks Kostas,
>> 
>> I’m narrowing in on a solution:
>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/filesystem_sink.html
>> says "You can also specify a custom bucketer by using setBucketer() on a 
>> BucketingSink. If desired, the bucketer can use a property of the element or 
>> tuple to determine the bucket directory."
>> 
>> BucketingSink<String> sink = new BucketingSink<String>("/base/path");
>> sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
>> 
>> Therefore I’ve created a skeleton class:
>> 
>> public class S3Bucketer implements Bucketer {
>>      private static final long serialVersionUID = 1L;
>> 
>>      private final String formatString;
>> 
>>      public S3Bucketer() {
>>      }
>> 
>>      private void readObject(ObjectInputStream in) {
>>              in.defaultReadObject();
>>      }
>> 
>>      public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
>>              return true;
>>      }
>> 
>>      public Path getNextBucketPath(Path basePath) {
>>              return new Path(basePath + "/some-path-that-I-need-create-from-the-stream");
>>      }
>> }
>> 
>> My question now is: how do I access the data stream from within the 
>> S3Bucketer, so that I can generate a filename based on the data in the data 
>> stream?
>> 
>> Thanks,
>> 
>>> On 16 Aug 2017, at 12:55, Kostas Kloudas <k.klou...@data-artisans.com> 
>>> wrote:
>>> 
>>> In the second link for the BucketingSink, you can set your 
>>> own Bucketer using the setBucketer method. You do not have to 
>>> implement your own sink from scratch.
>>> 
>>> Kostas
>>> 
>>>> On Aug 16, 2017, at 1:39 PM, ant burton <apburto...@gmail.com> wrote:
>>>> 
>>>> or rather 
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>>>> 
>>>> 
>>>>> On 16 Aug 2017, at 12:24, Kostas Kloudas <k.klou...@data-artisans.com> 
>>>>> wrote:
>>>>> 
>>>>> Hi Ant,
>>>>> 
>>>>> I think you can do it by implementing your own Bucketer.
>>>>> 
>>>>> Cheers,
>>>>> Kostas
>>>>> 
>>>>>> On Aug 16, 2017, at 1:09 PM, ant burton <apburto...@gmail.com> wrote:
>>>>>> 
>>>>>> Hello,
>>>>>> 
>>>>>> Given 
>>>>>> 
>>>>>>      // Set StreamExecutionEnvironment
>>>>>>      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> 
>>>>>>      // Use event time as the stream time characteristic
>>>>>>      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>> 
>>>>>>      // Add source (input stream)
>>>>>>      DataStream<String> dataStream = StreamUtil.getDataStream(env, params);
>>>>>> 
>>>>>> How can I construct the s3_filename from the content of an event? It 
>>>>>> seems that whenever I attempt this I either have access to an event or 
>>>>>> access to .addSink, but not both.
>>>>>> 
>>>>>>  dataStream.addSink(new BucketingSink<String>("s3a://flink/" + s3_filename));
>>>>>> 
>>>>>> 
>>>>>> Thanks,
