Hi Beam Team,

I have a data pipeline (Beam on Flink, running on YARN on AWS EMR) that reads some data, applies a simple filter, and writes the results to S3.
*Components and Versions:*
- Beam: 2.16.0 (branch: release-2.16.0)
- Flink: 1.8
- YARN on AWS EMR: emr-5.26.0

Below is a snippet of the code (note that Filter.by preserves the element type, so the filtered output is also a PCollection<SomeType>):

PCollection<SomeType> someTypes = pipeline.apply(new ReadLatestSomeType());

PCollection<SomeType> someTypesOutput =
    someTypes.apply(
        Filter.by(
            someTypeElement -> {
              if (some condition) {
                return false;
              }
              return true;
            }));

someTypesOutput
    .apply(
        AvroIO.write(SomeType.class)
            .to(options.getDestination().get())
            .withOutputFilenames())
    .getPerDestinationOutputFilenames();

Below is the exception I see in the Flink job:

java.lang.IllegalArgumentException: No filesystem found for scheme s3
        at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
        at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
        at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
        at org.apache.beam.sdk.io.fs.MetadataCoder.decodeBuilder(MetadataCoder.java:62)
        at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:58)
        at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:36)
        at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
        at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
        at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:583)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:106)

My Beam pipeline is failing randomly with the exception listed above. I saw
that a fix was made available on the release-2.16.0 branch (see the comment on
JIRA: https://issues.apache.org/jira/browse/BEAM-8303), so I checked that
branch out locally, did a custom build of apache/beam, and published the
artifacts to my local Maven repository so I could use them in my project. I
imported the Beam artifacts (2.16.0-MG-SNAPSHOT) into my project and built it.
Once I had the compiled JAR, I kicked off Flink jobs on an AWS EMR cluster.

The Flink job fails or succeeds randomly (non-deterministically), with no
change in how I run the command and no rebuild of my artifact. I have noticed
that if I run the job with a parallelism of 1 it does not fail, but if I run
the same job with a parallelism of 5 it may fail or succeed.

If anyone can help me out or give me directions I could try, it would be
greatly appreciated.
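One workaround I could try is forcing the registration from user code with a
pass-through step. A minimal sketch (the DoFn below is illustrative, not from
my actual pipeline; and, per the discussion below, it may not help here, since
the failure happens while decoding inside a native Flink operation, before any
user code runs):

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.transforms.DoFn;

/** Illustrative pass-through DoFn that re-registers file systems on workers. */
class EnsureFileSystemsFn<T> extends DoFn<T, T> {
  private static final AtomicBoolean REGISTERED = new AtomicBoolean();

  @ProcessElement
  public void process(ProcessContext c) {
    if (REGISTERED.compareAndSet(false, true)) {
      // Rebuilds the scheme -> FileSystem registry from the options that
      // were shipped to this worker.
      FileSystems.setDefaultPipelineOptions(c.getPipelineOptions());
    }
    c.output(c.element());
  }
}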
Thanks.
- Maulik

On Wed, Sep 25, 2019 at 10:53 AM Koprivica,Preston Blake
<preston.b.kopriv...@cerner.com> wrote:

> Not a problem! Thanks for looking into this. In reading through the
> source associated with the stacktrace, I also noticed that there's neither
> user code nor Beam-to-Flink lifecycle code available for initialization.
> As far as I could tell, it was pure Flink down to the coders. Nothing new
> here, but maybe it bolsters confidence in your diagnosis. I went ahead
> and logged an issue here: https://issues.apache.org/jira/browse/BEAM-8303.
>
> Let me know what I can do to help - I'm happy to test/verify any fixes you
> want to try and review any code (bearing in mind I'm a total newb in the
> Beam space).
>
> Thanks again,
> Preston
>
> On 9/25/19, 10:34 AM, "Maximilian Michels" <m...@apache.org> wrote:
>
> Hi Preston,
>
> Sorry about the name mixup, of course I meant to write Preston, not
> Magnus :) See my reply below.
>
> cheers,
> Max
>
> On 25.09.19 08:31, Maximilian Michels wrote:
> > Hi Magnus,
> >
> > Your observation seems to be correct. There is an issue with the file
> > system registration.
> >
> > The two types of errors you are seeing, as well as the successful run,
> > are just due to the different structure of the generated transforms. The
> > Flink scheduler will distribute them differently, which results in some
> > pipelines being placed on task managers which happen to execute the
> > FileSystems initialization code and others not.
> >
> > There is a quick fix to at least initialize the file systems in case they
> > have not been initialized, by adding the loading code here:
> > https://github.com/apache/beam/blob/948c6fae909685e09d36b23be643182b34c8df25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L463
> >
> > However, there we do not have the pipeline options available, which
> > prevents any configuration. The problem is that the error occurs in the
> > coder used in a native Flink operation which does not even run user code.
> >
> > I believe the only way to fix this is to ship the FileSystems
> > initialization code in CoderTypeSerializer, where we are sure to execute
> > it in time for any coders which depend on it.
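> >
> > Roughly, the idea is something like this (just a sketch; the other
> > members of CoderTypeSerializer are elided, and exactly how the options
> > get plumbed in is still open):
> >
> > // Ship the serialized pipeline options with the serializer and
> > // re-register the file systems when it lands on a task manager.
> > private final SerializablePipelineOptions pipelineOptions;
> >
> > private void readObject(java.io.ObjectInputStream in)
> >     throws java.io.IOException, ClassNotFoundException {
> >   in.defaultReadObject();
> >   FileSystems.setDefaultPipelineOptions(pipelineOptions.get());
> > }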
> >
> > Could you file an issue? I'd be happy to fix this then.
> >
> > Thanks,
> > Max
> >
> > On 24.09.19 09:54, Chamikara Jayalath wrote:
> >> As Magnus mentioned, FileSystems are picked up from the classpath and
> >> registered here:
> >> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L480
> >>
> >> Flink seems to invoke this method at the following locations:
> >> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java#L142
> >> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L63
> >>
> >> I'm not too familiar with Flink, so I'm not sure why S3 is not being
> >> registered properly when running the Flink job. Cc'ing some folks who
> >> are more familiar with Flink.
> >>
> >> +Ankur Goenka <goe...@google.com> +Maximilian Michels <m...@apache.org>
> >>
> >> Thanks,
> >> Cham
> >>
> >> On Sat, Sep 21, 2019 at 9:18 AM Koprivica,Preston Blake
> >> <preston.b.kopriv...@cerner.com> wrote:
> >>
> >> Thanks for the reply, Magnus.
> >>
> >> I'm sorry it wasn't more clear in the original message. I have
> >> added the AWS dependencies and set up the pipeline options with the
> >> AWS options. For the case where I set the write to ignore
> >> windowing, everything works, but that option is deprecated and the
> >> comments warn against its usage.
> >>
> >> I'm wondering whether the case where no options are set and I see the
> >> error is a case of improperly initialized filesystems in the Flink
> >> runner. Or maybe someone has different ideas about the culprit.
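> >>
> >> For reference, the options setup I mean looks roughly like this (a
> >> sketch; the region value is illustrative):
> >>
> >> import org.apache.beam.sdk.Pipeline;
> >> import org.apache.beam.sdk.io.aws.options.S3Options;
> >> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> >>
> >> S3Options options =
> >>     PipelineOptionsFactory.fromArgs(args).withValidation().as(S3Options.class);
> >> options.setAwsRegion("us-east-1"); // illustrative
> >> Pipeline pipeline = Pipeline.create(options);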
> >>
> >> ------------------------------------------------------------------------
> >> *From:* Magnus Runesson <ma...@linuxalert.org>
> >> *Sent:* Saturday, September 21, 2019 9:06:03 AM
> >> *To:* user@beam.apache.org
> >> *Subject:* Re: No filesystem found for scheme s3 using FileIO
> >>
> >> Hi!
> >>
> >> You are probably missing the S3 filesystem from your classpath.
> >>
> >> If I remember correctly, you must include this package in your
> >> classpath/fat-jar:
> >> https://search.maven.org/search?q=a:beam-sdks-java-io-amazon-web-services
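> >>
> >> A quick way to sanity-check the registration from a main() on the same
> >> classpath (sketch only; the bucket and key are made up):
> >>
> >> import org.apache.beam.sdk.io.FileSystems;
> >> import org.apache.beam.sdk.io.fs.ResourceId;
> >>
> >> // After building your PipelineOptions (including the AWS settings):
> >> FileSystems.setDefaultPipelineOptions(options);
> >> // Throws "No filesystem found for scheme s3" if the S3 filesystem
> >> // did not register.
> >> ResourceId id = FileSystems.matchNewResource("s3://some-bucket/some-key", false);
> >> System.out.println(id.getScheme()); // expect "s3"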
> >>
> >> /Magnus
> >>
> >> On 2019-09-19 23:13, Koprivica,Preston Blake wrote:
> >>> Hello everyone. I'm getting the following error when attempting to
> >>> use the FileIO APIs (beam-2.15.0) and integrating with a 3rd-party
> >>> filesystem, in this case AWS S3:
> >>>
> >>> java.lang.IllegalArgumentException: No filesystem found for scheme s3
> >>>         at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
> >>>         at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
> >>>         at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
> >>>         at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
> >>>         at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
> >>>         at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
> >>>         at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
> >>>         at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
> >>>         at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
> >>>         at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
> >>>         at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
> >>>         at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> >>>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
> >>>         at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
> >>>         at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> >>>         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> >>>         at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
> >>>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
> >>>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> >>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> >>>         at java.lang.Thread.run(Thread.java:748)
> >>>
> >>> For reference, the write code resembles this:
> >>>
> >>> FileIO.Write<?, GenericRecord> write =
> >>>     FileIO.<GenericRecord>write()
> >>>         .via(ParquetIO.sink(schema))
> >>>         .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
> >>>         .withSuffix(".parquet");
> >>>
> >>> records.apply(String.format("Write(%s)", options.getOutputDir()), write);
> >>>
> >>> I have set up the PipelineOptions with all the relevant AWS options,
> >>> and the issue does not appear to be related to ParquetIO.sink()
> >>> directly. I am able to reliably reproduce the issue using
> >>> JSON-formatted records and TextIO.sink() as well.
> >>>
> >>> Just trying some different knobs, I went ahead and set the
> >>> following option:
> >>>
> >>> write = write.withNoSpilling();
> >>>
> >>> This actually seemed to fix the issue, only to have it reemerge as
> >>> I scaled up the data set size. The stack trace, while very
> >>> similar, reads:
> >>>
> >>> java.lang.IllegalArgumentException: No filesystem found for scheme s3
> >>>         at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
> >>>         at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
> >>>         at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
> >>>         at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
> >>>         at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
> >>>         at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
> >>>         at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
> >>>         at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
> >>>         at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
> >>>         at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
> >>>         at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
> >>>         at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> >>>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
> >>>         at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
> >>>         at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> >>>         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> >>>         at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
> >>>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
> >>>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> >>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> >>>         at java.lang.Thread.run(Thread.java:748)
> >>>
> >>> I'll be interested to hear some theories on the
> >>> differences/similarities between the two stacks. And lastly, I tried
> >>> adding the following deprecated option (with and without the
> >>> withNoSpilling() option):
> >>>
> >>> write = write.withIgnoreWindowing();
> >>>
> >>> This seemed to fix the issue altogether, but aside from having to
> >>> rely on a deprecated feature, there is the bigger issue of why.
> >>>
> >>> In reading through some of the source, it seems a common pattern
> >>> to have to manually register the pipeline options to seed the
> >>> filesystem registry during the setup part of the operator
> >>> lifecycle, e.g.:
> >>> https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313
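> >>>
> >>> From memory, that pattern looks roughly like the sketch below
> >>> (simplified; not a verbatim copy of the linked code):
> >>>
> >>> @Override
> >>> public void open() throws Exception {
> >>>   // Deserialize the user's PipelineOptions on the task manager and
> >>>   // seed the FileSystems registry before any coder needs it.
> >>>   FileSystems.setDefaultPipelineOptions(serializedOptions.get());
> >>>   // ... rest of the operator setup ...
> >>> }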
> >>>
> >>> Is it possible that I have hit upon a couple of scenarios where that
> >>> has not taken place? Unfortunately, I'm not yet in a position to
> >>> suggest a fix, but I'm guessing there's some missing initialization
> >>> code in one or more of the batch operators. If this is indeed a
> >>> legitimate issue, I'll be happy to log an issue, but I'll hold off
> >>> until the community gets a chance to look at it.
> >>>
> >>> Thanks,
> >>> Preston