Hello, I am using Flink 1.5.3 and executing jobs through Apache Beam 2.6.0. One of my jobs reads from Google Cloud Storage, which uses the file scheme "gs://". Everything was fine at first, but once in a while I would get an exception saying the scheme is not recognised. Lately I've been seeing it more often. It seems arbitrary: the exact same job with the exact same parameters may finish successfully, or throw this exception and fail immediately. I can't figure out why the behaviour is not deterministic. Here is the full exception logged when the job fails:
java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at Match files from GCS/Via MatchAll/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831)
Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme gs
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:459)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:529)
    at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
    at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:49)
    at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:30)
    at org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:116)
    at org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:88)
    at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:124)
    at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:60)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:90)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:103)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:145)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:105)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1066)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:827)

Any ideas why recognising the file system scheme would be non-deterministic?

Thanks,
Encho
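P.S. To illustrate my hunch: since the failure comes from FileSystems.getFileSystemInternal during coder decoding, I suspect the scheme lookup hits some per-JVM registry that may or may not have been initialised on the TaskManager that happens to run the task. Below is a toy model of that idea (plain Java, NOT Beam's actual code; the class and method names are made up by me) showing how the same lookup succeeds or fails depending purely on JVM-local state:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical, not Beam internals): a scheme -> filesystem map
// that is populated only if some one-time initialisation ran in this JVM.
public class SchemeRegistryModel {
    private final Map<String, String> registry = new HashMap<>();

    // Stands in for whatever one-time setup registers "gs" in a JVM.
    public void register(String scheme, String fileSystemName) {
        registry.put(scheme, fileSystemName);
    }

    // Stands in for resolving a path like "gs://bucket/file" by its scheme.
    public String lookup(String spec) {
        String scheme = spec.substring(0, spec.indexOf("://"));
        String fs = registry.get(scheme);
        if (fs == null) {
            // Same message shape as in my stack trace.
            throw new IllegalArgumentException("No filesystem found for scheme " + scheme);
        }
        return fs;
    }

    public static void main(String[] args) {
        SchemeRegistryModel initialisedJvm = new SchemeRegistryModel();
        initialisedJvm.register("gs", "GcsFileSystem"); // init ran here
        System.out.println(initialisedJvm.lookup("gs://bucket/file")); // succeeds

        SchemeRegistryModel freshJvm = new SchemeRegistryModel(); // init never ran
        try {
            freshJvm.lookup("gs://bucket/file");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // fails, same inputs
        }
    }
}
```

If Beam's registration really is per-JVM state like this, that would explain the nondeterminism: whether a run fails could depend on which TaskManager JVMs the tasks land on. But that is just my guess; I'd appreciate confirmation or a pointer to the real mechanism.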