Hi!
You are probably missing the S3 filesystem module from your classpath.
If I remember correctly, you must include the
https://search.maven.org/search?q=a:beam-sdks-java-io-amazon-web-services
artifact in your classpath/fat jar.
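In Maven terms that would be roughly the following (a sketch; the
version is pinned to match the beam-2.15.0 you mention):

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-amazon-web-services</artifactId>
      <version>2.15.0</version>
    </dependency>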
/Magnus
On 2019-09-19 23:13, Koprivica,Preston Blake wrote:
Hello everyone. I’m getting the following error when attempting to use
the FileIO APIs (beam-2.15.0) with a third-party filesystem, in this
case AWS S3:
java.lang.IllegalArgumentException: No filesystem found for scheme s3
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
For reference, the write code resembles this:
FileIO.Write<?, GenericRecord> write = FileIO.<GenericRecord>write()
    .via(ParquetIO.sink(schema))
    .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
    .withSuffix(".parquet");

records.apply(String.format("Write(%s)", options.getOutputDir()), write);
I have set up the PipelineOptions with all the relevant AWS options, and
the issue does not appear to be related to ParquetIO.sink() directly: I
am able to reliably reproduce it using JSON-formatted records and
TextIO.sink() as well.
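(For completeness, the options setup looks roughly like the sketch
below; the region value and the argument parsing are illustrative
assumptions, not my exact code.)

import org.apache.beam.sdk.io.aws.options.AwsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class S3WriteOptionsSketch {
  public static void main(String[] args) {
    // Parse the args into AwsOptions (provided by
    // beam-sdks-java-io-amazon-web-services) so the S3 filesystem
    // registrar can configure its client from these options.
    AwsOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(AwsOptions.class);
    options.setAwsRegion("us-east-1"); // placeholder region, for illustration
    // ... build and run the pipeline with these options ...
  }
}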
Just trying some different knobs, I went ahead and set the following
option:
write = write.withNoSpilling();
This actually seemed to fix the issue, only to have it reemerge as I
scaled up the data set size. The stack trace, while very similar, reads:
java.lang.IllegalArgumentException: No filesystem found for scheme s3
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
I’d be interested to hear some theories on the differences and
similarities between the two stacks. Lastly, I tried adding the
following deprecated option (both with and without the withNoSpilling()
option):
write = write.withIgnoreWindowing();
This seemed to fix the issue altogether, but aside from having to rely
on a deprecated feature, there is the bigger question of why.
In reading through some of the source, it seems to be a common pattern
to manually register the pipeline options to seed the filesystem
registry during the setup phase of the operator lifecycle, e.g.:
https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313
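For anyone following along, the registration in the linked code boils
down to roughly this (a paraphrased sketch, not the verbatim Beam
source):

import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.sdk.io.FileSystems;

class FileSystemSeedingSketch {
  // Invoked from the operator's setup()/open() lifecycle hook. The
  // FileSystems registry is a static, per-JVM structure, so each task
  // manager must re-seed it from the serialized pipeline options before
  // any coder that calls FileSystems.matchNewResource() (e.g.
  // FileResultCoder) decodes a record.
  static void seedFileSystems(SerializablePipelineOptions serializedOptions) {
    FileSystems.setDefaultPipelineOptions(serializedOptions.get());
  }
}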
Is it possible that I have hit upon a couple of scenarios where that
registration has not taken place? Unfortunately, I’m not yet in a
position to suggest a fix, but I’m guessing there is some missing
initialization code in one or more of the batch operators. If this is
indeed a legitimate issue, I’ll be happy to log it, but I’ll hold off
until the community gets a chance to look at it.
Thanks,
Preston