Sorry, I just realized this came off the user list by mistake. Adding the
thread back in.

On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose <rosh...@gmail.com> wrote:

> Yes sorry, no errors on the task manager. However, I am new to Flink, so I
> don't know all the places to look for logs. I have been looking at the task
> manager logs and don't see any exceptions there. I'm not sure where to look
> for S3 exceptions in particular.
>
> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>
>> Yes, this is why I reached out for further information.
>>
>> Incrementing the part counter is the responsibility of the
>> StreamingFileSink, whose code is FS-agnostic, so it should also fail
>> in the local FS.
>> Now if it is on the S3 side, it would help if you have any more info,
>> for example any logs from S3, to see if anything went wrong on their
>> end.
>>
>> So your logs refer to normal execution, i.e. no failures and no
>> restarting, right?
>>
>> Cheers,
>> Kostas
>>
>> On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose <rosh...@gmail.com>
>> wrote:
>> >
>> > Surprisingly, the same code running against the local filesystem works
>> > perfectly. The part counter increments correctly.
>> >
>> > On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>> >>
>> >> Hi Roshan,
>> >>
>> >> Your logs refer to a simple run without any failures or re-running
>> >> from a savepoint, right?
>> >>
>> >> I am asking because I am trying to reproduce it by running a modified
>> >> ParquetStreamingFileSinkITCase [1] and so far I cannot.
>> >> The ITCase runs against the local filesystem, and not S3, but I added
>> >> the OutputFileConfig and it seems that the part counter increases
>> >> as expected.
>> >>
>> >> Is there any other information that would help us reproduce the issue?
>> >>
>> >> Cheers,
>> >> Kostas
>> >>
>> >> [1] https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
>> >>
>> >> On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose <rosh...@gmail.com> wrote:
>> >> >
>> >> > Hi,
>> >> >
>> >> > I am trying to get the parquet writer to write to S3; however, the
>> >> > files do not seem to be rolling over. The same file "part-0-0.parquet"
>> >> > is being created each time, as if the "partCounter" is not being
>> >> > updated. Maybe the Bucket is being recreated each time? I don't really
>> >> > know... Here are some logs:
>> >> >
>> >> > 2020-04-09 01:28:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2).
>> >> > 2020-04-09 01:28:10,589 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=2.
>> >> > 2020-04-09 01:28:10,589 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>> >> > 2020-04-09 01:29:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=3 (max part counter=3).
>> >> > 2020-04-09 01:29:10,520 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=3.
>> >> > 2020-04-09 01:29:10,521 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>> >> > And a part of my code:
>> >> >
>> >> > ```
>> >> >
>> >> > StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> >> >
>> >> > //        env.setParallelism(2);
>> >> >         env.enableCheckpointing(60000L);
>> >> > ///PROPERTIES Added
>> >> >         Schema schema = bro_conn.getClassSchema();
>> >> >
>> >> >         OutputFileConfig config = OutputFileConfig
>> >> >                 .builder()
>> >> >                 .withPartSuffix(".parquet")
>> >> >                 .build();
>> >> >
>> >> >         final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>> >> >                 .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"), ParquetAvroWriters.forGenericRecord(schema))
>> >> > //                .withRollingPolicy(OnCheckpointRollingPolicy.build())
>> >> >                 .withOutputFileConfig(config)
>> >> > //                .withBucketAssigner(new PartitioningBucketAssigner())
>> >> >                 .build();
>> >> >
>> >> >         DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
>> >> >                 "kinesis", new SimpleStringSchema(), consumerConfig));
>> >> >
>> >> >         kinesis.flatMap(new JsonAvroParser())
>> >> >                 .addSink(sink);
>> >> >
>> >> >
>> >> >         env.execute("Bro Conn");
>> >> >
>> >> > ```
>> >> >
>> >> > I'm using Flink 1.10.0, and running in Kubernetes. I also created a
>> >> > custom image to add the presto/hadoop plugin.
>> >> >
>> >> > Thanks again!
>>
>
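
For anyone following along, here is a minimal sketch of the file names I
would expect to see, assuming part files are named
<prefix>-<subtask index>-<part counter><suffix>, which is what
"part-0-0.parquet" in the logs suggests. The class and method names are
made up for illustration, and the "part" prefix is an assumption; this is
a simplified model, not the StreamingFileSink's actual code.

```java
// Hypothetical, simplified model of part-file naming; not Flink's actual code.
public class PartFileNameSketch {

    /** Builds a name of the form <prefix>-<subtaskIndex>-<partCounter><suffix>. */
    public static String partFileName(String prefix, int subtaskIndex, long partCounter, String suffix) {
        return prefix + "-" + subtaskIndex + "-" + partCounter + suffix;
    }

    public static void main(String[] args) {
        // Subtask 0, one new part file per counter value, with the ".parquet"
        // suffix from the OutputFileConfig in the quoted code.
        for (long partCounter = 0; partCounter < 3; partCounter++) {
            System.out.println(partFileName("part", 0, partCounter, ".parquet"));
        }
    }
}
```

If the counter were advancing in the committed paths, I would expect the
S3Committer lines to show part-0-1.parquet and part-0-2.parquet rather
than part-0-0.parquet each time.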
