As Magnus mentioned, FileSystems are picked up from the class path and
registered here.
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L480

It seems that the Flink runner invokes this method at the following locations.
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java#L142
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L63
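
To illustrate the registration step, here is a minimal sketch in plain Java.
It is a simplified model and not Beam's actual code: SchemeRegistrySketch,
Registrar, register() and lookup() are hypothetical names, and the registrars
are passed in by hand where Beam would discover them on the class path. One
instance stands in for the registry held by one JVM. It only shows how a
lookup for s3:// can fail with the same message until registration has run.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified stand-in for a scheme-to-filesystem registry; not Beam's code. */
final class SchemeRegistrySketch {

  /** Hypothetical stand-in for a registrar found on the class path. */
  interface Registrar {
    String scheme();
    String fileSystemName();
  }

  // Until register() runs, only the local "file" scheme is known.
  private Map<String, String> schemes =
      Collections.singletonMap("file", "LocalFileSystem");

  /** Rebuilds the registry from the given registrars (the registration step). */
  void register(List<Registrar> discovered) {
    Map<String, String> updated = new HashMap<>();
    updated.put("file", "LocalFileSystem");
    for (Registrar r : discovered) {
      updated.put(r.scheme(), r.fileSystemName());
    }
    schemes = Collections.unmodifiableMap(updated);
  }

  /** Resolves the filesystem name for a spec such as s3://bucket/path. */
  String lookup(String spec) {
    int idx = spec.indexOf("://");
    // Specs without a scheme are treated as local paths in this sketch.
    String scheme = idx < 0 ? "file" : spec.substring(0, idx);
    String fs = schemes.get(scheme);
    if (fs == null) {
      throw new IllegalArgumentException("No filesystem found for scheme " + scheme);
    }
    return fs;
  }

  /** Convenience factory for a registrar with fixed values. */
  static Registrar registrar(final String scheme, final String name) {
    return new Registrar() {
      @Override public String scheme() { return scheme; }
      @Override public String fileSystemName() { return name; }
    };
  }

  public static void main(String[] args) {
    SchemeRegistrySketch registry = new SchemeRegistrySketch();
    try {
      registry.lookup("s3://bucket/path");
    } catch (IllegalArgumentException e) {
      System.out.println("before registration: " + e.getMessage());
    }
    registry.register(Collections.singletonList(registrar("s3", "S3FileSystem")));
    System.out.println("after registration: " + registry.lookup("s3://bucket/path"));
  }
}
```

If something similar holds in the real runner, having the S3 jar on the class
path would not be enough on its own: the registration call would also have to
run in each JVM that decodes an s3:// path.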

I'm not too familiar with Flink, so I'm not sure why S3 is not being
registered properly when running the Flink job. Ccing some folks who are more
familiar with Flink.

+Ankur Goenka <goe...@google.com> +Maximilian Michels <m...@apache.org>

Thanks,
Cham


On Sat, Sep 21, 2019 at 9:18 AM Koprivica,Preston Blake <
preston.b.kopriv...@cerner.com> wrote:

> Thanks for the reply Magnus.
>
> I'm sorry it wasn't more clear in the original message.  I have added the
> aws dependencies and set up the pipeline options with the aws options.
> For the case where I set the write to ignore windowing, everything works.
> But the option is deprecated and the comments warn against its usage.
>
> I'm wondering whether the case where no options are set, and I see the
> error, is one of improperly initialized filesystems in the Flink runner.  Or
> maybe someone has a different idea for the culprit.
>
> ------------------------------
> *From:* Magnus Runesson <ma...@linuxalert.org>
> *Sent:* Saturday, September 21, 2019 9:06:03 AM
> *To:* user@beam.apache.org <user@beam.apache.org>
> *Subject:* Re: No filesystem found for scheme s3 using FileIO
>
>
> Hi!
>
>
> You are probably missing the S3 filesystem on your classpath.
>
> If I remember correctly you must include this
> https://search.maven.org/search?q=a:beam-sdks-java-io-amazon-web-services
> package in your classpath/fat-jar.
>
> /Magnus
> On 2019-09-19 23:13, Koprivica,Preston Blake wrote:
>
> Hello everyone. I’m getting the following error when attempting to use the
> FileIO apis (beam-2.15.0) and integrating with a 3rd party filesystem, in
> this case AWS S3:
>
>
>
> java.lang.IllegalArgumentException: No filesystem found for scheme s3
>     at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>     at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
>     at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
>
>
>
> For reference, the write code resembles this:
>
>
>
> FileIO.Write<?, GenericRecord> write = FileIO.<GenericRecord>write()
>                 .via(ParquetIO.sink(schema))
>                 // will be something like: s3://<bucket>/<path>
>                 .to(options.getOutputDir())
>                 .withSuffix(".parquet");
>
>
>
> records.apply(String.format("Write(%s)", options.getOutputDir()), write);
>
>
>
> I have set up the PipelineOptions with all the relevant AWS options and the
> issue does not appear to be related to ParquetIO.sink() directly.  I am
> able to reliably reproduce the issue using JSON formatted records and
> TextIO.sink(), as well.
>
>
>
> Just trying some different knobs, I went ahead and set the following
> option:
>
>
>
>         write = write.withNoSpilling();
>
>
>
> This actually seemed to fix the issue, only to have it reemerge as I
> scaled up the data set size.  The stack trace, while very similar, reads:
>
>
>
> java.lang.IllegalArgumentException: No filesystem found for scheme s3
>     at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>     at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
>
>
>
> I’ll be interested to hear some theories on the differences/similarities
> in the stacks.  And lastly, I tried adding the following deprecated option
> (with and without the withNoSpilling() option):
>
>
>
> write = write.withIgnoreWindowing();
>
>
>
> This seemed to fix the issue altogether, but aside from having to rely on a
> deprecated feature, there is the bigger question of why it works.
>
>
>
> In reading through some of the source, it seems a common pattern to have
> to manually register the pipeline options to seed the filesystem registry
> during the setup part of the operator lifecycle, e.g.:
> https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313
>
>
>
>
> Is it possible that I have hit upon a couple scenarios where that has not
> taken place?  Unfortunately, I’m not yet at a position to suggest a fix,
> but I’m guessing there’s some missing initialization code in one or more of
> the batch operators.  If this is indeed a legitimate issue, I’ll be happy
> to log an issue, but I’ll hold off until the community gets a chance to
> look at it.
>
>
>
> Thanks,
>
>    - Preston
>
>
>
