The error appears as soon as one of the TaskManagers generates an inputchannel file. What are those files used for?
On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> In another run of the job I had another Exception. Could it be helpful?
>
> Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>   at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>   at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>   ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
>   at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>   at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>   at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>   at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>   at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>   ... 5 more
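A side note on the advice in that stack trace: for custom Value or Writable types, "check their serialization methods" boils down to one invariant, namely that write() and read() must move over exactly the same bytes in the same order. A minimal sketch of the contract, with a hypothetical type rather than the job's actual classes:

    import java.io.IOException;

    import org.apache.flink.core.memory.DataInputView;
    import org.apache.flink.core.memory.DataOutputView;
    import org.apache.flink.types.Value;

    // Hypothetical custom Value type; the point is the write/read symmetry.
    public class EntityRef implements Value {

        private long id;
        private String label;

        @Override
        public void write(DataOutputView out) throws IOException {
            out.writeLong(id);
            out.writeUTF(label);
        }

        @Override
        public void read(DataInputView in) throws IOException {
            // Must mirror write() exactly: same fields, same order, same widths.
            // Any asymmetry surfaces later as "Serializer consumed more bytes
            // than the record had" or as an EOFException while sorting/spilling.
            id = in.readLong();
            label = in.readUTF();
        }
    }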
> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <stefano.bort...@huawei.com> wrote:
>
>> In fact, the old problem was a missed re-initialization of the KryoSerializer on the exception path that triggers spilling to disk. This left a dirty serialization buffer that would eventually break the program. Till worked on it by debugging the source code that generated the error; perhaps someone could try the same this time, if Flavio can make the problem reproducible with a shareable program + data.
>>
>> Stefano
>>
>> *From:* Stephan Ewen [mailto:se...@apache.org]
>> *Sent:* Friday, April 21, 2017 10:04 AM
>> *To:* user <user@flink.apache.org>
>> *Subject:* Re: UnilateralSortMerger error (again)
>>
>> In the past, these errors were most often caused by bugs in the serializers, not in the sorter.
>>
>> What types are you using at that point? The stack trace reveals Row and StringValue; are any other types involved?
>>
>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>> As suggested by Fabian, I set taskmanager.memory.size = 1024 (to force spilling to disk) and the job failed almost immediately.
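For anyone trying to reproduce that forced-spilling setup in a local test, a sketch of wiring the same key into a local environment; this assumes the local mini-cluster picks up taskmanager.memory.size the way a standalone TaskManager does, and the failing job itself is omitted:

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;

    public class ForcedSpillRepro {

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Cap Flink's managed memory (taskmanager.memory.size, in MB) so the
            // sorter runs out of in-memory sort buffers and spills to disk early.
            conf.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1024);

            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
            // ... build the failing job on env and call env.execute() ...
        }
    }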
>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>> I debugged the process a bit by repeating the job on sub-slices of the entire dataset (using the id value to filter data with Parquet push-down filters), and all slices completed successfully :(
>>
>> So I tried to increase the parallelism (from 1 slot per TM to 4) to see if this was somehow a factor of stress, but it didn't cause any error.
>>
>> Then I almost doubled the number of rows to process, and finally the error showed up again.
>>
>> It seems somehow related to spilling to disk, but I can't really understand what's going on :(
>>
>> This is a summary of my debug attempts:
>>
>> 4 Task Managers with 6 GB and 1 slot each, parallelism = 4
>>
>> id < 10.000.000.000                          =>  1.857.365 rows => OK
>> id >= 10.000.000.000 && id < 10.010.000.000  => 20.057.714 rows => OK
>> id >= 10.010.000.000 && id < 99.945.000.000  => 20.926.903 rows => OK
>> id >= 99.945.000.000 && id < 99.960.000.000  => 23.888.750 rows => OK
>> id >= 99.960.000.000                         => 32.936.422 rows => OK
>>
>> 4 Task Managers with 8 GB and 4 slots each, parallelism = 16
>>
>> id >= 99.960.000.000                         => 32.936.422 rows => OK
>> id >= 99.945.000.000                         => 56.825.172 rows => ERROR
>>
>> Any help is appreciated.
>>
>> Best,
>> Flavio
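The slices above come from range predicates on the id column. In DataSet terms, each debug run amounts to something like the sketch below; the input source and field layout are hypothetical stand-ins, since the real job pushed the same predicates down into the Parquet reader instead of filtering after the read:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.types.Row;

    public class SliceById {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Stand-in for the real Parquet source; with push-down filters the
            // same range predicate would be evaluated inside the reader instead.
            DataSet<Row> rows = env.fromElements(row(9_999_999_999L, "a"),
                                                 row(99_950_000_000L, "b"));

            // One debug slice: id >= 99.945.000.000 && id < 99.960.000.000
            DataSet<Row> slice = rows.filter(r -> {
                long id = (Long) r.getField(0); // assumes the id sits in field 0
                return id >= 99_945_000_000L && id < 99_960_000_000L;
            });

            slice.print();
        }

        private static Row row(long id, String label) {
            Row r = new Row(2);
            r.setField(0, id);
            r.setField(1, label);
            return r;
        }
    }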
>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>> I could, but only if there's a good probability that it fixes the problem... how confident are you about it?
>>
>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>> Looking at the git log of DataInputDeserializer.java, there has been some recent change.
>>
>> If you have time, maybe try with the 1.2.1 RC and see if the error is reproducible?
>>
>> Cheers
>>
>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>> Hi to all,
>> I think I'm hitting the weird exception with the SpillingAdaptiveSpanningRecordDeserializer again...
>> I'm using Flink 1.2.0, and the error seems to arise when Flink spills to disk, but the exception thrown is not very helpful. Any idea?
>>
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>   at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>   at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>   ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: java.io.EOFException
>>   at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>   at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>   at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>   at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>   at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>
>> Best,
>> Flavio
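Since the error text also points at Kryo-serialized types, a final sketch of what "check the corresponding Kryo serializer" means in practice: the serializer is registered on the ExecutionConfig, and its write/read pair must be byte-symmetric, just like a Value's. Type and serializer names here are hypothetical:

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class KryoRegistrationExample {

        // Hypothetical POJO that ends up on the Kryo fallback path.
        public static class Entity {
            public long id;
            public String label;
        }

        public static class EntitySerializer extends Serializer<Entity> {

            @Override
            public void write(Kryo kryo, Output output, Entity e) {
                output.writeLong(e.id);
                output.writeString(e.label);
            }

            @Override
            public Entity read(Kryo kryo, Input input, Class<Entity> type) {
                Entity e = new Entity();
                // Mirror write() exactly; an asymmetric pair corrupts the
                // stream once records are spilled and re-read by the sorter.
                e.id = input.readLong();
                e.label = input.readString();
                return e;
            }
        }

        public static void main(String[] args) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().registerTypeWithKryoSerializer(Entity.class, EntitySerializer.class);
            // ... define and execute the job as usual ...
        }
    }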