I would say so, yes. Also, could you set the paths where you want to use Presto to "s3p", as described in [1], just to be sure that there is no ambiguity.
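For example, a rough sketch of what I mean (this is not code from your job: the bucket and checkpoint path are placeholders, and it assumes both the flink-s3-fs-hadoop and flink-s3-fs-presto plugins are installed, with checkpoints going to S3 through Presto):

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SchemeSplitSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);

        // Everything meant for the Presto plugin is addressed explicitly as "s3p",
        // e.g. the checkpoint directory (placeholder path).
        env.setStateBackend(new FsStateBackend("s3p://<bucket>/checkpoints/"));

        // The StreamingFileSink path stays on the Hadoop-based scheme, as in your
        // snippet below:
        //   new Path("s3a://<bucket>/bro_conn/")
    }
}
```

That way there is no doubt about which of the two S3 filesystems serves which path.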
You could also make use of [2].

And thanks for looking into it!

Cheers,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#s3-specific
[2] https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters

On Thu, Apr 9, 2020 at 2:50 PM Roshan Punnoose <rosh...@gmail.com> wrote:
>
> Btw, I ran the same exact code on a local Flink cluster run with
> `./bin/start-cluster.sh` on my local machine. With `s3a` it did not work, the
> part files do not roll over; however, with the local filesystem it works
> perfectly. Should I be looking at the S3Committer in Flink to see if there is
> something odd going on?
>
> On Thu, Apr 9, 2020 at 7:49 AM Roshan Punnoose <rosh...@gmail.com> wrote:
>>
>> Nope, just the s3a. I'll keep looking around to see if there is anything else
>> I can see. If you think of anything else to try, let me know.
>>
>> On Thu, Apr 9, 2020, 7:41 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>>>
>>> It should not be a problem because, from what you posted, you are using
>>> "s3a" as the scheme for s3.
>>> Are you using "s3p" for Presto? This should also be done in order for
>>> Flink to understand where to use the one or the other.
>>>
>>> On Thu, Apr 9, 2020 at 1:30 PM Roshan Punnoose <rosh...@gmail.com> wrote:
>>> >
>>> > Lastly, could it be the way I built the flink image for kube? I added
>>> > both the presto and Hadoop plugins.
>>> >
>>> > On Thu, Apr 9, 2020, 7:29 AM Roshan Punnoose <rosh...@gmail.com> wrote:
>>> >>
>>> >> Sorry, realized this came off the user list by mistake. Adding the thread
>>> >> back in.
>>> >>
>>> >> On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose <rosh...@gmail.com> wrote:
>>> >>>
>>> >>> Yes sorry, no errors on the task manager. However, I am new to Flink, so
>>> >>> I don't know all the places to look for the logs. I've been looking at the
>>> >>> task manager logs and don't see any exceptions there. Not sure where to
>>> >>> look for S3 exceptions in particular.
>>> >>>
>>> >>> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>>> >>>>
>>> >>>> Yes, this is why I reached out for further information.
>>> >>>>
>>> >>>> Incrementing the part counter is the responsibility of the
>>> >>>> StreamingFileSink, whose code is FS-agnostic, so it should also fail
>>> >>>> in the local FS.
>>> >>>> Now if it is on the S3 side, it would help if you have any more info,
>>> >>>> for example any logs from S3, to see if anything went wrong on their
>>> >>>> end.
>>> >>>>
>>> >>>> So your logs refer to normal execution, i.e. no failures and no
>>> >>>> restarting, right?
>>> >>>>
>>> >>>> Cheers,
>>> >>>> Kostas
>>> >>>>
>>> >>>> On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose <rosh...@gmail.com>
>>> >>>> wrote:
>>> >>>> >
>>> >>>> > Surprisingly, the same code running against the local filesystem
>>> >>>> > works perfectly. The part counter increments correctly.
>>> >>>> >
>>> >>>> > On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas <kklou...@gmail.com>
>>> >>>> > wrote:
>>> >>>> >>
>>> >>>> >> Hi Roshan,
>>> >>>> >>
>>> >>>> >> Your logs refer to a simple run without any failures or re-running
>>> >>>> >> from a savepoint, right?
>>> >>>> >>
>>> >>>> >> I am asking because I am trying to reproduce it by running a modified
>>> >>>> >> ParquetStreamingFileSinkITCase [1] and so far I cannot.
>>> >>>> >> The ITCase runs against the local filesystem, and not S3, but I added
>>> >>>> >> the OutputFileConfig and it seems that the part counter increases
>>> >>>> >> as expected.
>>> >>>> >>
>>> >>>> >> Is there any other information that would help us reproduce the
>>> >>>> >> issue?
>>> >>>> >>
>>> >>>> >> Cheers,
>>> >>>> >> Kostas
>>> >>>> >>
>>> >>>> >> [1] https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
>>> >>>> >>
>>> >>>> >> On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose <rosh...@gmail.com>
>>> >>>> >> wrote:
>>> >>>> >> >
>>> >>>> >> > Hi,
>>> >>>> >> >
>>> >>>> >> > I am trying to get the parquet writer to write to S3; however, the
>>> >>>> >> > files do not seem to be rolling over. The same file
>>> >>>> >> > "part-0-0.parquet" is being created each time, as if the
>>> >>>> >> > "partCounter" is not being updated. Maybe the Bucket is being
>>> >>>> >> > recreated each time? I don't really know... Here are some logs:
>>> >>>> >> >
>>> >>>> >> > 2020-04-09 01:28:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2).
>>> >>>> >> > 2020-04-09 01:28:10,589 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=2.
>>> >>>> >> > 2020-04-09 01:28:10,589 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>>> >>>> >> > 2020-04-09 01:29:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=3 (max part counter=3).
>>> >>>> >> > 2020-04-09 01:29:10,520 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=3.
>>> >>>> >> > 2020-04-09 01:29:10,521 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>>> >>>> >> >
>>> >>>> >> > And a part of my code:
>>> >>>> >> >
>>> >>>> >> > ```
>>> >>>> >> > StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> >>>> >> >
>>> >>>> >> > // env.setParallelism(2);
>>> >>>> >> > env.enableCheckpointing(60000L);
>>> >>>> >> > ///PROPERTIES Added
>>> >>>> >> > Schema schema = bro_conn.getClassSchema();
>>> >>>> >> >
>>> >>>> >> > OutputFileConfig config = OutputFileConfig
>>> >>>> >> >         .builder()
>>> >>>> >> >         .withPartSuffix(".parquet")
>>> >>>> >> >         .build();
>>> >>>> >> >
>>> >>>> >> > final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>>> >>>> >> >         .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"),
>>> >>>> >> >                 ParquetAvroWriters.forGenericRecord(schema))
>>> >>>> >> >         // .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>> >>>> >> >         .withOutputFileConfig(config)
>>> >>>> >> >         // .withBucketAssigner(new PartitioningBucketAssigner())
>>> >>>> >> >         .build();
>>> >>>> >> >
>>> >>>> >> > DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
>>> >>>> >> >         "kinesis", new SimpleStringSchema(), consumerConfig));
>>> >>>> >> >
>>> >>>> >> > kinesis.flatMap(new JsonAvroParser())
>>> >>>> >> >         .addSink(sink);
>>> >>>> >> >
>>> >>>> >> > env.execute("Bro Conn");
>>> >>>> >> > ```
>>> >>>> >> >
>>> >>>> >> > I'm using Flink 1.10.0, and running in Kubernetes. I also created
>>> >>>> >> > a custom image to add the presto/hadoop plugin.
>>> >>>> >> >
>>> >>>> >> > Thanks again!
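P.S. In case it helps with the local-filesystem comparison mentioned above, here is a minimal, self-contained sketch (not code from this thread) of the same kind of bulk-format sink writing to file:///, so the per-checkpoint part counter can be observed without S3 involved. The Event POJO, the output path, and the intervals are made up for illustration; it assumes Flink 1.10 with flink-parquet and Avro on the classpath:

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class LocalRollingCheck {

    /** Simple POJO written via Avro reflection, standing in for the real records. */
    public static class Event {
        public String id = "";
        public long ts;
        public Event() {}
        public Event(String id, long ts) { this.id = id; this.ts = ts; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(5_000L); // bulk formats roll on every checkpoint

        OutputFileConfig config = OutputFileConfig.builder()
                .withPartSuffix(".parquet")
                .build();

        StreamingFileSink<Event> sink = StreamingFileSink
                .forBulkFormat(new Path("file:///tmp/rolling-check/"), // local FS instead of s3a/s3p
                        ParquetAvroWriters.forReflectRecord(Event.class))
                .withOutputFileConfig(config)
                .build();

        // A slow, endless source so that several checkpoints complete; if the part
        // counter advances, part-0-0.parquet, part-0-1.parquet, ... should appear.
        env.addSource(new SourceFunction<Event>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Event> ctx) throws Exception {
                long i = 0;
                while (running) {
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(new Event("event-" + i++, System.currentTimeMillis()));
                    }
                    Thread.sleep(100);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }).addSink(sink);

        env.execute("Local part-counter check");
    }
}
```

If the counter does not advance there either, the problem would be on the FS-agnostic side of the sink; if it only misbehaves on S3, that points back at the s3a/s3p setup.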