Thanks Kostas,

I’m narrowing in on a solution:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/filesystem_sink.html
says "You can also specify a custom bucketer by using setBucketer() on a
BucketingSink. If desired, the bucketer can use a property of the element or
tuple to determine the bucket directory."

BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));

Therefore I’ve created a skeleton class:

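import java.io.IOException;
import java.io.ObjectInputStream;

import org.apache.flink.streaming.connectors.fs.Bucketer;
import org.apache.hadoop.fs.Path;

// Skeleton only. These two methods match the older
// org.apache.flink.streaming.connectors.fs.Bucketer interface; neither of
// them receives the stream element.
public class S3Bucketer implements Bucketer {
        private static final long serialVersionUID = 1L;

        // Not final: the no-arg constructor does not set it yet.
        private String formatString;

        public S3Bucketer() {
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
                in.defaultReadObject();
        }

        @Override
        public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
                return true;
        }

        @Override
        public Path getNextBucketPath(Path basePath) {
                return new Path(basePath, "some-path-that-I-need-create-from-the-stream");
        }
}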

My question now is: how do I access the stream elements from within the
S3Bucketer, so that I can generate a filename based on the data in the data
stream?
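
To make the goal concrete, here is a minimal sketch of the per-element path
logic I would like to run. The class ElementBucketPaths, the method bucketFor,
and the comma-separated record layout are all hypothetical, made up for
illustration, and the sketch deliberately has no Flink dependency. The Javadoc
for the bucketing package's Bucketer<T> appears to list
getBucketPath(Clock, Path, T element), which might be the place to call this
from, but I have not verified that against BucketingSink.

```java
import java.util.Objects;

/** Hypothetical helper: derives a bucket sub-path from one stream element. */
public final class ElementBucketPaths {

    private ElementBucketPaths() {
    }

    /**
     * Assumes, for illustration only, that each element is a comma-separated
     * record whose first field names the bucket, e.g. "orders,2017-08-16".
     */
    public static String bucketFor(String basePath, String element) {
        Objects.requireNonNull(basePath, "basePath");
        Objects.requireNonNull(element, "element");

        // Take everything before the first comma as the bucket key.
        int comma = element.indexOf(',');
        String key = (comma < 0 ? element : element.substring(0, comma)).trim();
        if (key.isEmpty()) {
            key = "unknown"; // fallback bucket for records without a usable key
        }

        // Avoid a double slash when the base path already ends with one.
        String base = basePath.endsWith("/")
                ? basePath.substring(0, basePath.length() - 1)
                : basePath;
        return base + "/" + key;
    }
}
```

For example, bucketFor("s3a://flink", "orders,2017-08-16") would give
"s3a://flink/orders".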

Thanks,

> On 16 Aug 2017, at 12:55, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> 
> In the second link for the BucketingSink, you can set your 
> own Bucketer using the setBucketer method. You do not have to 
> implement your own sink from scratch.
> 
> Kostas
> 
>> On Aug 16, 2017, at 1:39 PM, ant burton <apburto...@gmail.com> wrote:
>> 
>> or rather
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>> 
>> 
>>> On 16 Aug 2017, at 12:24, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>> 
>>> Hi Ant,
>>> 
>>> I think you can do it by implementing your own Bucketer.
>>> 
>>> Cheers,
>>> Kostas
>>> 
>>>> On Aug 16, 2017, at 1:09 PM, ant burton <apburto...@gmail.com> wrote:
>>>> 
>>>> Hello,
>>>> 
>>>> Given 
>>>> 
>>>>       // Set StreamExecutionEnvironment
>>>>       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>> 
>>>>       // Use event time as the stream time characteristic
>>>>       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>> 
>>>>       // Add source (input stream)
>>>>       DataStream<String> dataStream = StreamUtil.getDataStream(env, params);
>>>> 
>>>> How can I construct the s3_filename from the content of an event? It
>>>> seems that whenever I attempt this I have access either to an event or
>>>> to .addSink, but not both.
>>>> 
>>>>    dataStream.addSink(new BucketingSink<String>("s3a://flink/" + s3_filename));
>>>> 
>>>> 
>>>> Thanks,
>>>> 
>>>> 
>>>> 
>>>> 
>>> 
>> 
> 
