Hi Tim,

I am using FileIO directly with AvroIO.sink(...). However, having
experienced BEAM-2277 with the SparkRunner a few months ago, I have the
feeling this is something different (maybe a missing or mismatched
dependency).
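
To make that suspicion concrete, here is a rough model of the lookup that
fails in the stack trace. It is not Beam's actual code: the Registry class
and its register/lookup methods are hypothetical names, and the only
assumption is that the scheme map is static state populated separately in
each JVM. Under that assumption, seeing hdfs in the map from my main method
says nothing about a worker JVM where registration never ran.

```java
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class SchemeRegistrySketch {

    // Hypothetical stand-in for a per-JVM scheme registry (not Beam's class).
    public static final class Registry {
        private final AtomicReference<Map<String, String>> schemeToFileSystem =
            new AtomicReference<>(Collections.singletonMap("file", "LocalFileSystem"));

        // Models the registration step: merge discovered schemes into the map.
        public void register(Map<String, String> discovered) {
            Map<String, String> merged = new HashMap<>(schemeToFileSystem.get());
            merged.putAll(discovered);
            schemeToFileSystem.set(Collections.unmodifiableMap(merged));
        }

        // Models the lookup: resolve the scheme of the spec, fail if unknown.
        public String lookup(String spec) {
            String scheme = URI.create(spec).getScheme();
            if (scheme == null) {
                scheme = "file"; // no scheme: treat the spec as a local path
            }
            String fileSystem = schemeToFileSystem.get().get(scheme);
            if (fileSystem == null) {
                throw new IllegalArgumentException(
                    "No filesystem found for scheme " + scheme);
            }
            return fileSystem;
        }
    }

    public static void main(String[] args) {
        // "Launcher" registry: registration ran, so hdfs is known.
        Registry launcher = new Registry();
        launcher.register(Collections.singletonMap("hdfs", "HadoopFileSystem"));
        System.out.println(launcher.lookup("hdfs://ha-nn/tmp/beam-avro"));

        // "Worker" registry: registration never ran, so only file is known.
        Registry worker = new Registry();
        try {
            worker.lookup("hdfs://ha-nn/tmp/beam-avro");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this model is right, the thing to check is whether the registration step
runs on the Flink task managers and whether the Hadoop filesystem classes
are on their classpath, not only on the launcher's.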

Thanks

On Fri, Oct 26, 2018 at 1:33 PM Tim Robertson <timrobertson...@gmail.com>
wrote:

> Hi Juan
>
> This sounds reminiscent of https://issues.apache.org/jira/browse/BEAM-2277
> which we believed was fixed in 2.7.0.
> What IO are you using to write your files, and can you paste a snippet of
> your code please?
>
> On BEAM-2277 I posted a workaround for AvroIO (it might help you find a
> workaround too):
>
>  transform.apply("Write",
>         AvroIO.writeGenericRecords(schema)
>             .to(FileSystems.matchNewResource(options.getTarget(), true))
>             // BEAM-2277 workaround
>             .withTempDirectory(
>                 FileSystems.matchNewResource("hdfs://ha-nn/tmp/beam-avro", true)));
>
>
> Thanks
> Tim
>
>
> On Fri, Oct 26, 2018 at 11:47 AM Juan Carlos Garcia <jcgarc...@gmail.com>
> wrote:
>
>> Hi Folks,
>>
>> I have a strange situation while running Beam 2.7.0 with the FlinkRunner.
>> My setup consists of an HA Flink cluster (1.5.4) that writes its
>> checkpoints to HDFS. Flink is able to write its checkpoints / savepoints to
>> HDFS without any problems.
>>
>> However, my pipeline has to write to HDFS as well, but fails with "Caused
>> by: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs"
>> (stacktrace at the bottom)
>>
>> On the host where the pipeline is running:
>> 1. The environment variable HADOOP_CONF_DIR is set.
>> 2. During pipeline construction I explicitly call
>> FileSystems.setDefaultPipelineOptions(_options); to trigger the
>> ServiceLoader to find all options registrars on the classpath.
>> 3. If I inspect the field SCHEME_TO_FILESYSTEM of the class
>> FileSystems in my main method using reflection, I can see that at
>> launch time it contains:
>>    {file=org.apache.beam.sdk.io.LocalFileSystem@1941a8ff,
>> hdfs=org.apache.beam.sdk.io.hdfs.HadoopFileSystem@22d7b4f8}
>>
>> Any idea what I am doing wrong with the HDFS integration?
>>
>> {snippet}
>>
>> FileSystems.setDefaultPipelineOptions(_context.getPipelineOptions());
>> Field f = FileSystems.class.getDeclaredField("SCHEME_TO_FILESYSTEM");
>> f.setAccessible(true);
>> AtomicReference<Map<String, FileSystem>> value =
>>     (AtomicReference<Map<String, FileSystem>>) f.get(null);
>>
>> System.out.println("===========================");
>> System.out.println(value);
>> {snippet}
>>
>> {stacktrace}
>> Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
>>         at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>>         at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>>         at org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink.lambda$new$22b9c623$1(FileIO.java:1293)
>>         at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>         at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>         at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>         at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:920)
>>         at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:715)
>>
>> {stacktrace}
>>
>> --
>>
>> JC

-- 

JC
