Hi, I am reading messages from a Kafka topic and want to process them with Flink and save them to S3. It was pointed out to me that writing the Kafka stream directly to S3 won't work, because S3 doesn't allow data to be appended to a file, so I want to convert the Kafka stream into batches and save those to S3. Based on other users' questions and answers, this looks possible with windowing: break the stream into batches and create files from them. I have written the following code, but it doesn't work, and I am not getting any errors either. A System.out.println in the window function shows that each tuple is being processed, but it might not be getting emitted by out.collect. Can someone help me figure out what the issue may be? Thanks!
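import java.util.Map;
import java.util.UUID;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class S3Sink {

    public static void main(String[] args) throws Exception {
        // ConfigUtils is a project-local helper; its import is not shown here.
        Map<String, String> configs = ConfigUtils.loadConfigs("/Users/path/to/configs.yaml");
        final ParameterTool parameterTool = ParameterTool.fromMap(configs);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);

        // Read raw strings from the Kafka topic named in the config.
        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>(
                        parameterTool.get("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        String uuid = UUID.randomUUID().toString();

        // Key by the first tuple field, collect 5-second processing-time
        // windows, and write each window's contents as text to S3.
        DataStreamSink<Tuple2<String, String>> tuple2DataStream = messageStream
                .flatMap(new Tupler())
                .keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .apply(new MyWindowFunction())
                .writeAsText("s3://flink-test/flink-output-stream/" + uuid + "testdoc.txt");

        env.execute();
    }

    // Wraps every record in a tuple with the constant key "record",
    // so all records share a single key.
    private static class Tupler implements FlatMapFunction<String, Tuple2<String, String>> {
        @Override
        public void flatMap(String record, Collector<Tuple2<String, String>> out) throws Exception {
            out.collect(new Tuple2<String, String>("record", record));
        }
    }

    // Prints each element of the window and forwards it unchanged.
    private static class MyWindowFunction
            implements WindowFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, String>> input,
                          Collector<Tuple2<String, String>> out) throws Exception {
            for (Tuple2<String, String> in : input) {
                System.out.println(in);
                out.collect(in);
            }
        }
    }
}

--
Thanks,
Sam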
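
P.S. To make clear what I mean by "batches": the idea is to group records by the fixed time window they fall into and write each group as its own file, instead of appending to one file. Below is a rough sketch of that grouping in plain Java, with no Flink or S3 calls at all. The class name WindowBatcher, the windowStart and batch helpers, and the "window-<start>.txt" file naming are all made up for illustration and are not part of my job.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: groups timestamped records into tumbling windows.
public class WindowBatcher {

    // Start (in ms) of the tumbling window that contains the given timestamp.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    // Builds one batch per window; the map key is a made-up file name for that window.
    static Map<String, List<String>> batch(long[] timestampsMs, String[] records, long windowSizeMs) {
        Map<String, List<String>> batches = new LinkedHashMap<>();
        for (int i = 0; i < records.length; i++) {
            long start = windowStart(timestampsMs[i], windowSizeMs);
            String fileName = "flink-output-stream/window-" + start + ".txt";
            batches.computeIfAbsent(fileName, k -> new ArrayList<>()).add(records[i]);
        }
        return batches;
    }

    public static void main(String[] args) {
        long[] timestamps = {1000L, 4999L, 5000L, 12000L};
        String[] records = {"a", "b", "c", "d"};
        // Each entry would become one complete file, so nothing is ever appended.
        batch(timestamps, records, 5000L)
                .forEach((fileName, recordsInWindow) ->
                        System.out.println(fileName + " -> " + recordsInWindow));
    }
}
```

With 5-second windows, records "a" and "b" land in the same batch while "c" and "d" each get their own, which is the per-window file layout I am hoping to get out of the Flink job.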