Hi,

I am trying to get the Parquet writer to write to S3; however, the files do
not seem to be rolling over. The same file, "part-0-0.parquet", is created
on every checkpoint, as if the part counter is not being incremented. Or
maybe the bucket is being recreated each time? I don't really know. Here
are some logs:

```
2020-04-09 01:28:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2).
2020-04-09 01:28:10,589 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=2.
2020-04-09 01:28:10,589 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
2020-04-09 01:29:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=3 (max part counter=3).
2020-04-09 01:29:10,520 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=3.
2020-04-09 01:29:10,521 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
```
Note that both checkpoints commit the same file with the same MPU ID. And here is the relevant part of my code:

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// env.setParallelism(2);
env.enableCheckpointing(60000L); // checkpoint every 60s

// PROPERTIES added
Schema schema = bro_conn.getClassSchema();

// Part files are named part-<subtask>-<counter> plus this suffix.
OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartSuffix(".parquet")
        .build();

final StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"),
                ParquetAvroWriters.forGenericRecord(schema))
        // .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .withOutputFileConfig(config)
        // .withBucketAssigner(new PartitioningBucketAssigner())
        .build();

DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
        "kinesis", new SimpleStringSchema(), consumerConfig));

kinesis.flatMap(new JsonAvroParser())
        .addSink(sink);

env.execute("Bro Conn");
```
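For what it's worth, the commented-out PartitioningBucketAssigner is a custom class from my project; with it disabled, the sink falls back to the default date/time bucketing, which matches the "2020-04-09--01" folders in the logs above. A rough sketch of what an assigner like that does (the class body here is illustrative, not my exact code):

```
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

/** Illustrative only: buckets records into hourly folders like "2020-04-09--01". */
public class PartitioningBucketAssigner implements BucketAssigner<GenericRecord, String> {

    private static final DateTimeFormatter HOUR_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(GenericRecord element, Context context) {
        // Processing time keeps the sketch simple; an event-time field would work too.
        return HOUR_FORMAT.format(Instant.ofEpochMilli(context.currentProcessingTime()));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}
```

Either way, my understanding is that the bucket id only picks the target folder; the part counter should still increment on every checkpoint, since bulk formats always roll on checkpoint.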

I'm using Flink 1.10.0 and running in Kubernetes. I also built a custom
image to add the presto/hadoop S3 plugins.
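
The custom image just copies the bundled S3 filesystem jars into Flink's plugins directory, roughly like this (jar names/paths are those shipped in the standard flink:1.10.0 image; treat the exact layout as an assumption about my setup):

```
FROM flink:1.10.0
# Flink 1.10 loads filesystems as plugins from plugins/<name>/, not from lib/.
RUN mkdir -p /opt/flink/plugins/s3-fs-presto /opt/flink/plugins/s3-fs-hadoop && \
    cp /opt/flink/opt/flink-s3-fs-presto-1.10.0.jar /opt/flink/plugins/s3-fs-presto/ && \
    cp /opt/flink/opt/flink-s3-fs-hadoop-1.10.0.jar /opt/flink/plugins/s3-fs-hadoop/
```

The s3a:// scheme in the sink path is served by the hadoop plugin; the presto plugin registers s3p://.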

Thanks again!
