Hi Tim,

I am using FileIO directly with AvroIO.sink(...). However, having experienced BEAM-2277 with the SparkRunner a few months ago, I get the feeling this is something different (maybe a dependency mismatch or a missing dependency).
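For context, here is roughly the shape of my write step (a minimal sketch, not my exact code: the schema variable and the hdfs:// output path are placeholders):

{snippet}
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.values.PCollection;

// records: a PCollection<GenericRecord> produced earlier in the pipeline
// schema:  the Avro Schema of those records
records.apply("WriteAvro",
    FileIO.<GenericRecord>write()
        .via(AvroIO.sink(schema))
        .to("hdfs://ha-nn/path/to/output") // placeholder output directory
        .withSuffix(".avro"));
{snippet}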
Thanks

On Fri, Oct 26, 2018 at 1:33 PM Tim Robertson <timrobertson...@gmail.com> wrote:

> Hi Juan
>
> This sounds reminiscent of https://issues.apache.org/jira/browse/BEAM-2277,
> which we believed was fixed in 2.7.0.
> Which IO are you using to write your files, and can you paste a snippet
> of your code, please?
>
> On BEAM-2277 I posted a workaround for AvroIO (it might help you find a
> workaround too):
>
> transform.apply("Write",
>     AvroIO.writeGenericRecords(schema)
>         .to(FileSystems.matchNewResource(options.getTarget(), true))
>         // BEAM-2277 workaround
>         .withTempDirectory(FileSystems.matchNewResource(
>             "hdfs://ha-nn/tmp/beam-avro", true)));
>
> Thanks
> Tim
>
> On Fri, Oct 26, 2018 at 11:47 AM Juan Carlos Garcia <jcgarc...@gmail.com>
> wrote:
>
>> Hi Folks,
>>
>> I have a strange situation while running Beam 2.7.0 with the
>> FlinkRunner. My setup consists of an HA Flink cluster (1.5.4) writing
>> its checkpoints to HDFS, and Flink is able to correctly write its
>> checkpoints / savepoints to HDFS without any problems.
>>
>> However, my pipeline has to write to HDFS as well, and it fails with
>> "Caused by: java.lang.IllegalArgumentException: No filesystem found for
>> scheme hdfs" (stacktrace at the bottom).
>>
>> On the host where the pipeline is running:
>> 1. The environment variable HADOOP_CONF_DIR is set.
>> 2. During pipeline construction I explicitly call
>> FileSystems.setDefaultPipelineOptions(_options); to trigger the
>> ServiceLoader to find all filesystem registrars on the classpath.
>> 3. If I inspect the SCHEME_TO_FILESYSTEM field of the FileSystems class
>> in my main method using reflection, I can see that at launch time it
>> contains:
>> {file=org.apache.beam.sdk.io.LocalFileSystem@1941a8ff,
>> hdfs=org.apache.beam.sdk.io.hdfs.HadoopFileSystem@22d7b4f8}
>>
>> Any idea what I am doing wrong with the HDFS integration?
>>
>> {snippet}
>> FileSystems.setDefaultPipelineOptions(_context.getPipelineOptions());
>> Field f = FileSystems.class.getDeclaredField("SCHEME_TO_FILESYSTEM");
>> f.setAccessible(true);
>> AtomicReference<Map<String, FileSystem>> value =
>>     (AtomicReference<Map<String, FileSystem>>) f.get(null);
>>
>> System.out.println("===========================");
>> System.out.println(value);
>> {snippet}
>>
>> {stacktrace}
>> Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
>> at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>> at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>> at org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink.lambda$new$22b9c623$1(FileIO.java:1293)
>> at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>> at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>> at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>> at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:920)
>> at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:715)
>> {stacktrace}
>>
>> --
>> JC

--
JC
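P.S. In case it helps narrow this down, here is a rough (untested) sketch of passing the Hadoop configuration explicitly through HadoopFileSystemOptions instead of relying only on HADOOP_CONF_DIR; the fs.defaultFS value below is a placeholder for the namenode address:

{snippet}
import java.util.Collections;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.hadoop.conf.Configuration;

HadoopFileSystemOptions options =
    PipelineOptionsFactory.fromArgs(args).as(HadoopFileSystemOptions.class);

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://ha-nn"); // placeholder namenode address

// Carry the HDFS configuration inside the pipeline options so the
// HadoopFileSystem registrar can pick it up wherever the options travel,
// instead of depending on HADOOP_CONF_DIR being set on every host.
options.setHdfsConfiguration(Collections.singletonList(conf));
FileSystems.setDefaultPipelineOptions(options);
{snippet}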