Sorry, I had a second look and your stack trace does not even point to the spilling channel - it reads from the memory segment directly, so setting the temp dirs will not make a difference here.
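(For reference, "setting the temp dirs" means pointing the one or more scratch directories at ordinary local file systems in flink-conf.yaml - a sketch only, the path below is a placeholder:

    # local scratch space, used e.g. for spilling large records (placeholder path)
    io.tmp.dirs: /data/flink-tmp
    # the pre-1.5 key taskmanager.tmp.dirs has the same effect

But as said, for this particular stack trace it should not matter.)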
I'm wondering why your deserializer eventually reads from a file on gs:// directly, instead of, for example, a follow-up map operation.

Nico

On 13/09/18 14:52, Encho Mishinev wrote:
> Hi Nico,
>
> Unfortunately I can't share any of the data, but it is not even data being processed at the point of failure - it is still in the matching-files-from-GCS phase.
>
> I am using Apache Beam's FileIO to match files, and during one of those match-files steps I get the failure above.
>
> Currently I run the job and, when a taskmanager shows this error, I reset it and restart the job. That works fine since the failure only occurs at the beginning of the job. It seems to be a problem within some taskmanagers, which is very odd considering that I have them all generated by a Kubernetes deployment, i.e. they should be completely identical. Sometimes I have to restart 3-4 of them until I have a running cluster.
>
> I will try setting the temporary directory to something other than the default, can't hurt.
>
> Thanks for the help,
> Encho
>
> On Thu, Sep 13, 2018 at 3:41 PM Nico Kruber <n...@data-artisans.com> wrote:
>
> Hi Encho,
> the SpillingAdaptiveSpanningRecordDeserializer that you see in your stack trace is executed while reading input records from another task. If the (serialized) records are too large (> 5 MiB), it will write and assemble them in a spilling channel, i.e. on disk, instead of using memory. This will use the temporary directories specified via "io.tmp.dirs" (or "taskmanager.tmp.dirs"), which defaults to System.getProperty("java.io.tmpdir").
> -> These paths must actually be on an ordinary file system, not on gs:// or the like.
>
> The reason you only see this sporadically may be that not all your records are that big. It should, however, be deterministic in that it should always occur for the same record. Maybe something is wrong here and the record length is messed up, e.g. due to a bug in the de/serializer or the network stack.
>
> Do you have a minimal working example that shows this error and that you can share (either privately with me, or here)?
>
> Nico
>
> On 29/08/18 14:19, Encho Mishinev wrote:
> > Hello,
> >
> > I am using Flink 1.5.3 and executing jobs through Apache Beam 2.6.0. One of my jobs involves reading from Google Cloud Storage, which uses the file scheme "gs://". Everything was fine, but once in a while I would get an exception that the scheme is not recognised. Now I've started seeing them more often. It seems to be arbitrary - the exact same job with the exact same parameters may finish successfully or throw this exception and fail immediately. I can't figure out why it's not deterministic.
> >
> > Here is the full exception logged upon the job failing:
> >
> > java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at Match files from GCS/Via MatchAll/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
> >     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
> >     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> >     at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
> >     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
> >     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108)
> >     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
> >     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473)
> >     ... 3 more
> > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
> >     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831)
> > Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme gs
> >     at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:459)
> >     at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:529)
> >     at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
> >     at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:49)
> >     at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:30)
> >     at org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:116)
> >     at org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:88)
> >     at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:124)
> >     at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:60)
> >     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
> >     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
> >     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
> >     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
> >     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
> >     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
> >     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:90)
> >     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:103)
> >     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:145)
> >     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> >     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> >     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:105)
> >     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
> >     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> >     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> >     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1066)
> >     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:827)
> >
> > Any ideas why the behaviour is not deterministic regarding recognising file system schemes?
> >
> > Thanks,
> >
> > Encho

--
Nico Kruber | Software Engineer
data Artisans
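For completeness: the "Match files from GCS" transform named in the exception is presumably Beam's FileIO.match() (the "Via MatchAll/Reshuffle.ViaRandomKey" suffix is its internal expansion), which is the FileIO matching step Encho describes above. A minimal sketch of such a step - not the actual job; the bucket, pattern, and follow-up map are placeholders, and it assumes the Beam GCP extension that registers the "gs" filesystem is on the worker classpath:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.fs.MatchResult;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    public class MatchGcsFiles {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        // "Match files from GCS": expand a gs:// file pattern into file metadata.
        // Decoding this metadata downstream is where "No filesystem found for
        // scheme gs" surfaces if the GCS filesystem is not registered on a worker.
        PCollection<MatchResult.Metadata> matches =
            p.apply("Match files from GCS",
                FileIO.match().filepattern("gs://my-bucket/input/*"));

        // Illustrative follow-up step only: extract the matched resource ids.
        matches.apply(MapElements
            .into(TypeDescriptors.strings())
            .via(metadata -> metadata.resourceId().toString()));

        p.run().waitUntilFinish();
      }
    }

Note that the failing frames (MetadataCoder -> ResourceIdCoder -> FileSystems.matchNewResource) are decoding the MatchResult.Metadata produced by such a step, i.e. the job dies while deserializing match results exchanged between tasks, before any file contents are read - consistent with the observation that it is still in the matching-files-from-GCS phase.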