Hi Flavio,

these files are used for spilling data to disk, in your case sorted runs of records. Later, all of these sorted runs (up to a fanout threshold) are read and merged to produce a completely sorted record stream.
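If it helps to picture what happens with those files, here is a minimal sketch of that merge phase, assuming for readability that each run is a text file holding one sorted record per line (Flink's actual runs are binary and are merged by its internal sort-merger, not by this code):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.util.List;
    import java.util.PriorityQueue;

    // Minimal k-way merge over spilled sorted runs.
    public class RunMerger {

        public static void merge(List<BufferedReader> runs) throws IOException {
            // Min-heap ordered by the head record of each run.
            PriorityQueue<Head> heap = new PriorityQueue<>();
            for (BufferedReader run : runs) {
                String first = run.readLine();
                if (first != null) {
                    heap.add(new Head(first, run));
                }
            }
            while (!heap.isEmpty()) {
                Head smallest = heap.poll();
                System.out.println(smallest.record);    // emit records in global sorted order
                String next = smallest.run.readLine();  // refill the heap from the same run
                if (next != null) {
                    heap.add(new Head(next, smallest.run));
                }
            }
        }

        private static final class Head implements Comparable<Head> {
            final String record;
            final BufferedReader run;

            Head(String record, BufferedReader run) {
                this.record = record;
                this.run = run;
            }

            @Override
            public int compareTo(Head other) {
                return record.compareTo(other.record);
            }
        }
    }

When there are more runs than the fanout threshold allows, subsets of runs are first merged into larger intermediate runs, so the final merge never has to read more than "fanout" streams at once.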
2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> The error appears as soon as some taskmanager generates some
> inputchannel file. What are those files used for?
>
> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier
> <pomperma...@okkam.it> wrote:
>
>> In another run of the job I had another exception. Could it be helpful?
>>
>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization
>> methods. If you are using a Kryo-serialized type, check the corresponding
>> Kryo serializer.
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>> Serializer consumed more bytes than the record had. This indicates broken
>> serialization. If you are using custom serialization types (Value or
>> Writable), check their serialization methods. If you are using a
>> Kryo-serialized type, check the corresponding Kryo serializer.
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>> ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization
>> methods. If you are using a Kryo-serialized type, check the corresponding
>> Kryo serializer.
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization
>> methods. If you are using a Kryo-serialized type, check the corresponding
>> Kryo serializer.
>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>> ... 5 more
>>
>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli
>> <stefano.bort...@huawei.com> wrote:
>>
>>> In fact, the old problem was a missed initialization of the
>>> KryoSerializer on the exception path that triggers spilling to disk.
>>> This would leave a dirty serialization buffer that would eventually
>>> break the program. Till worked on it by debugging the source code
>>> generating the error; perhaps someone could try the same this time,
>>> if Flavio can make the problem reproducible in a shareable
>>> program+data.
>>>
>>> Stefano
>>>
>>> From: Stephan Ewen [mailto:se...@apache.org]
>>> Sent: Friday, April 21, 2017 10:04 AM
>>> To: user <user@flink.apache.org>
>>> Subject: Re: UnilateralSortMerger error (again)
>>>
>>> In the past, these errors were most often caused by bugs in the
>>> serializers, not in the sorter.
>>>
>>> What types are you using at that point? The stack trace reveals Row
>>> and StringValue; any other involved types?
>>>
>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier
>>> <pomperma...@okkam.it> wrote:
>>>
>>> As suggested by Fabian, I set taskmanager.memory.size = 1024 (to force
>>> spilling to disk) and the job failed almost immediately.
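Regarding the check that both Stephan and the exception message suggest: the invariant for a custom Value (or Writable) type is that read() must consume exactly the bytes that write() produced. If it reads more or fewer, the deserializer starts the next record at the wrong offset, which is exactly the "Serializer consumed more bytes than the record had" symptom. A hypothetical sketch (MyIdValue is made up here, purely to illustrate the rule):

    import java.io.IOException;
    import org.apache.flink.core.memory.DataInputView;
    import org.apache.flink.core.memory.DataOutputView;
    import org.apache.flink.types.Value;

    // Hypothetical custom Value type; the point is the read/write symmetry.
    public class MyIdValue implements Value {
        private long id;
        private String name;

        @Override
        public void write(DataOutputView out) throws IOException {
            out.writeLong(id);
            out.writeUTF(name);
        }

        @Override
        public void read(DataInputView in) throws IOException {
            id = in.readLong();   // same order and width as write()
            name = in.readUTF();  // skipping or adding a read here corrupts the stream
        }
    }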
>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier
>>> <pomperma...@okkam.it> wrote:
>>>
>>> I debugged the process a bit by repeating the job on sub-slices of the
>>> entire data (using the id value to filter data with Parquet push-down
>>> filters) and all slices completed successfully :(
>>>
>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to see
>>> if this was somehow a factor of stress, but it didn't cause any error.
>>>
>>> Then I almost doubled the number of rows to process and finally the
>>> error showed up again.
>>>
>>> It seems somehow related to spilling to disk, but I can't really
>>> understand what's going on :(
>>>
>>> This is a summary of my debug attempts:
>>>
>>> 4 task managers with 6 GB and 1 slot each, parallelism = 4
>>>
>>> id < 10.000.000.000                          =>  1.857.365 rows => OK
>>> id >= 10.000.000.000 && id < 10.010.000.000  => 20.057.714 rows => OK
>>> id >= 10.010.000.000 && id < 99.945.000.000  => 20.926.903 rows => OK
>>> id >= 99.945.000.000 && id < 99.960.000.000  => 23.888.750 rows => OK
>>> id >= 99.960.000.000                         => 32.936.422 rows => OK
>>>
>>> 4 TM with 8 GB and 4 slots each, parallelism 16
>>>
>>> id >= 99.960.000.000  => 32.936.422 rows => OK
>>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>>
>>> Any help is appreciated.
>>>
>>> Best,
>>> Flavio
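As an aside, for anyone who wants to reproduce this kind of bisection, the slicing Flavio describes can also be expressed as a plain filter when Parquet push-down is not available. A hypothetical sketch (it assumes the id is a Long in field 0 of the Row, which may not match Flavio's actual schema):

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.types.Row;

    public class IdSlicer {
        // Keep only rows whose id falls in the half-open range [lo, hi).
        public static DataSet<Row> slice(DataSet<Row> rows, final long lo, final long hi) {
            return rows.filter(new FilterFunction<Row>() {
                @Override
                public boolean filter(Row row) {
                    long id = (Long) row.getField(0);
                    return id >= lo && id < hi;
                }
            });
        }
    }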
>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier
>>> <pomperma...@okkam.it> wrote:
>>>
>>> I could, but only if there's a good probability that it fixes the
>>> problem... how confident are you about it?
>>>
>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>> Looking at the git log of DataInputDeserializer.java, there has been
>>> some recent change.
>>>
>>> If you have time, maybe try with the 1.2.1 RC and see if the error is
>>> reproducible?
>>>
>>> Cheers
>>>
>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier
>>> <pomperma...@okkam.it> wrote:
>>>
>>> Hi to all,
>>> I think I'm again hitting the weird exception with the
>>> SpillingAdaptiveSpanningRecordDeserializer...
>>> I'm using Flink 1.2.0 and the error seems to arise when Flink spills to
>>> disk, but the exception thrown is not very helpful. Any idea?
>>>
>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>> Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>> ... 3 more
>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>> terminated due to an exception: null
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>> Caused by: java.io.EOFException
>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>
>>> Best,
>>> Flavio