Hi Roshan,

Your logs refer to a simple run without any failures or re-running
from a savepoint, right?

I am asking because I am trying to reproduce it by running a modified
ParquetStreamingFileSinkITCase [1] and so far I cannot.
The ITCase runs against the local filesystem, and not S3, but I added
the OutputFileConfig and it seems that the part counter is increases
as expected.

Is there any other information that would help us reproduce the issue?

Cheers,
Kostas

[1] 
https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java

On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose <rosh...@gmail.com> wrote:
>
> Hi,
>
> I am trying to get the parquet writer to write to s3; however, the files do 
> not seem to be rolling over. The same file "part-0-0.parquet" is being 
> created each time. Like the 'partCounter" is not being updated? Maybe the 
> Bucket is being recreated each time? I don't really know... Here are some 
> logs:
>
> 2020-04-09 01:28:10,350 INFO 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 
> checkpointing for checkpoint with id=2 (max part counter=2).
> 2020-04-09 01:28:10,589 INFO 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 
> received completion notification for checkpoint with id=2.
> 2020-04-09 01:28:10,589 INFO org.apache.flink.fs.s3.common.writer.S3Committer 
> - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID 
> Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
> 2020-04-09 01:29:10,350 INFO 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 
> checkpointing for checkpoint with id=3 (max part counter=3).
> 2020-04-09 01:29:10,520 INFO 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 
> received completion notification for checkpoint with id=3.
> 2020-04-09 01:29:10,521 INFO org.apache.flink.fs.s3.common.writer.S3Committer 
> - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID 
> Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
> And a part of my code:
>
> ```
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> //        env.setParallelism(2);
>         env.enableCheckpointing(60000L);
> ///PROPERTIES Added
>         Schema schema = bro_conn.getClassSchema();
>
>         OutputFileConfig config = OutputFileConfig
>                 .builder()
>                 .withPartSuffix(".parquet")
>                 .build();
>
>         final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>                 .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"), 
> ParquetAvroWriters.forGenericRecord(schema))
> //                .withRollingPolicy(OnCheckpointRollingPolicy.build())
>                 .withOutputFileConfig(config)
> //                .withBucketAssigner(new PartitioningBucketAssigner())
>                 .build();
>
>         DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
>                 "kinesis", new SimpleStringSchema(), consumerConfig));
>
>         kinesis.flatMap(new JsonAvroParser())
>                 .addSink(sink);
>
>
>         env.execute("Bro Conn");
>
> ```
>
> I'm using Flink 1.10.0, and running in Kubernetes. I also created a custom 
> image to add the presto/hadoop plugin.
>
> Thanks again!

Reply via email to