Hi,

I am reading messages off a Kafka topic and want to process them with Flink and
save the results to S3. It was pointed out to me that the stream can't just be
written to S3 continuously, because S3 doesn't allow appending to an existing
file, so I want to break the Kafka stream into batches and write each batch to
S3 as its own file. Based on other users' questions and answers, it looks like
this should be possible by windowing the stream into batches and creating a
file per window. I have written the code below, but it doesn't work and I am
not getting any errors either. A System.out.println shows that the tuples are
being processed, but they might not be getting emitted by out.collect. Can
someone help me figure out what the issue might be? Thanks!

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.util.Map;
import java.util.UUID;

public class S3Sink {

    public static void main(String[] args) throws Exception {

        // ConfigUtils is a local helper that loads the YAML file into a Map<String, String>
        Map<String, String> configs = ConfigUtils.loadConfigs("/Users/path/to/configs.yaml");
        final ParameterTool parameterTool = ParameterTool.fromMap(configs);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);

        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>(parameterTool.get("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        String uuid = UUID.randomUUID().toString();

        DataStreamSink<Tuple2<String, String>> tuple2DataStream = messageStream
                .flatMap(new Tupler())
                .keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .apply(new MyWindowFunction())
                .writeAsText("s3://flink-test/flink-output-stream/" + uuid + "testdoc.txt");

        env.execute();
    }

    private static class Tupler implements FlatMapFunction<String, Tuple2<String, String>> {

        @Override
        public void flatMap(String record, Collector<Tuple2<String, String>> out) throws Exception {
            out.collect(new Tuple2<String, String>("record", record));
        }
    }

    private static class MyWindowFunction
            implements WindowFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple, TimeWindow> {

        @Override
        public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, String>> input,
                          Collector<Tuple2<String, String>> out) throws Exception {
            for (Tuple2<String, String> in : input) {
                System.out.println(in);
                out.collect(in);
            }
        }
    }
}
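
In case the Kafka side matters: configs.yaml really just holds the consumer
settings and the topic name, so what ends up being passed to the
FlinkKafkaConsumer09 is roughly equivalent to the sketch below (broker address,
group id, and topic name are placeholders, not my real values):

    // Roughly what loading configs.yaml and calling parameterTool.getProperties() produces
    // (all values below are placeholders)
    Properties props = new Properties();                        // java.util.Properties
    props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker
    props.setProperty("group.id", "flink-s3-test");             // placeholder consumer group
    // parameterTool.get("kafka.topic") then returns the topic name, e.g. "test-topic"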

-- 

Thanks,

Sam
