The error appears as soon as one of the TaskManagers generates an
inputchannel file.
What are those files used for?

On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> In another run of the job I had another Exception. Could it be helpful?
>
> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
> terminated due to an exception: Serializer consumed more bytes than the
> record had. This indicates broken serialization. If you are using custom
> serialization types (Value or Writable), check their serialization methods.
> If you are using a Kryo-serialized type, check the corresponding Kryo
> serializer.
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger Reading Thread' terminated due to an exception:
> Serializer consumed more bytes than the record had. This indicates broken
> serialization. If you are using custom serialization types (Value or
> Writable), check their serialization methods. If you are using a
> Kryo-serialized type, check the corresponding Kryo serializer.
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> terminated due to an exception: Serializer consumed more bytes than the
> record had. This indicates broken serialization. If you are using custom
> serialization types (Value or Writable), check their serialization methods.
> If you are using a Kryo-serialized type, check the corresponding Kryo
> serializer.
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.io.IOException: Serializer consumed more bytes than the
> record had. This indicates broken serialization. If you are using custom
> serialization types (Value or Writable), check their serialization methods.
> If you are using a Kryo-serialized type, check the corresponding Kryo
> serializer.
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
> ... 5 more
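>
> Side note on what "check the corresponding Kryo serializer" means in
> practice: the invariant is that read() must consume exactly the bytes that
> write() produced; a read() that consumes more than write() emitted yields
> exactly "Serializer consumed more bytes than the record had". A minimal
> sketch, with a hypothetical type and serializer invented for illustration,
> not taken from this job:
>
>     import com.esotericsoftware.kryo.Kryo;
>     import com.esotericsoftware.kryo.Serializer;
>     import com.esotericsoftware.kryo.io.Input;
>     import com.esotericsoftware.kryo.io.Output;
>     import org.apache.flink.api.java.ExecutionEnvironment;
>
>     public class KryoCheckSketch {
>         // Hypothetical custom type, stands in for whatever the job carries.
>         static class MyType {
>             String name;
>             long id;
>         }
>
>         static class MyTypeSerializer extends Serializer<MyType> {
>             @Override
>             public void write(Kryo kryo, Output out, MyType t) {
>                 out.writeString(t.name);
>                 out.writeLong(t.id);
>             }
>
>             @Override
>             public MyType read(Kryo kryo, Input in, Class<MyType> type) {
>                 MyType t = new MyType();
>                 t.name = in.readString();
>                 t.id = in.readLong(); // must mirror write() field by field
>                 return t;
>             }
>         }
>
>         public static void main(String[] args) {
>             ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>             env.getConfig().registerTypeWithKryoSerializer(MyType.class, MyTypeSerializer.class);
>         }
>     }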
>
> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
> stefano.bort...@huawei.com> wrote:
>
>> In fact, the old problem was that the KryoSerializer missed
>> re-initialization on the exception that triggered spilling to disk. This
>> left a dirty serialization buffer that would eventually break the program.
>> Till worked on it by debugging the source code that generated the error.
>> Perhaps someone could try the same this time, if Flavio can make the
>> problem reproducible as a shareable program + data.
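>>
>> To illustrate the failure pattern described above (this is not Flink's
>> actual code, just the general shape of that bug and its fix): a write
>> fails with the exception that triggers the spill, and unless the Kryo
>> output buffer is reset afterwards, stale bytes leak into the next record.
>>
>>     try {
>>         kryo.writeClassAndObject(output, record);
>>         output.flush();
>>     } catch (KryoException e) {
>>         output.clear(); // drop partially written bytes so no stale data survives
>>         throw e;
>>     }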
>>
>>
>>
>> Stefano
>>
>>
>>
>> *From:* Stephan Ewen [mailto:se...@apache.org]
>> *Sent:* Friday, April 21, 2017 10:04 AM
>> *To:* user <user@flink.apache.org>
>> *Subject:* Re: UnilateralSortMerger error (again)
>>
>>
>>
>> In the past, these errors were most often caused by bugs in the
>> serializers, not in the sorter.
>>
>>
>>
>> What types are you using at that point? The stack trace reveals Row and
>> StringValue; any other involved types?
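>>
>> A quick way to enumerate them, assuming the DataSet at the failing point
>> is called "ds", is to print its type information, which lists every field
>> type the serializers will touch:
>>
>>     System.out.println(ds.getType());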
>>
>>
>>
>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>> As suggested by Fabian, I set taskmanager.memory.size = 1024 (to force
>> spilling to disk) and the job failed almost immediately...
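>>
>> For anyone trying to reproduce this locally, a sketch of the same setting
>> applied to a local environment (assuming Flink 1.2; on a cluster the key
>> goes into flink-conf.yaml instead):
>>
>>     import org.apache.flink.api.java.ExecutionEnvironment;
>>     import org.apache.flink.configuration.Configuration;
>>
>>     Configuration conf = new Configuration();
>>     conf.setLong("taskmanager.memory.size", 1024); // managed memory, in MB
>>     ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);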
>>
>>
>>
>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>> I debugged the process a bit by repeating the job on sub-slices of the
>> entire dataset (using the id value to filter data with Parquet push-down
>> filters), and all slices completed successfully :(
>>
>> So I tried increasing the parallelism (from 1 slot per TM to 4) to see
>> whether added stress was a factor, but it didn't cause any error.
>>
>> Then I almost doubled the number of rows to process and finally the error
>> showed up again.
>>
>> It seems somehow related to spilling to disk, but I can't really
>> understand what's going on :(
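>>
>> For concreteness, a slice like the ones summarized below is, modulo the
>> push-down, equivalent to a plain filter on the id column (a sketch; it
>> assumes id is the first field of an org.apache.flink.types.Row):
>>
>>     DataSet<Row> slice = input.filter(row ->
>>         (long) row.getField(0) >= 99_945_000_000L
>>             && (long) row.getField(0) < 99_960_000_000L);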
>>
>> This is a summary of my debug attempts:
>>
>>
>>
>> 4 TaskManagers with 6 GB and 1 slot each, parallelism = 4:
>>
>> id < 10.000.000.000                          => 1.857.365 rows  => OK
>> id >= 10.000.000.000 && id < 10.010.000.000  => 20.057.714 rows => OK
>> id >= 10.010.000.000 && id < 99.945.000.000  => 20.926.903 rows => OK
>> id >= 99.945.000.000 && id < 99.960.000.000  => 23.888.750 rows => OK
>> id >= 99.960.000.000                         => 32.936.422 rows => OK
>>
>> 4 TaskManagers with 8 GB and 4 slots each, parallelism = 16:
>>
>> id >= 99.960.000.000 => 32.936.422 rows => OK
>> id >= 99.945.000.000 => 56.825.172 rows => ERROR
>>
>>
>>
>> Any help is appreciated..
>>
>> Best,
>>
>> Flavio
>>
>>
>>
>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>> I could, but only if there's a good probability that it fixes the
>> problem... how confident are you about it?
>>
>>
>>
>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>> Looking at the git log of DataInputDeserializer.java, there have been
>> some recent changes.
>>
>>
>>
>> If you have time, maybe try with the 1.2.1 RC and see if the error is
>> reproducible?
>>
>>
>>
>> Cheers
>>
>>
>>
>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>> Hi to all,
>>
>> I think I'm hitting the weird Exception with the
>> SpillingAdaptiveSpanningRecordDeserializer again...
>>
>> I'm using Flink 1.2.0, and the error seems to arise when Flink spills to
>> disk, but the Exception thrown is not very helpful. Any idea?
>>
>>
>>
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception: null
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>> ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: null
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: java.io.EOFException
>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>
>>
>> Best,
>>
>> Flavio
>>
