Hi Roshan,

Your logs are from a plain run, with no failures and no restart
from a savepoint, right?

I am asking because I am trying to reproduce it by running a modified
ParquetStreamingFileSinkITCase [1] and so far I cannot.
The ITCase runs against the local filesystem rather than S3, but I added
the OutputFileConfig and the part counter seems to increase
as expected.
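
For reference, here is the naming I am checking for in the test. This is
only a sketch, assuming the sink builds part file names as
<prefix>-<subtaskIndex>-<partCounter><suffix> with the default prefix
"part"; PartFileNames and partFileName are hypothetical names used for
illustration, not Flink APIs.

```java
// Hypothetical helper, not part of Flink: mirrors the assumed naming scheme
// <prefix>-<subtaskIndex>-<partCounter><suffix> for part files.
class PartFileNames {

    static String partFileName(String prefix, int subtaskIndex, long partCounter, String suffix) {
        return prefix + "-" + subtaskIndex + "-" + partCounter + suffix;
    }

    public static void main(String[] args) {
        // Subtask 0 opening a new part file for each of three counter values.
        for (long counter = 0; counter < 3; counter++) {
            System.out.println(partFileName("part", 0, counter, ".parquet"));
        }
    }
}
```

With the ".parquet" suffix, subtask 0 should produce part-0-0.parquet,
part-0-1.parquet, part-0-2.parquet, which is what I see locally, whereas
your logs commit part-0-0.parquet for both checkpoint 2 and checkpoint 3.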

Is there any other information that would help us reproduce the issue?



On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose <rosh...@gmail.com> wrote:
> Hi,
> I am trying to get the parquet writer to write to s3; however, the files do 
> not seem to be rolling over. The same file "part-0-0.parquet" is being 
> created each time. Like the "partCounter" is not being updated? Maybe the 
> Bucket is being recreated each time? I don't really know... Here are some 
> logs:
> 2020-04-09 01:28:10,350 INFO 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 
> checkpointing for checkpoint with id=2 (max part counter=2).
> 2020-04-09 01:28:10,589 INFO 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 
> received completion notification for checkpoint with id=2.
> 2020-04-09 01:28:10,589 INFO org.apache.flink.fs.s3.common.writer.S3Committer 
> - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID 
> Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
> 2020-04-09 01:29:10,350 INFO 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 
> checkpointing for checkpoint with id=3 (max part counter=3).
> 2020-04-09 01:29:10,520 INFO 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 
> received completion notification for checkpoint with id=3.
> 2020-04-09 01:29:10,521 INFO org.apache.flink.fs.s3.common.writer.S3Committer 
> - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID 
> Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
> And a part of my code:
> ```
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> //        env.setParallelism(2);
>         env.enableCheckpointing(60000L);
>         Schema schema = bro_conn.getClassSchema();
>         OutputFileConfig config = OutputFileConfig
>                 .builder()
>                 .withPartSuffix(".parquet")
>                 .build();
>         final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>                 .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"), 
> ParquetAvroWriters.forGenericRecord(schema))
> //                .withRollingPolicy(OnCheckpointRollingPolicy.build())
>                 .withOutputFileConfig(config)
> //                .withBucketAssigner(new PartitioningBucketAssigner())
>                 .build();
>         DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
>                 "kinesis", new SimpleStringSchema(), consumerConfig));
>         kinesis.flatMap(new JsonAvroParser())
>                 .addSink(sink);
>         env.execute("Bro Conn");
> ```
> I'm using Flink 1.10.0, and running in Kubernetes. I also created a custom 
> image to add the presto/hadoop plugin.
> Thanks again!
