Hi all,

I’ve been running a Beam pipeline on Flink. Depending on the dataset size and 
the heap memory configuration of the jobmanager and taskmanager, I may run into 
an EOFException, which causes the job to fail. You will find the stack trace 
near the bottom of this post (data censored).
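
For concreteness, by the heap memory configuration I mean the standard 
flink-conf.yaml entries (Flink 1.3.x keys), e.g. for the 768MiB runs:

    jobmanager.heap.mb: 768
    taskmanager.heap.mb: 768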

I would not expect such an abrupt failure once the dataset grows past a 
certain size. Doesn’t Flink spill data to disk when memory runs out? How do I 
deal with this unpredictable behaviour in a production setting? I’m running a 
clean Flink 1.3.2 installation with 768MiB of heap memory, while the dataset 
is only in the tens of megabytes. The same root EOFException occurred on 
Flink 1.2.1. I will gladly provide more information where needed.
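
To give an idea of the job, here is a stripped-down sketch of its shape 
(class and step names are placeholders for the censored originals). The real 
pipeline joins several collections, which is where the UnionCoder frames in 
the trace come from, but a plain GroupByKey over SerializableCoder-encoded 
values already captures the failing stage:

    import java.io.Serializable;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.SerializableCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.values.KV;

    public class GroupByKeyRepro {

      // Placeholder for the censored value type: a plain Serializable
      // POJO holding Strings and Doubles, encoded via SerializableCoder.
      static class Record implements Serializable {
        String label;
        double value;
      }

      public static void main(String[] args) {
        // Launched with --runner=FlinkRunner against the 1.3.2 cluster.
        Pipeline p =
            Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        p.apply(Create.of(KV.of("key", new Record()))
                .withCoder(KvCoder.of(StringUtf8Coder.of(),
                                      SerializableCoder.of(Record.class))))
            // The Flink runner translates this into the GroupReduce that
            // fails once the sorter runs out of memory segments.
            .apply(GroupByKey.<String, Record>create());

        p.run();
      }
    }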

If this is expected behaviour, I feel it should be documented: the exception 
message could be more informative, and the guides could manage user 
expectations. (I have not been able to find any information regarding this 
exception.)

Hoping that someone can enlighten me,

Reinier


08/30/2017 13:48:33     GroupReduce (GroupReduce at GroupByKey)(1/1) switched to FAILED
java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:466)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.beam.sdk.coders.CoderException: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:129)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:48)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:135)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:76)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:60)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:33)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:99)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:135)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:652)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:641)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:599)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:80)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:125)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.write(NormalizedKeySorter.java:281)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1037)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.EOFException
        at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.nextSegment(SimpleCollectingOutputView.java:79)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:190)
        at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:49)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
        at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:126)
        ... 19 more
