Nope, just the s3a. I'll keep looking around to see if there is anything else I can find. If you think of anything else to try, let me know.
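For reference, the plugin/scheme mapping under discussion: flink-s3-fs-hadoop serves s3:// and s3a:// (and is the only S3 filesystem the StreamingFileSink supports in 1.10), while flink-s3-fs-presto serves s3:// and s3p:// and is the one usually pointed at by checkpoints. Below is a minimal sketch of a custom image that loads both as plugins, assuming the stock flink:1.10.0 base image with FLINK_HOME=/opt/flink and the optional jars shipped under opt/; the paths and jar names are illustrative, not taken from this thread:

```
FROM flink:1.10.0

# Hadoop S3 filesystem -> handles s3a:// URIs (required by the StreamingFileSink)
RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop && \
    cp /opt/flink/opt/flink-s3-fs-hadoop-1.10.0.jar /opt/flink/plugins/s3-fs-hadoop/

# Presto S3 filesystem -> handles s3p:// URIs (typically used for checkpoints)
RUN mkdir -p /opt/flink/plugins/s3-fs-presto && \
    cp /opt/flink/opt/flink-s3-fs-presto-1.10.0.jar /opt/flink/plugins/s3-fs-presto/
```

With both plugins loaded, a plain s3:// path is ambiguous, so using the explicit s3a:// and s3p:// schemes, as Kostas suggests below, makes it clear which plugin serves which path.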
On Thu, Apr 9, 2020, 7:41 AM Kostas Kloudas <kklou...@gmail.com> wrote:

> It should not be a problem because from what you posted, you are using
> "s3a" as the scheme for s3.
> Are you using "s3p" for Presto? This should also be done in order for
> Flink to understand where to use the one or the other.
>
> On Thu, Apr 9, 2020 at 1:30 PM Roshan Punnoose <rosh...@gmail.com> wrote:
> >
> > Lastly, could it be the way I built the flink image for kube? I added
> > both the presto and Hadoop plugins.
> >
> > On Thu, Apr 9, 2020, 7:29 AM Roshan Punnoose <rosh...@gmail.com> wrote:
> >>
> >> Sorry, realized this came off the user list by mistake. Adding the
> >> thread back in.
> >>
> >> On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose <rosh...@gmail.com> wrote:
> >>>
> >>> Yes, sorry, no errors on the task manager. However, I am new to Flink,
> >>> so I don't know all the places to look for the logs. I've been looking
> >>> at the task manager logs and don't see any exceptions there. Not sure
> >>> where to look for S3 exceptions in particular.
> >>>
> >>> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas <kklou...@gmail.com> wrote:
> >>>>
> >>>> Yes, this is why I reached out for further information.
> >>>>
> >>>> Incrementing the part counter is the responsibility of the
> >>>> StreamingFileSink, whose code is FS-agnostic, so it should also fail
> >>>> in the local FS.
> >>>> Now if it is on the S3 side, it would help if you have any more info,
> >>>> for example any logs from S3, to see if anything went wrong on their
> >>>> end.
> >>>>
> >>>> So your logs refer to normal execution, i.e. no failures and no
> >>>> restarting, right?
> >>>>
> >>>> Cheers,
> >>>> Kostas
> >>>>
> >>>> On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose <rosh...@gmail.com> wrote:
> >>>> >
> >>>> > Surprisingly, the same code running against the local filesystem
> >>>> > works perfectly. The part counter increments correctly.
> >>>> >
> >>>> > On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas <kklou...@gmail.com> wrote:
> >>>> >>
> >>>> >> Hi Roshan,
> >>>> >>
> >>>> >> Your logs refer to a simple run without any failures or re-running
> >>>> >> from a savepoint, right?
> >>>> >>
> >>>> >> I am asking because I am trying to reproduce it by running a modified
> >>>> >> ParquetStreamingFileSinkITCase [1] and so far I cannot.
> >>>> >> The ITCase runs against the local filesystem, and not S3, but I added
> >>>> >> the OutputFileConfig and it seems that the part counter increases
> >>>> >> as expected.
> >>>> >>
> >>>> >> Is there any other information that would help us reproduce the issue?
> >>>> >>
> >>>> >> Cheers,
> >>>> >> Kostas
> >>>> >>
> >>>> >> [1] https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
> >>>> >>
> >>>> >> On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose <rosh...@gmail.com> wrote:
> >>>> >> >
> >>>> >> > Hi,
> >>>> >> >
> >>>> >> > I am trying to get the parquet writer to write to S3; however, the
> >>>> >> > files do not seem to be rolling over. The same file "part-0-0.parquet"
> >>>> >> > is being created each time, like the "partCounter" is not being
> >>>> >> > updated? Maybe the Bucket is being recreated each time? I don't
> >>>> >> > really know... Here are some logs:
> >>>> >> >
> >>>> >> > 2020-04-09 01:28:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2).
> >>>> >> > 2020-04-09 01:28:10,589 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=2.
> >>>> >> > 2020-04-09 01:28:10,589 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
> >>>> >> > 2020-04-09 01:29:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=3 (max part counter=3).
> >>>> >> > 2020-04-09 01:29:10,520 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=3.
> >>>> >> > 2020-04-09 01:29:10,521 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
> >>>> >> >
> >>>> >> > And a part of my code:
> >>>> >> >
> >>>> >> > ```
> >>>> >> > StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >>>> >> >
> >>>> >> > // env.setParallelism(2);
> >>>> >> > env.enableCheckpointing(60000L);
> >>>> >> > ///PROPERTIES Added
> >>>> >> > Schema schema = bro_conn.getClassSchema();
> >>>> >> >
> >>>> >> > OutputFileConfig config = OutputFileConfig
> >>>> >> >         .builder()
> >>>> >> >         .withPartSuffix(".parquet")
> >>>> >> >         .build();
> >>>> >> >
> >>>> >> > final StreamingFileSink<GenericRecord> sink = StreamingFileSink
> >>>> >> >         .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"),
> >>>> >> >                 ParquetAvroWriters.forGenericRecord(schema))
> >>>> >> >         // .withRollingPolicy(OnCheckpointRollingPolicy.build())
> >>>> >> >         .withOutputFileConfig(config)
> >>>> >> >         // .withBucketAssigner(new PartitioningBucketAssigner())
> >>>> >> >         .build();
> >>>> >> >
> >>>> >> > DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
> >>>> >> >         "kinesis", new SimpleStringSchema(), consumerConfig));
> >>>> >> >
> >>>> >> > kinesis.flatMap(new JsonAvroParser())
> >>>> >> >         .addSink(sink);
> >>>> >> >
> >>>> >> > env.execute("Bro Conn");
> >>>> >> > ```
> >>>> >> >
> >>>> >> > I'm using Flink 1.10.0 and running in Kubernetes. I also created a
> >>>> >> > custom image to add the presto/hadoop plugins.
> >>>> >> >
> >>>> >> > Thanks again!
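To make the scheme split concrete, here is a minimal Java sketch of the same sink wiring with the data path pinned to s3a:// (the Hadoop plugin is the only S3 filesystem the StreamingFileSink supports in 1.10) and the checkpoint path pinned to s3p:// for the Presto plugin. The bucket name, checkpoint path, and the wire() helper are illustrative, not from the thread:

```
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class ParquetS3SinkSketch {

    // Build the bulk-format sink; bulk formats always roll on checkpoint, so each
    // successful checkpoint should finalize part-<subtask>-<counter>.parquet and
    // bump the counter for the next in-progress file.
    static StreamingFileSink<GenericRecord> buildSink(Schema schema) {
        OutputFileConfig config = OutputFileConfig.builder()
                .withPartSuffix(".parquet")
                .build();

        return StreamingFileSink
                .forBulkFormat(new Path("s3a://my-bucket/bro_conn/"),   // hypothetical bucket
                        ParquetAvroWriters.forGenericRecord(schema))
                .withOutputFileConfig(config)
                .build();
    }

    // Wire checkpointing (through the Presto plugin) and the sink (through the
    // Hadoop plugin) onto an existing stream of records.
    static void wire(StreamExecutionEnvironment env,
                     DataStream<GenericRecord> records,
                     Schema schema) {
        env.enableCheckpointing(60_000L);
        env.setStateBackend(new FsStateBackend("s3p://my-bucket/checkpoints")); // hypothetical path
        records.addSink(buildSink(schema));
    }
}
```

With this split, the sink's multipart uploads go through the Hadoop S3 filesystem while checkpoint state goes through the Presto one (which the 1.10 docs suggest for checkpointing), and the part counter is driven purely by the checkpoint cycle, matching the behaviour Kostas describes above.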