As Magnus mentioned, FileSystems are picked up from the classpath and registered here:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L480
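For anyone following along, the registration is driven entirely by the options
object, roughly like this (a simplified sketch, not the exact Beam code):

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

PipelineOptions options = PipelineOptionsFactory.create();
// Internally this scans ServiceLoader.load(FileSystemRegistrar.class) and
// asks each registrar found on the classpath (e.g. the S3 registrar from
// beam-sdks-java-io-amazon-web-services) for its FileSystem instances.
FileSystems.setDefaultPipelineOptions(options);
// If this call never happens on a worker, any "s3://..." path fails with
// "No filesystem found for scheme s3".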
It looks like Flink invokes this method at the following locations:
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java#L142
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L63

I'm not familiar enough with Flink to say why S3 is not being properly
registered when running the Flink job. CCing some folks who are more familiar
with Flink. +Ankur Goenka <goe...@google.com> +Maximilian Michels
<m...@apache.org>

Thanks,
Cham

On Sat, Sep 21, 2019 at 9:18 AM Koprivica,Preston Blake
<preston.b.kopriv...@cerner.com> wrote:

> Thanks for the reply Magnus.
>
> I'm sorry it wasn't more clear in the original message. I have added the
> aws dependencies and set up the pipeline options with the aws options. For
> the case where I set the write to ignore windowing, everything works. But
> the option is deprecated and the comments warn against its usage.
>
> I'm wondering whether the error I see when no options are set is a case of
> improperly initialized filesystems in the Flink runner. Or maybe someone
> has some different ideas for the culprit.
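> For reference, the options wiring looks roughly like this (a simplified
> sketch; the real option values are elided and the region below is just a
> placeholder):
>
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.aws.options.AwsOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>
> // Inside main(String[] args): parse the args, then view them as
> // AwsOptions (available once beam-sdks-java-io-amazon-web-services is
> // on the classpath).
> AwsOptions awsOptions = PipelineOptionsFactory.fromArgs(args)
>     .withValidation()
>     .as(AwsOptions.class);
> awsOptions.setAwsRegion("us-east-1"); // placeholder region
> Pipeline pipeline = Pipeline.create(awsOptions);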
> ------------------------------
> *From:* Magnus Runesson <ma...@linuxalert.org>
> *Sent:* Saturday, September 21, 2019 9:06:03 AM
> *To:* user@beam.apache.org <user@beam.apache.org>
> *Subject:* Re: No filesystem found for scheme s3 using FileIO
>
> Hi!
>
> You are probably missing the S3 filesystem in your classpath. If I
> remember correctly, you must include this package in your
> classpath/fat-jar:
> https://search.maven.org/search?q=a:beam-sdks-java-io-amazon-web-services
>
> /Magnus
>
> On 2019-09-19 23:13, Koprivica,Preston Blake wrote:
>
> Hello everyone. I’m getting the following error when attempting to use
> the FileIO apis (beam-2.15.0) and integrating with a 3rd party filesystem,
> in this case AWS S3:
>
> java.lang.IllegalArgumentException: No filesystem found for scheme s3
>     at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>     at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
>     at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
>
> For reference, the write code resembles this:
>
> FileIO.Write<?, GenericRecord> write = FileIO.<GenericRecord>write()
>     .via(ParquetIO.sink(schema))
>     .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
>     .withSuffix(".parquet");
>
> records.apply(String.format("Write(%s)", options.getOutputDir()), write);
>
> I have set up the PipelineOptions with all the relevant AWS options, and
> the issue does not appear to be related to ParquetIO.sink() directly. I am
> able to reliably reproduce the issue using JSON formatted records and
> TextIO.sink() as well.
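> A minimal sketch of that TextIO variant (same setup as the Parquet snippet
> above, plus org.apache.beam.sdk.io.TextIO; "jsonRecords" stands in for our
> PCollection<String> of JSON lines):
>
> FileIO.Write<Void, String> jsonWrite = FileIO.<String>write()
>     .via(TextIO.sink())
>     .to(options.getOutputDir())
>     .withSuffix(".json");
>
> jsonRecords.apply(String.format("Write(%s)", options.getOutputDir()), jsonWrite);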
> Just trying some different knobs, I went ahead and set the following
> option:
>
> write = write.withNoSpilling();
>
> This actually seemed to fix the issue, only to have it reemerge as I
> scaled up the data set size. The stack trace, while very similar, reads:
>
> java.lang.IllegalArgumentException: No filesystem found for scheme s3
>     at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>     at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
>
> I’ll be interested to hear some theories on the differences/similarities
> in the stacks (the first fails in FlatMapDriver via UnionCoder, this one in
> NoOpDriver via KvCoder). And lastly, I tried adding the following
> deprecated option (with and without the withNoSpilling() option):
>
> write = write.withIgnoreWindowing();
>
> This seemed to fix the issue altogether, but aside from having to rely on
> a deprecated feature, there is the bigger question of why. In reading
> through some of the source, it seems to be a common pattern to manually
> register the pipeline options to seed the filesystem registry during the
> setup part of the operator lifecycle, e.g.:
> https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313
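> To illustrate what I mean, here is a sketch of that pattern (a hypothetical
> operator, not the actual DoFnOperator code; it only shows where the
> registration call sits in the lifecycle):
>
> import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
> import org.apache.beam.sdk.io.FileSystems;
>
> class ExampleOperator {
>   // Beam's serializable wrapper that ships PipelineOptions to workers.
>   private final SerializablePipelineOptions serializedOptions;
>
>   ExampleOperator(SerializablePipelineOptions serializedOptions) {
>     this.serializedOptions = serializedOptions;
>   }
>
>   void setup() {
>     // Seed the filesystem registry on the worker before any coder tries
>     // to decode a FileResult; without this, matchNewResource() throws
>     // "No filesystem found for scheme s3".
>     FileSystems.setDefaultPipelineOptions(serializedOptions.get());
>   }
> }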
> Is it possible that I have hit upon a couple of scenarios where that has
> not taken place? Unfortunately, I’m not yet in a position to suggest a
> fix, but I’m guessing there’s some missing initialization code in one or
> more of the batch operators. If this is indeed a legitimate issue, I’ll be
> happy to log one, but I’ll hold off until the community gets a chance to
> look at it.
>
> Thanks,
>
> - Preston