Hi Roshan,

Your logs refer to a simple run without any failures or re-running from a savepoint, right?
I am asking because I am trying to reproduce it by running a modified ParquetStreamingFileSinkITCase [1], and so far I cannot. The ITCase runs against the local filesystem, not S3, but I added the OutputFileConfig and the part counter increases as expected (see the sketch at the end of this mail for roughly what that kind of local setup looks like). Is there any other information that would help us reproduce the issue?

Cheers,
Kostas

[1] https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java

On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose <rosh...@gmail.com> wrote:
>
> Hi,
>
> I am trying to get the parquet writer to write to S3; however, the files do not seem to be rolling over. The same file "part-0-0.parquet" is being created each time, as if the 'partCounter' is not being updated. Maybe the Bucket is being recreated each time? I don't really know. Here are some logs:
>
> 2020-04-09 01:28:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2).
> 2020-04-09 01:28:10,589 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=2.
> 2020-04-09 01:28:10,589 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
> 2020-04-09 01:29:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=3 (max part counter=3).
> 2020-04-09 01:29:10,520 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=3.
> 2020-04-09 01:29:10,521 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>
> And a part of my code:
>
> ```
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
> // env.setParallelism(2);
> env.enableCheckpointing(60000L);
>
> // PROPERTIES added
> Schema schema = bro_conn.getClassSchema();
>
> OutputFileConfig config = OutputFileConfig
>         .builder()
>         .withPartSuffix(".parquet")
>         .build();
>
> final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>         .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"),
>                 ParquetAvroWriters.forGenericRecord(schema))
>         // .withRollingPolicy(OnCheckpointRollingPolicy.build())
>         .withOutputFileConfig(config)
>         // .withBucketAssigner(new PartitioningBucketAssigner())
>         .build();
>
> DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
>         "kinesis", new SimpleStringSchema(), consumerConfig));
>
> kinesis.flatMap(new JsonAvroParser())
>         .addSink(sink);
>
> env.execute("Bro Conn");
> ```
>
> I'm using Flink 1.10.0, and running in Kubernetes. I also created a custom image to add the presto/hadoop plugin.
>
> Thanks again!
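For reference, a minimal, self-contained sketch of a local-filesystem job with the same sink wiring (OutputFileConfig with a ".parquet" suffix on a bulk Parquet format). This is not the ITCase itself: the Event POJO, ParquetAvroWriters.forReflectRecord (instead of forGenericRecord), the file:///tmp/parquet-out path, the 10-second checkpoint interval, and the throwaway source are placeholders chosen only to keep the example runnable without Kinesis, generated Avro classes, or S3:

```
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class LocalParquetRollingSketch {

    /** Simple POJO written via Avro reflection instead of a GenericRecord schema. */
    public static class Event {
        public long id;
        public Event() {}
        public Event(long id) { this.id = id; }
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Bulk formats roll only on checkpoint (OnCheckpointRollingPolicy is the
        // default), so every completed checkpoint should commit a new part file:
        // part-0-0.parquet, part-0-1.parquet, ...
        env.enableCheckpointing(10_000L);

        OutputFileConfig config = OutputFileConfig.builder()
                .withPartSuffix(".parquet")
                .build();

        // Same builder chain as the S3 job, but pointed at a local directory so
        // the S3 multipart-upload committer is out of the picture.
        final StreamingFileSink<Event> sink = StreamingFileSink
                .forBulkFormat(new Path("file:///tmp/parquet-out"),
                        ParquetAvroWriters.forReflectRecord(Event.class))
                .withOutputFileConfig(config)
                .build();

        // Trivial unbounded source so the job stays alive across several checkpoints.
        env.addSource(new SourceFunction<Event>() {
                    private volatile boolean running = true;

                    @Override
                    public void run(SourceContext<Event> ctx) throws Exception {
                        long i = 0;
                        while (running) {
                            synchronized (ctx.getCheckpointLock()) {
                                ctx.collect(new Event(i++));
                            }
                            Thread.sleep(100);
                        }
                    }

                    @Override
                    public void cancel() {
                        running = false;
                    }
                })
                .addSink(sink);

        env.execute("local parquet rolling sketch");
    }
}
```

After a few checkpoints the output directory should contain part-0-0.parquet, part-0-1.parquet, and so on. The part counter bookkeeping in the sink is the same whether the files end up on the local filesystem or go through the S3 committer shown in your logs, which is why a local run should still be a useful comparison point.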