I am I on the right path with the following: class S3SinkFunc implements SinkFunction<String> { public void invoke(String element) { System.out.println(element); // don't have access to dataStream to call .addSink() :-( } }
Thanks, > On 16 Aug 2017, at 12:24, Kostas Kloudas <k.klou...@data-artisans.com> wrote: > > Hi Ant, > > I think you can do it by implementing your own Bucketer. > > Cheers, > Kostas > > . >> On Aug 16, 2017, at 1:09 PM, ant burton <apburto...@gmail.com> wrote: >> >> Hello, >> >> Given >> >> // Set StreamExecutionEnvironment >> final StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> >> // Set checkpoints in ms >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >> >> // Add source (input stream) >> DataStream<String> dataStream = StreamUtil.getDataStream(env, params); >> >> How can I construct the s3_filename from the content of the an event, it >> seems that whenever I attempt this I either have access to an event or >> access to .addSink but not both. >> >> dataStream.addSink(new BucketingSink<String>("s3a://flink/" + >> s3_filename)); >> >> >> Thanks, >> >> >> >> >