Hi, I am trying to get the Parquet writer to write to S3; however, the files do not seem to be rolling over. The same file, "part-0-0.parquet", is being created each time, as if the "partCounter" is not being updated. Or maybe the bucket is being recreated each time? I don't really know... Here are some logs:
```
2020-04-09 01:28:10,350 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2).
2020-04-09 01:28:10,589 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=2.
2020-04-09 01:28:10,589 INFO  org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
2020-04-09 01:29:10,350 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=3 (max part counter=3).
2020-04-09 01:29:10,520 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=3.
2020-04-09 01:29:10,521 INFO  org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
```

And here is the relevant part of my code:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(2);
env.enableCheckpointing(60000L);

///PROPERTIES Added

Schema schema = bro_conn.getClassSchema();

OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartSuffix(".parquet")
        .build();

final StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"),
                ParquetAvroWriters.forGenericRecord(schema))
        // .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .withOutputFileConfig(config)
        // .withBucketAssigner(new PartitioningBucketAssigner())
        .build();

DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
        "kinesis", new SimpleStringSchema(), consumerConfig));

kinesis.flatMap(new JsonAvroParser())
        .addSink(sink);

env.execute("Bro Conn");
```

I'm using Flink 1.10.0 and running on Kubernetes. I also created a custom image to add the presto/hadoop plugin. Thanks again!
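P.S. In case it helps clarify what I expect to happen: my understanding is that for bulk formats the sink rolls on every checkpoint anyway, so the commented-out rolling policy above shouldn't change anything. Below is the spelled-out version of the sink I think I'm effectively running (the "conn" part prefix is only illustrative, I don't actually set one; by default the prefix is "part", which matches the "part-0-0.parquet" name in the logs, i.e. prefix-subtaskIndex-partCounter):

```java
// Spelled-out equivalent of the sink above, as I understand the defaults.
OutputFileConfig explicitConfig = OutputFileConfig
        .builder()
        .withPartPrefix("conn")        // illustrative only; default prefix is "part"
        .withPartSuffix(".parquet")
        .build();

final StreamingFileSink<GenericRecord> explicitSink = StreamingFileSink
        .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"),
                ParquetAvroWriters.forGenericRecord(schema))
        .withRollingPolicy(OnCheckpointRollingPolicy.build()) // default rolling policy for bulk formats
        .withOutputFileConfig(explicitConfig)
        .build();
```

Is that equivalent to my code above, or am I misunderstanding how the part counter is supposed to advance between checkpoints?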