Hello,

I am using Flink 1.5.3 and executing jobs through Apache Beam 2.6.0. One of
my jobs reads from Google Cloud Storage, which uses the file scheme "gs://".
At first everything worked, apart from an occasional exception saying that
the scheme is not recognised. Now I see that exception more often. It seems
to be arbitrary: the exact same job with the exact same parameters may
finish successfully, or throw this exception and fail immediately. I can't
figure out why it's not deterministic. Here is the full exception logged
when the job fails:

java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at Match files from GCS/Via MatchAll/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831)
Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme gs
        at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:459)
        at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:529)
        at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
        at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:49)
        at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:30)
        at org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:116)
        at org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:88)
        at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:124)
        at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:60)
        at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
        at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
        at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:90)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:103)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:145)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
        at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:105)
        at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
        at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
        at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1066)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:827)


Any ideas why the "gs" file system scheme is recognised on some runs but
not on others?


Thanks,

Encho
