Sorry, I realized this came off the user list by mistake. Adding the thread back in.
On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose <rosh...@gmail.com> wrote:

> Yes sorry, no errors on the task manager. However, I am new to Flink, so I
> don't know all the places to look for the logs. I have been looking at the
> task manager logs and don't see any exceptions there. I am not sure where
> to look for S3 exceptions in particular.
>
> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>
>> Yes, this is why I reached out for further information.
>>
>> Incrementing the part counter is the responsibility of the
>> StreamingFileSink, whose code is FS-agnostic, so it should also fail
>> in the local FS.
>> Now, if it is on the S3 side, it would help if you have any more info,
>> for example any logs from S3, to see if anything went wrong on their
>> end.
>>
>> So your logs refer to normal execution, i.e. no failures and no
>> restarting, right?
>>
>> Cheers,
>> Kostas
>>
>> On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose <rosh...@gmail.com> wrote:
>> >
>> > Surprisingly, the same code running against the local filesystem works
>> > perfectly. The part counter increments correctly.
>> >
>> > On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>> >>
>> >> Hi Roshan,
>> >>
>> >> Your logs refer to a simple run without any failures or re-running
>> >> from a savepoint, right?
>> >>
>> >> I am asking because I am trying to reproduce it by running a modified
>> >> ParquetStreamingFileSinkITCase [1] and so far I cannot.
>> >> The ITCase runs against the local filesystem, and not S3, but I added
>> >> the OutputFileConfig and it seems that the part counter increases as
>> >> expected.
>> >>
>> >> Is there any other information that would help us reproduce the issue?
>> >>
>> >> Cheers,
>> >> Kostas
>> >>
>> >> [1] https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
>> >>
>> >> On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose <rosh...@gmail.com> wrote:
>> >> >
>> >> > Hi,
>> >> >
>> >> > I am trying to get the Parquet writer to write to S3; however, the
>> >> > files do not seem to be rolling over. The same file "part-0-0.parquet"
>> >> > is being created each time, as if the "partCounter" is not being
>> >> > updated. Maybe the Bucket is being recreated each time? I don't
>> >> > really know... Here are some logs:
>> >> >
>> >> > 2020-04-09 01:28:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2).
>> >> > 2020-04-09 01:28:10,589 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=2.
>> >> > 2020-04-09 01:28:10,589 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>> >> > 2020-04-09 01:29:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=3 (max part counter=3).
>> >> > 2020-04-09 01:29:10,520 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=3.
>> >> > 2020-04-09 01:29:10,521 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>> >> >
>> >> > And a part of my code:
>> >> >
>> >> > ```
>> >> > StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> >> >
>> >> > // env.setParallelism(2);
>> >> > env.enableCheckpointing(60000L);
>> >> > ///PROPERTIES Added
>> >> > Schema schema = bro_conn.getClassSchema();
>> >> >
>> >> > OutputFileConfig config = OutputFileConfig
>> >> >         .builder()
>> >> >         .withPartSuffix(".parquet")
>> >> >         .build();
>> >> >
>> >> > final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>> >> >         .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"),
>> >> >                 ParquetAvroWriters.forGenericRecord(schema))
>> >> >         // .withRollingPolicy(OnCheckpointRollingPolicy.build())
>> >> >         .withOutputFileConfig(config)
>> >> >         // .withBucketAssigner(new PartitioningBucketAssigner())
>> >> >         .build();
>> >> >
>> >> > DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
>> >> >         "kinesis", new SimpleStringSchema(), consumerConfig));
>> >> >
>> >> > kinesis.flatMap(new JsonAvroParser())
>> >> >         .addSink(sink);
>> >> >
>> >> > env.execute("Bro Conn");
>> >> > ```
>> >> >
>> >> > I'm using Flink 1.10.0 and running in Kubernetes. I also created a
>> >> > custom image to add the Presto/Hadoop plugin.
>> >> >
>> >> > Thanks again!
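
For reference, here is a self-contained sketch of the job described above, with
the imports filled in. The inline schema, bucket name, and bounded source are
placeholders standing in for the elided bro_conn Avro class, Kinesis
consumerConfig, and JsonAvroParser, so treat it as an approximation of the
original job, not a verbatim copy:

```
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class BroConnJobSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 60s, as in the original job. Bulk formats roll a
        // new part file on every checkpoint, so the part counter should
        // advance once per checkpoint.
        env.enableCheckpointing(60000L);

        // Placeholder schema; the original job uses the generated bro_conn
        // Avro class (bro_conn.getClassSchema()).
        final Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"bro_conn\",\"fields\":["
                        + "{\"name\":\"uid\",\"type\":\"string\"}]}");
        final String schemaString = schema.toString();

        OutputFileConfig config = OutputFileConfig.builder()
                .withPartSuffix(".parquet")
                .build();

        // Expected file names: part-<subtask>-<counter>.parquet, i.e.
        // part-0-0.parquet, then part-0-1.parquet after the next checkpoint.
        final StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(new Path("s3a://my-bucket/bro_conn/"), // placeholder bucket
                        ParquetAvroWriters.forGenericRecord(schema))
                .withOutputFileConfig(config)
                .build();

        // Stand-in for the Kinesis source + JsonAvroParser of the original
        // post. Note: Avro's Schema is not Serializable, so the function
        // re-parses it from a String in open().
        DataStream<GenericRecord> records = env
                .fromElements("a", "b", "c")
                .map(new RichMapFunction<String, GenericRecord>() {
                    private transient Schema runtimeSchema;

                    @Override
                    public void open(Configuration parameters) {
                        runtimeSchema = new Schema.Parser().parse(schemaString);
                    }

                    @Override
                    public GenericRecord map(String uid) {
                        GenericRecord record = new GenericData.Record(runtimeSchema);
                        record.put("uid", uid);
                        return record;
                    }
                })
                // GenericRecord needs explicit Avro type information in Flink.
                .returns(new GenericRecordAvroTypeInfo(schema));

        records.addSink(sink);

        // Caveat: this source is bounded, so the job may finish before the
        // first checkpoint fires; it only demonstrates the wiring. The real
        // job reads from an unbounded Kinesis stream.
        env.execute("Bro Conn (sketch)");
    }
}
```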
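And a minimal local-filesystem counterpart of the sink, the comparison Roshan
describes running ("the same code running against the local filesystem works
perfectly"). Only the path differs; /tmp/bro_conn is an arbitrary example,
reusing schema and config from the sketch above:

```
// Local-FS variant of the same sink. On a healthy run, each completed
// checkpoint commits a new file: part-0-0.parquet, part-0-1.parquet, ...
// under hourly bucket directories such as 2020-04-09--01/ (the default
// date-time bucket assigner), matching the paths in the logs above.
final StreamingFileSink<GenericRecord> localSink = StreamingFileSink
        .forBulkFormat(new Path("file:///tmp/bro_conn/"),
                ParquetAvroWriters.forGenericRecord(schema))
        .withOutputFileConfig(config)
        .build();
```

Diffing the directory listings of the two runs after a couple of checkpoints
shows whether the counter itself stalls or only the S3 commit path keeps
reusing the same target name.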