I've created a repository with a unit test to reproduce the error at
https://github.com/fpompermaier/flink-batch-bug/blob/master/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java
(this error is probably also related to FLINK-4719).

The exception is thrown only when there are null strings and multiple
slots per TM. I don't know whether the UnilateralSortMerger is involved or
not (but I think so..).
A quick fix for this problem would be much appreciated because it's blocking
a production deployment..
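
In case it helps with triage, the failing pattern boils down to roughly the
following (a simplified sketch, not the exact test in the linked repo; the
field types, values and buffer handling here are just illustrative): a Row
with a null string field is written by the RowSerializer and read back
through a DataInputDeserializer.

    // Hypothetical sketch of the round trip that seems to fail (Flink 1.2 APIs).
    // It needs a surrounding test class/method; exceptions are not handled here.
    import java.io.ByteArrayOutputStream;
    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    import org.apache.flink.runtime.util.DataInputDeserializer;
    import org.apache.flink.types.Row;

    // serializer for a Row with two nullable string fields
    TypeSerializer<Row> serializer = new RowTypeInfo(
            BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
            .createSerializer(new ExecutionConfig());

    Row row = new Row(2);
    row.setField(0, "some value");
    row.setField(1, null); // the null string field is the interesting case

    // write the record with the RowSerializer ...
    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
    serializer.serialize(row, new DataOutputViewStreamWrapper(bytesOut));

    // ... and read it back through a DataInputDeserializer
    byte[] bytes = bytesOut.toByteArray();
    DataInputDeserializer in = new DataInputDeserializer(bytes, 0, bytes.length);
    Row copy = serializer.deserialize(in);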

Thanks in advance to all,
Flavio

On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> After digging into the code and tests, I think the problem is almost
> certainly in the UnilateralSortMerger; there is probably a missing
> synchronization on some shared object somewhere. Right now I'm trying to
> understand whether this section of code creates some shared object (like the
> circular queues) that is accessed in a bad way when there's spilling to disk:
>
> // start the thread that reads the input channels
> this.readThread = getReadingThread(exceptionHandler, input, circularQueues,
>         largeRecordHandler, parentTask, serializer,
>         ((long) (startSpillingFraction * sortMemory)));
>
> // start the thread that sorts the buffers
> this.sortThread = getSortingThread(exceptionHandler, circularQueues, parentTask);
>
> // start the thread that handles spilling to secondary storage
> this.spillThread = getSpillingThread(exceptionHandler, circularQueues, parentTask,
>         memoryManager, ioManager, serializerFactory, comparator,
>         this.sortReadMemory, this.writeMemory, maxNumFileHandles);
>
> ...
> startThreads();
>
>
> The problem is that the unit tests of GroupReduceDriver use Record, testing
> Rows is not very straightforward, and I'm still trying to reproduce the
> problem in a local environment..
>
> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Thanks for the explanation. Is there a way to force this behaviour in a
>> local environment (to try to debug the problem)?
>>
>> On 21 Apr 2017 21:49, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>
>>> Hi Flavio,
>>>
>>> these files are used for spilling data to disk, in your case sorted runs
>>> of records.
>>> Later, all these sorted runs (up to a fan-out threshold) are read and
>>> merged to get a completely sorted record stream.
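>>>
>>> As a rough illustration of that merge step (a generic external-merge sketch,
>>> not the actual UnilateralSortMerger code), the sorted runs can be combined
>>> with a k-way merge over a priority queue:
>>>
>>> // Illustrative k-way merge of already-sorted runs into one sorted output.
>>> // (uses java.util.ArrayList, java.util.List, java.util.PriorityQueue)
>>> static <T extends Comparable<T>> List<T> mergeSortedRuns(List<List<T>> runs) {
>>>     // heap entries are {run index, position in run}, ordered by the element they point at
>>>     PriorityQueue<int[]> heap = new PriorityQueue<>(
>>>             (a, b) -> runs.get(a[0]).get(a[1]).compareTo(runs.get(b[0]).get(b[1])));
>>>     for (int r = 0; r < runs.size(); r++) {
>>>         if (!runs.get(r).isEmpty()) heap.add(new int[] {r, 0});
>>>     }
>>>     List<T> merged = new ArrayList<>();
>>>     while (!heap.isEmpty()) {
>>>         int[] top = heap.poll();
>>>         merged.add(runs.get(top[0]).get(top[1]));
>>>         if (top[1] + 1 < runs.get(top[0]).size()) {
>>>             heap.add(new int[] {top[0], top[1] + 1});
>>>         }
>>>     }
>>>     return merged;
>>> }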
>>>
>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>
>>>> The error appears as soon as some TaskManager generates an inputchannel
>>>> file.
>>>> What are those files used for?
>>>>
>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> In another run of the job I had another Exception. Could it be helpful?
>>>>>
>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>> record had. This indicates broken serialization. If you are using custom
>>>>> serialization types (Value or Writable), check their serialization methods.
>>>>> If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>>>> Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer
>>>>> consumed more bytes than the record had. This indicates broken serialization.
>>>>> If you are using custom serialization types (Value or Writable), check their
>>>>> serialization methods. If you are using a Kryo-serialized type, check the
>>>>> corresponding Kryo serializer.
>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>     ... 3 more
>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated
>>>>> due to an exception: Serializer consumed more bytes than the record had. This
>>>>> indicates broken serialization. If you are using custom serialization types
>>>>> (Value or Writable), check their serialization methods. If you are using a
>>>>> Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>> Caused by: java.io.IOException: Serializer consumed more bytes than the record
>>>>> had. This indicates broken serialization. If you are using custom serialization
>>>>> types (Value or Writable), check their serialization methods. If you are using
>>>>> a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>     at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>>>     at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>>>>>     ... 5 more
>>>>>
>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
>>>>> stefano.bort...@huawei.com> wrote:
>>>>>
>>>>>> In fact, the old problem was a missed initialization of the KryoSerializer
>>>>>> on the exception that triggers spilling to disk. This left a dirty
>>>>>> serialization buffer that would eventually break the program. Till worked
>>>>>> on it by debugging the source code that generated the error. Perhaps someone
>>>>>> could try the same this time, if Flavio can make the problem reproducible
>>>>>> with a shareable program and data.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Stefano
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From:* Stephan Ewen [mailto:se...@apache.org]
>>>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>>>> *To:* user <user@flink.apache.org>
>>>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>>>
>>>>>>
>>>>>>
>>>>>> In the past, these errors were most often caused by bugs in the
>>>>>> serializers, not in the sorter.
>>>>>>
>>>>>>
>>>>>>
>>>>>> What types are you using at that point? The stack trace reveals Row
>>>>>> and StringValue; are any other types involved?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>>
>>>>>> As suggested by Fabian, I set taskmanager.memory.size = 1024 (to force
>>>>>> spilling to disk) and the job failed almost immediately..
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>>
>>>>>> I debugged the process a bit by repeating the job on sub-slices of the
>>>>>> entire data (using the id value to filter data with Parquet push-down
>>>>>> filters), and all slices completed successfully :(
>>>>>>
>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to see
>>>>>> whether the extra stress was somehow a factor, but it didn't cause any error.
>>>>>>
>>>>>> Then I almost doubled the number of rows to process and finally the
>>>>>> error showed up again.
>>>>>>
>>>>>> It seems somehow related to spilling to disk but I can't really
>>>>>> understand what's going on :(
>>>>>>
>>>>>> This is a summary of my debug attempts:
>>>>>>
>>>>>>
>>>>>>
>>>>>> 4 TaskManagers with 6 GB and 1 slot each, parallelism = 4
>>>>>>
>>>>>> id < 10.000.000.000                          => 1.857.365 rows  => OK
>>>>>> id >= 10.000.000.000 && id < 10.010.000.000  => 20.057.714 rows => OK
>>>>>> id >= 10.010.000.000 && id < 99.945.000.000  => 20.926.903 rows => OK
>>>>>> id >= 99.945.000.000 && id < 99.960.000.000  => 23.888.750 rows => OK
>>>>>> id >= 99.960.000.000                         => 32.936.422 rows => OK
>>>>>>
>>>>>> 4 TaskManagers with 8 GB and 4 slots each, parallelism = 16
>>>>>>
>>>>>> id >= 99.960.000.000  => 32.936.422 rows => OK
>>>>>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>>>>>
>>>>>>
>>>>>> Any help is appreciated..
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Flavio
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>>
>>>>>> I could, but only if there's a good probability that it fixes the
>>>>>> problem... how confident are you about it?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>> Looking at the git log of DataInputDeserializer.java, there has been
>>>>>> some recent change.
>>>>>>
>>>>>>
>>>>>>
>>>>>> If you have time, maybe try with the 1.2.1 RC and see if the error is
>>>>>> reproducible?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>>
>>>>>> Hi to all,
>>>>>>
>>>>>> I think I'm hitting the weird exception with the
>>>>>> SpillingAdaptiveSpanningRecordDeserializer again...
>>>>>>
>>>>>> I'm using Flink 1.2.0 and the error seems to arise when Flink spills
>>>>>> to disk, but the exception thrown is not very helpful. Any idea?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>>>>> Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>     ... 3 more
>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated
>>>>>> due to an exception: null
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>> Caused by: java.io.EOFException
>>>>>>     at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>>>>     at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Flavio
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>
>
