In another run of the job I got another exception. Could it be helpful?

Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
    at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
    at org.apache.flink.types.StringValue.readString(StringValue.java:770)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
    ... 5 more
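For reference, the symmetry that the error message asks you to verify looks like this. A minimal sketch of a custom Value type (the PersonValue class and its fields are hypothetical, not taken from the failing job): read() must consume exactly the bytes that write() produced, in the same order and with the same widths.

import java.io.IOException;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Value;

// Hypothetical custom type; Value implementations need a public no-arg constructor.
public class PersonValue implements Value {

    private String name = "";
    private int age;

    @Override
    public void write(DataOutputView out) throws IOException {
        out.writeUTF(name);
        out.writeInt(age);
    }

    @Override
    public void read(DataInputView in) throws IOException {
        // Must mirror write() exactly. Reading a field that write() never
        // wrote, or reading fields in a different order or width, makes the
        // deserializer over-consume bytes and can fail like the trace above.
        name = in.readUTF();
        age = in.readInt();
    }
}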
On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <stefano.bort...@huawei.com> wrote:

> In fact, the old problem was the KryoSerializer's missed initialization on
> the exception path that triggers spilling to disk. This left a dirty
> serialization buffer that would eventually break the program. Till worked
> on it by debugging the source code generating the error. Perhaps someone
> could try the same this time as well, if Flavio can make the problem
> reproducible in a shareable program + data.
>
> Stefano
>
> From: Stephan Ewen [mailto:se...@apache.org]
> Sent: Friday, April 21, 2017 10:04 AM
> To: user <user@flink.apache.org>
> Subject: Re: UnilateralSortMerger error (again)
>
> In the past, these errors were most often caused by bugs in the
> serializers, not in the sorter.
>
> What types are you using at that point? The stack trace reveals Row and
> StringValue; are any other types involved?
>
> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> As suggested by Fabian, I set taskmanager.memory.size = 1024 (to force
> spilling to disk) and the job failed almost immediately.
>
> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> I debugged the process a bit by repeating the job on sub-slices of the
> entire data (using the id value to filter data with Parquet push-down
> filters), and all slices completed successfully :(
> So I increased the parallelism (from 1 slot per TM to 4) to see whether
> this was somehow a stress factor, but it didn't cause any error.
> Then I almost doubled the number of rows to process, and the error
> finally showed up again.
> It seems somehow related to spilling to disk, but I can't really
> understand what's going on :(
> This is a summary of my debug attempts:
>
> 4 Task Managers with 6 GB and 1 slot each, parallelism = 4
>
> id < 10.000.000.000 => 1.857.365 rows => OK
> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
> id >= 10.010.000.000 && id < 99.945.000.000 => 20.926.903 rows => OK
> id >= 99.945.000.000 && id < 99.960.000.000 => 23.888.750 rows => OK
> id >= 99.960.000.000 => 32.936.422 rows => OK
>
> 4 TMs with 8 GB and 4 slots each, parallelism = 16
>
> id >= 99.960.000.000 => 32.936.422 rows => OK
> id >= 99.945.000.000 => 56.825.172 rows => ERROR
>
> Any help is appreciated.
> Best,
> Flavio
>
> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> I could, but only if there's a good probability that it fixes the
> problem... How confident are you about it?
>
> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> Looking at the git log of DataInputDeserializer.java, there has been some
> recent change.
>
> If you have time, maybe try with the 1.2.1 RC and see if the error is
> reproducible?
>
> Cheers
>
> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> Hi to all,
> I think I'm hitting the weird exception with the
> SpillingAdaptiveSpanningRecordDeserializer again...
> I'm using Flink 1.2.0, and the error seems to arise when Flink spills to
> disk, but the exception thrown is not very helpful. Any idea?
>
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger Reading Thread' terminated due to an exception: null
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>     ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> terminated due to an exception: null
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.io.EOFException
>     at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>     at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
> Best,
> Flavio
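For the Kryo-serialized case that both error messages mention, "check the corresponding Kryo serializer" amounts to the same symmetry check on the serializer's write()/read() pair, plus making sure the serializer is actually registered. A minimal sketch, with a hypothetical MyEvent type and serializer standing in for the real ones:

import java.io.Serializable;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import org.apache.flink.api.java.ExecutionEnvironment;

public class KryoSerializerCheck {

    // Hypothetical POJO that falls back to Kryo serialization.
    public static class MyEvent {
        public String id;
        public long timestamp;
    }

    // read() must mirror write(): same fields, same order, same widths.
    // An asymmetric pair corrupts the serialization stream and can surface
    // as the EOFException / ArrayIndexOutOfBoundsException in the traces above.
    public static class MyEventSerializer extends Serializer<MyEvent> implements Serializable {

        @Override
        public void write(Kryo kryo, Output output, MyEvent event) {
            output.writeString(event.id);
            output.writeLong(event.timestamp);
        }

        @Override
        public MyEvent read(Kryo kryo, Input input, Class<MyEvent> type) {
            MyEvent event = new MyEvent();
            event.id = input.readString();
            event.timestamp = input.readLong();
            return event;
        }
    }

    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Register the serializer so Flink uses it wherever MyEvent appears.
        env.getConfig().registerTypeWithKryoSerializer(MyEvent.class, new MyEventSerializer());
    }
}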