Sorry, I had a second look and your stack trace does not even point to
the spilling channel - it reads from the memory segment directly.
-> Setting the temp dirs will thus not make a difference.

I'm wondering why your deserializer eventually reads from a file on
gs:// directly, instead of doing so in, for example, a follow-up map
operation.

Nico

On 13/09/18 14:52, Encho Mishinev wrote:
> Hi Nico,
> 
> Unfortunately I can't share any of the data, but it is not even data being
> processed at the point of failure - it is still in the
> matching-files-from-GCS phase.
> 
> I am using Apache Beam's FileIO to match files and during one of those
> match-files steps I get the failure above.
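> 
> For reference, the matching step is essentially the following sketch (the
> bucket and pattern are placeholders, not the real ones):
> 
>     import org.apache.beam.sdk.Pipeline;
>     import org.apache.beam.sdk.io.FileIO;
>     import org.apache.beam.sdk.options.PipelineOptionsFactory;
> 
>     public class MatchFilesFromGcs {
>       public static void main(String[] args) {
>         Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>         // "Match files from GCS" is the step name that shows up in the stack
>         // trace below; the gs:// scheme is resolved through Beam's FileSystems
>         // registry when the match runs.
>         p.apply("Match files from GCS",
>                 FileIO.match().filepattern("gs://my-bucket/input/*"));
>         p.run().waitUntilFinish();
>       }
>     }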
> 
> Currently I run the job, and when a taskmanager shows this error I
> reset it and restart the job. That works fine since the failure only
> occurs at the beginning of the job. It seems to be a problem within
> some taskmanagers, which is very odd considering that they are all
> generated by a Kubernetes deployment, i.e. they should be completely
> identical. Sometimes I have to restart 3-4 of them until I have a
> running cluster.
> 
> I will try setting the temporary directory to something other than the
> default, can't hurt.
> 
> Thanks for the help,
> Encho
> 
> On Thu, Sep 13, 2018 at 3:41 PM Nico Kruber <n...@data-artisans.com> wrote:
> 
>     Hi Encho,
>     the SpillingAdaptiveSpanningRecordDeserializer that you see in your
>     stack trace is executed while reading input records from another task.
>     If the (serialized) records are too large (> 5MiB), it will write and
>     assemble them in a spilling channel, i.e. on disk, instead of using
>     memory. This will use the temporary directories specified via
>     "io.tmp.dirs" (or "taskmanager.tmp.dirs") which defaults to
>     System.getProperty("java.io.tmpdir").
>     -> These paths must actually be on an ordinary file system, not in gs://
>     or so.
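> 
>     For example, pointing it at a local mount via flink-conf.yaml (the
>     path below is just a placeholder; any ordinary local directory that
>     the taskmanager can write to will do):
> 
>         io.tmp.dirs: /mnt/flink-tmp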
> 
>     The reason you only see this sporadically may be because not all your
>     records are that big. It should, however, be deterministic in that it
>     should always occur for the same record. Maybe something is wrong here
>     and the record length is messed up, e.g. due to a bug in the
>     de/serializer or the network stack.
> 
>     Do you actually have a minimal working example that you can share
>     (either privately with me, or here) and that shows this error?
> 
> 
>     Nico
> 
>     On 29/08/18 14:19, Encho Mishinev wrote:
>     > Hello,
>     >
>     > I am using Flink 1.5.3 and executing jobs through Apache Beam 2.6.0.
>     > One of my jobs involves reading from Google Cloud Storage which uses
>     > the file scheme "gs://". Everything was fine but once in a while I
>     > would get an exception that the scheme is not recognised. Now I've
>     > started seeing them more often. It seems to be arbitrary - the exact
>     > same job with the exact same parameters may finish successfully or
>     > throw this exception and fail immediately. I can't figure out why
>     > it's not deterministic.
>     > Here is the full exception logged upon the job failing:
>     >
>     > java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at Match files from GCS/Via MatchAll/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
>     >       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
>     >       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
>     >       at java.lang.Thread.run(Thread.java:748)
>     > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
>     >       at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
>     >       at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108)
>     >       at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>     >       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473)
>     >       ... 3 more
>     > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
>     >       at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831)
>     > Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme gs
>     >       at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:459)
>     >       at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:529)
>     >       at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
>     >       at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:49)
>     >       at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:30)
>     >       at org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:116)
>     >       at org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:88)
>     >       at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:124)
>     >       at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:60)
>     >       at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     >       at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
>     >       at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
>     >       at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>     >       at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>     >       at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>     >       at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:90)
>     >       at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:103)
>     >       at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:145)
>     >       at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>     >       at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>     >       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:105)
>     >       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>     >       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     >       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>     >       at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1066)
>     >       at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:827)
>     >
>     >
>     > Any ideas why the behaviour is not deterministic regarding
>     > recognising file system schemes?
>     >
>     >
>     > Thanks,
>     >
>     > Encho
>     >
> 

-- 
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
