After digging into the code and the tests, I think the problem is almost
certainly in the UnilateralSortMerger; there is probably a missing
synchronization on some shared object somewhere... Right now I'm trying to
understand whether this section of code creates shared objects (like the
queues) that are accessed incorrectly when spilling to disk kicks in:

// start the thread that reads the input channels
this.readThread = getReadingThread(exceptionHandler, input, circularQueues,
        largeRecordHandler, parentTask, serializer,
        ((long) (startSpillingFraction * sortMemory)));

// start the thread that sorts the buffers
this.sortThread = getSortingThread(exceptionHandler, circularQueues, parentTask);

// start the thread that handles spilling to secondary storage
this.spillThread = getSpillingThread(exceptionHandler, circularQueues, parentTask,
        memoryManager, ioManager, serializerFactory, comparator,
        this.sortReadMemory, this.writeMemory, maxNumFileHandles);
....
startThreads();
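
To make the suspicion concrete, here is a minimal, self-contained sketch of
the kind of hand-off I think happens between those three threads (this is my
own toy code, not Flink's actual CircularQueues implementation; the class and
queue names are made up): the reader fills buffers, the sorter sorts them in
place, and the spiller drains them. The bug I'm afraid of would be the
equivalent of one thread still touching a buffer after handing it over.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy model of the reader -> sorter -> spiller pipeline; plain
// BlockingQueues stand in for the sorter's circular queues.
public class SortPipelineSketch {

    // In this sketch a "buffer" is just a list of records.
    static final class Buffer {
        final List<String> records = new ArrayList<>();
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<Buffer> toSort  = new ArrayBlockingQueue<>(4);
        BlockingQueue<Buffer> toSpill = new ArrayBlockingQueue<>(4);

        Thread reader = new Thread(() -> {
            try {
                for (int run = 0; run < 3; run++) {
                    Buffer b = new Buffer();          // fresh buffer per run
                    for (int i = 0; i < 5; i++) {
                        b.records.add("record-" + run + "-" + (4 - i));
                    }
                    toSort.put(b);                    // hand off and stop touching it
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Reading Thread");

        Thread sorter = new Thread(() -> {
            try {
                for (int run = 0; run < 3; run++) {
                    Buffer b = toSort.take();
                    Collections.sort(b.records);      // sort the run in place
                    toSpill.put(b);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Sorting Thread");

        Thread spiller = new Thread(() -> {
            try {
                for (int run = 0; run < 3; run++) {
                    Buffer b = toSpill.take();        // Flink would write this run to disk
                    System.out.println("spilling sorted run: " + b.records);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Spilling Thread");

        reader.start(); sorter.start(); spiller.start();
        reader.join(); sorter.join(); spiller.join();
    }
}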


The problem is that the unit tests of GroupReduceDriver use Record, so
testing Rows is not very straightforward, and I'm still trying to reproduce
the problem in a local environment.
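
In the meantime, the quickest thing I can test locally is a plain serializer
round trip for a Row with a schema similar to the job's (the field types and
values below are made up, and I'm assuming the Flink 1.2 Row / RowTypeInfo /
serialization-view classes behave as I expect; this is just a sketch, not the
failing code path). If this passes, as I suspect it will, the corruption is
probably not in the RowSerializer itself but in how the buffers it reads from
are managed when spilling kicks in.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.types.Row;

public class RowRoundTripCheck {

    public static void main(String[] args) throws Exception {
        // A made-up schema: a long id plus two string fields.
        RowTypeInfo typeInfo = new RowTypeInfo(
                BasicTypeInfo.LONG_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);
        TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());

        Row original = new Row(3);
        original.setField(0, 99960000123L);
        original.setField(1, "some value");
        original.setField(2, null);   // null fields are worth exercising too

        // Serialize into a byte array...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        serializer.serialize(original, new DataOutputViewStreamWrapper(bytes));

        // ...and read it back; a serializer that consumes the wrong number of
        // bytes should already misbehave on this simple path.
        Row copy = serializer.deserialize(
                new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println("original: " + original);
        System.out.println("copy    : " + copy);
    }
}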

On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Thanks for the explanation. Is there a way to force this behaviour in a
> local environment (to try to debug the problem)?
>
> On 21 Apr 2017 21:49, "Fabian Hueske" <fhue...@gmail.com> wrote:
>
>> Hi Flavio,
>>
>> these files are used for spilling data to disk; in your case, sorted runs
>> of records.
>> Later, all of these sorted runs (up to a fan-out threshold) are read and
>> merged to produce a completely sorted record stream.
>>
>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> The error appears as soon as some TaskManager generates some inputchannel
>>> files.
>>> What are those files used for?
>>>
>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
>>>> In another run of the job I had another Exception. Could it be helpful?
>>>>
>>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>> record had. This indicates broken serialization. If you are using custom
>>>> serialization types (Value or Writable), check their serialization methods.
>>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>>> serializer.
>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>> Serializer consumed more bytes than the record had. This indicates broken
>>>> serialization. If you are using custom serialization types (Value or
>>>> Writable), check their serialization methods. If you are using a
>>>> Kryo-serialized type, check the corresponding Kryo serializer.
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>> ... 3 more
>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>> record had. This indicates broken serialization. If you are using custom
>>>> serialization types (Value or Writable), check their serialization methods.
>>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>>> serializer.
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>>>> record had. This indicates broken serialization. If you are using custom
>>>> serialization types (Value or Writable), check their serialization methods.
>>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>>> serializer.
>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>>>> ... 5 more
>>>>
>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
>>>> stefano.bort...@huawei.com> wrote:
>>>>
>>>>> In fact, the old problem was that the KryoSerializer missed an
>>>>> initialization (reset) on the exception that triggers spilling to disk.
>>>>> This left a dirty serialization buffer that would eventually break the
>>>>> program. Till worked on it by debugging the source code that generated
>>>>> the error. Perhaps someone could do the same this time, if Flavio can
>>>>> make the problem reproducible with a shareable program + data.
>>>>>
>>>>>
>>>>>
>>>>> Stefano
>>>>>
>>>>>
>>>>>
>>>>> *From:* Stephan Ewen [mailto:se...@apache.org]
>>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>>> *To:* user <user@flink.apache.org>
>>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>>
>>>>>
>>>>>
>>>>> In the past, these errors were most often caused by bugs in the
>>>>> serializers, not in the sorter.
>>>>>
>>>>>
>>>>>
>>>>> What types are you using at that point? The stack trace reveals Row
>>>>> and StringValue; are any other types involved?
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>> As suggested by Fabian, I set taskmanager.memory.size = 1024 (to force
>>>>> spilling to disk) and the job failed almost immediately.
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>> I debugged the process a bit by repeating the job on sub-slices of the
>>>>> entire data (using the id value to filter data with Parquet push-down
>>>>> filters), and all slices completed successfully :(
>>>>>
>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to see
>>>>> whether this was somehow a matter of stress, but it didn't cause any error.
>>>>>
>>>>> Then I almost doubled the number of rows to process and finally the
>>>>> error showed up again.
>>>>>
>>>>> It seems somehow related to spilling to disk, but I can't really
>>>>> understand what's going on :(
>>>>>
>>>>> This is a summary of my debug attempts:
>>>>>
>>>>>
>>>>>
>>>>> 4 Task Managers with 6 GB and 1 slot each, parallelism = 4
>>>>>
>>>>> id < 10.000.000.000                          =>  1.857.365 rows => OK
>>>>> id >= 10.000.000.000 && id < 10.010.000.000  => 20.057.714 rows => OK
>>>>> id >= 10.010.000.000 && id < 99.945.000.000  => 20.926.903 rows => OK
>>>>> id >= 99.945.000.000 && id < 99.960.000.000  => 23.888.750 rows => OK
>>>>> id >= 99.960.000.000                         => 32.936.422 rows => OK
>>>>>
>>>>> 4 TM with 8 GB and 4 slots each, parallelism = 16
>>>>>
>>>>> id >= 99.960.000.000                         => 32.936.422 rows => OK
>>>>> id >= 99.945.000.000                         => 56.825.172 rows => ERROR
>>>>>
>>>>>
>>>>>
>>>>> Any help is appreciated..
>>>>>
>>>>> Best,
>>>>>
>>>>> Flavio
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>> I could, but only if there's a good probability that it fixes the
>>>>> problem... how confident are you about it?
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>> Looking at the git log of DataInputDeserializer.java, there have been
>>>>> some recent changes.
>>>>>
>>>>>
>>>>>
>>>>> If you have time, maybe try with the 1.2.1 RC and see if the error is
>>>>> reproducible?
>>>>>
>>>>>
>>>>>
>>>>> Cheers
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>> Hi to all,
>>>>>
>>>>> I think I'm hitting the weird exception with the
>>>>> SpillingAdaptiveSpanningRecordDeserializer again...
>>>>>
>>>>> I'm using Flink 1.2.0, and the error seems to arise when Flink spills to
>>>>> disk, but the exception thrown is not very helpful. Any idea?
>>>>>
>>>>>
>>>>>
>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>> null
>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>> ... 3 more
>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>> terminated due to an exception: null
>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>> Caused by: java.io.EOFException
>>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>
>>>>>
>>>>> Best,
>>>>>
>>>>> Flavio
>>>>>
