
I am trying to get the Parquet writer to write to S3; however, the files do
not seem to be rolling over. The same file, "part-0-0.parquet", is being
created each time, as if the "partCounter" is not being updated. Maybe the
Bucket is being recreated each time? I don't really know. Here are some logs:

2020-04-09 01:28:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2).
2020-04-09 01:28:10,589 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=2.
2020-04-09 01:28:10,589 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.
2020-04-09 01:29:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=3 (max part counter=3).
2020-04-09 01:29:10,520 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=3.
2020-04-09 01:29:10,521 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.
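
For context, the file name in the logs appears to follow the pattern prefix-subtaskIndex-partCounter plus suffix, which is why a growing "max part counter" with an unchanged "part-0-0.parquet" looks wrong to me. The sketch below is a plain-Java illustration of that assumed pattern only; it is not Flink's source code, and the class and method names are hypothetical.

```java
// Hypothetical illustration of the assumed part file naming pattern.
// This is NOT Flink's implementation; names here are made up for the example.
final class PartFileNameSketch {

    // Assumed pattern: <prefix>-<subtaskIndex>-<partCounter><suffix>
    static String partFileName(String prefix, int subtaskIndex, long partCounter, String suffix) {
        return prefix + "-" + subtaskIndex + "-" + partCounter + suffix;
    }

    public static void main(String[] args) {
        // If the counter advanced between part files, each one would get a new name.
        for (long partCounter = 0; partCounter < 3; partCounter++) {
            System.out.println(partFileName("part", 0, partCounter, ".parquet"));
        }
    }
}
```

Under this assumption, a counter that really advanced would give names ending in -0, -1, -2 for subtask 0, rather than the same name on every commit.
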
And a part of my code:


StreamExecutionEnvironment env =

//        env.setParallelism(2);
        Schema schema = bro_conn.getClassSchema();

        OutputFileConfig config = OutputFileConfig

        final StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"),
//                .withRollingPolicy(OnCheckpointRollingPolicy.build())
//                .withBucketAssigner(new PartitioningBucketAssigner())

        DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
                "kinesis", new SimpleStringSchema(), consumerConfig));

        kinesis.flatMap(new JsonAvroParser())

        env.execute("Bro Conn");


I'm using Flink 1.10.0, and running in Kubernetes. I also created a custom
image to add the presto/hadoop plugin.

Thanks again!
