Great!! Thanks a lot Kurt

On Thu, Apr 27, 2017 at 5:31 PM, Kurt Young <> wrote:

> Hi, I have found the bug:
> I will open a PR soon.
> Best,
> Kurt
> On Thu, Apr 27, 2017 at 8:23 PM, Flavio Pompermaier <> wrote:
>> Thanks a lot Kurt!
>> On Thu, Apr 27, 2017 at 2:12 PM, Kurt Young <> wrote:
>>> Thanks for the test case, I will take a look at it.
>>> On Thu, Apr 27, 2017 at 03:55, Flavio Pompermaier <> wrote:
>>>> I've created a repository with a unit test that reproduces the error at
>>>> er/src/test/java/it/okkam/flink/aci/
>>>> (this error is probably also related to FLINK-4719).
>>>> The exception is thrown only when there are null strings and multiple
>>>> slots per TM. I don't know whether the UnilateralSortMerger is involved or
>>>> not (but I think so).
>>>> A quick fix for this problem would be much appreciated because it's
>>>> blocking a production deployment.
>>>> Thanks in advance to all,
>>>> Flavio
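Flavio's observation that the exception only appears with null strings fits a general rule for nullable fields: writer and reader must agree byte-for-byte on how a null is encoded, otherwise the reader loses its alignment and runs past the end of the record. A minimal sketch, assuming a hypothetical one-byte null flag per field (this is not Flink's RowSerializer or StringValue wire format), with a deliberately asymmetric reader to show the failure mode:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

/**
 * Hypothetical null-aware encoding for string fields: a 1-byte null flag, followed by
 * writeUTF() only for non-null values. Not Flink's RowSerializer or StringValue format.
 */
final class NullableStringCodec {

    static byte[] write(String[] fields) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(fields.length);
            for (String field : fields) {
                out.writeBoolean(field == null);  // null flag
                if (field != null) {
                    out.writeUTF(field);          // payload only for non-null values
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Mirrors write(): read the flag first, then the payload only if the flag says non-null. */
    static String[] read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String[] fields = new String[in.readInt()];
            for (int i = 0; i < fields.length; i++) {
                fields[i] = in.readBoolean() ? null : in.readUTF();
            }
            return fields;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Deliberately asymmetric reader that forgets the null flag, so it misreads the stream. */
    static String[] readIgnoringNullFlag(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String[] fields = new String[in.readInt()];
            for (int i = 0; i < fields.length; i++) {
                fields[i] = in.readUTF();  // treats flag + payload bytes as a length prefix
            }
            return fields;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] row = {"a", null, "bc"};
        System.out.println(Arrays.toString(read(write(row))));  // prints [a, null, bc]
    }
}
```

The symmetric `read` round-trips `{"a", null, "bc"}`, while `readIgnoringNullFlag` misinterprets the flag and payload bytes as a string length and fails with an `EOFException` (wrapped in `UncheckedIOException`), which is roughly analogous to a serializer consuming more bytes than the record had.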
>>>> On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <> wrote:
>>>>> After digging into the code and tests, I think the problem is almost
>>>>> certainly in the UnilateralSortMerger: there is probably a missing
>>>>> synchronization on some shared object somewhere. Right now I'm trying to
>>>>> understand whether this section of code creates some shared objects (like
>>>>> queues) that are accessed incorrectly when data is spilled to disk:
>>>>>     // start the thread that reads the input channels
>>>>>     this.readThread = getReadingThread(exceptionHandler, input,
>>>>>         circularQueues, largeRecordHandler, parentTask, serializer,
>>>>>         ((long) (startSpillingFraction * sortMemory)));
>>>>>     // start the thread that sorts the buffers
>>>>>     this.sortThread = getSortingThread(exceptionHandler, circularQueues,
>>>>>         parentTask);
>>>>>     // start the thread that handles spilling to secondary storage
>>>>>     this.spillThread = getSpillingThread(exceptionHandler, circularQueues,
>>>>>         parentTask, memoryManager, ioManager, serializerFactory, comparator,
>>>>>         this.sortReadMemory, this.writeMemory, maxNumFileHandles);
>>>>>     ....
>>>>>     startThreads();
>>>>> The problem is that the unit tests of GroupReduceDriver use Record, so
>>>>> testing Rows is not very straightforward, and I'm still trying to
>>>>> reproduce the problem in a local environment.
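The excerpt above wires a reading, a sorting and a spilling thread together through shared `circularQueues`. A minimal sketch of that hand-off pattern, assuming plain `java.util.concurrent.BlockingQueue`s in place of Flink's internal queue class (class and method names are hypothetical, and the real UnilateralSortMerger also recycles buffers, handles large records and merges spilled runs):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Hypothetical read -> sort -> spill pipeline. Plain BlockingQueues stand in for
 * Flink's internal circular queues; this is not UnilateralSortMerger's code.
 */
final class SortPipelineSketch {

    private static final int[] END = new int[0];  // end-of-input sentinel, compared by identity

    static List<int[]> run(List<int[]> input) throws InterruptedException {
        BlockingQueue<int[]> toSort = new ArrayBlockingQueue<>(4);
        BlockingQueue<int[]> toSpill = new ArrayBlockingQueue<>(4);
        List<int[]> spilledRuns = new ArrayList<>();  // written only by the spilling thread

        Thread reading = new Thread(() -> {
            try {
                for (int[] records : input) {
                    toSort.put(records.clone());  // hand off a private copy, never touch it again
                }
                toSort.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "reading");

        Thread sorting = new Thread(() -> {
            try {
                for (int[] buf = toSort.take(); buf != END; buf = toSort.take()) {
                    Arrays.sort(buf);  // one sorted run per buffer
                    toSpill.put(buf);
                }
                toSpill.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sorting");

        Thread spilling = new Thread(() -> {
            try {
                for (int[] sortedRun = toSpill.take(); sortedRun != END; sortedRun = toSpill.take()) {
                    spilledRuns.add(sortedRun);  // stand-in for writing the run to disk
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "spilling");

        reading.start();
        sorting.start();
        spilling.start();
        reading.join();
        sorting.join();
        spilling.join();  // join() makes the spilling thread's writes visible here
        return spilledRuns;
    }
}
```

The design point relevant to the synchronization question: `BlockingQueue` guarantees that actions before a `put` happen-before actions after the matching `take`, so a buffer passed this way needs no extra locking as long as the sending thread stops using it after the hand-off. A race would instead come from a thread that keeps using a buffer (or a serializer) it has already handed to another thread.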
>>>>> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <> wrote:
>>>>>> Thanks for the explanation. Is there a way to force this behaviour
>>>>>> in a local environment (to try to debug the problem)?
>>>>>> On 21 Apr 2017 21:49, "Fabian Hueske" <> wrote:
>>>>>>> Hi Flavio,
>>>>>>> these files are used for spilling data to disk; in your case, they
>>>>>>> hold sorted runs of records.
>>>>>>> Later, all of these sorted runs (up to a fan-out threshold) are read
>>>>>>> and merged to get a completely sorted record stream.
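Fabian's description is the merge phase of an external merge sort: sorted runs are spilled, then merged at most a fixed number at a time until a single sorted stream remains. A minimal in-memory sketch, assuming lists stand in for spill files and a min-heap k-way merge; `fanIn` is a hypothetical stand-in for the fan-out threshold (in the constructor excerpt quoted earlier in the thread, `maxNumFileHandles` looks like the corresponding limit, but that is an assumption):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical multi-pass merge of sorted runs with a bounded fan-in (lists stand in for spill files). */
final class RunMerger {

    /** k-way merge of a group of sorted runs using a min-heap of {value, runIndex, position}. */
    static List<Integer> mergeGroup(List<List<Integer>> runs) {
        PriorityQueue<int[]> heap = new PriorityQueue<int[]>((a, b) -> Integer.compare(a[0], b[0]));
        for (int r = 0; r < runs.size(); r++) {
            if (!runs.get(r).isEmpty()) {
                heap.add(new int[] {runs.get(r).get(0), r, 0});
            }
        }
        List<Integer> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            merged.add(top[0]);
            List<Integer> source = runs.get(top[1]);
            int next = top[2] + 1;
            if (next < source.size()) {
                heap.add(new int[] {source.get(next), top[1], next});
            }
        }
        return merged;
    }

    /** Merges at most fanIn runs at a time, pass after pass, until one sorted run remains. */
    static List<Integer> mergeAll(List<List<Integer>> runs, int fanIn) {
        if (fanIn < 2) {
            throw new IllegalArgumentException("fanIn must be at least 2");
        }
        List<List<Integer>> current = new ArrayList<>(runs);
        if (current.isEmpty()) {
            return new ArrayList<>();
        }
        while (current.size() > 1) {
            List<List<Integer>> nextPass = new ArrayList<>();
            for (int i = 0; i < current.size(); i += fanIn) {
                nextPass.add(mergeGroup(current.subList(i, Math.min(i + fanIn, current.size()))));
            }
            current = nextPass;  // each pass shrinks the number of runs by a factor of about fanIn
        }
        return current.get(0);
    }
}
```

With four runs and `fanIn = 2`, the first pass produces two merged runs and the second pass produces the final sorted stream. In a disk-based version each intermediate pass reads and rewrites every spilled record once, which is why a larger fan-in means fewer passes.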
>>>>>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <>:
>>>>>>>> The error appears as soon as some TaskManager generates an
>>>>>>>> inputchannel file.
>>>>>>>> What are those files used for?
>>>>>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <> wrote:
>>>>>>>>> In another run of the job I got a different exception. Could it be
>>>>>>>>> helpful?
>>>>>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>>>>>>>>> terminated due to an exception: Serializer consumed more bytes than
>>>>>>>>> the record had. This indicates broken serialization. If you are using
>>>>>>>>> custom serialization types (Value or Writable), check their
>>>>>>>>> serialization methods. If you are using a Kryo-serialized type, check
>>>>>>>>> the corresponding Kryo serializer.
>>>>>>>>> at ava:465)
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>>>>>>>>> at
>>>>>>>>> at
>>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an
>>>>>>>>> exception: Serializer consumed more bytes than the record had. This
>>>>>>>>> indicates broken serialization. If you are using custom serialization
>>>>>>>>> types (Value or Writable), check their serialization methods. If you
>>>>>>>>> are using a Kryo-serialized type, check the corresponding Kryo
>>>>>>>>> serializer.
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(
>>>>>>>>> at ava:460)
>>>>>>>>> ... 3 more
>>>>>>>>> Caused by: Thread 'SortMerger Reading Thread' terminated due to an
>>>>>>>>> exception: Serializer consumed more bytes than the record had. This
>>>>>>>>> indicates broken serialization. If you are using custom serialization
>>>>>>>>> types (Value or Writable), check their serialization methods. If you
>>>>>>>>> are using a Kryo-serialized type, check the corresponding Kryo
>>>>>>>>> serializer.
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
>>>>>>>>> Caused by: Serializer consumed more bytes than the record had. This
>>>>>>>>> indicates broken serialization. If you are using custom serialization
>>>>>>>>> types (Value or Writable), check their serialization methods. If you
>>>>>>>>> are using a Kryo-serialized type, check the corresponding Kryo
>>>>>>>>> serializer.
>>>>>>>>> at ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>>>>>>> at dReader.getNextRecord(
>>>>>>>>> at
>>>>>>>>> at
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
>>>>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>>>>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemor
>>>>>>>>> at ngAdaptiveSpanningRecordDeserializer$
>>>>>>>>> Byte(
>>>>>>>>> at ngAdaptiveSpanningRecordDeserializer$
>>>>>>>>> UnsignedByte(
>>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(
>>>>>>>>> at serialize(
>>>>>>>>> at serialize(
>>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>>>>>>>>> at ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>>>>>>> ... 5 more
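The wording "Serializer consumed more bytes than the record had" suggests a length-prefixed framing check: each record carries its length, and the deserializer compares what the type serializer actually read against it. A minimal sketch of such a check, assuming a hypothetical 4-byte length prefix and a `ByteBuffer`-based field reader (this is not Flink's `SpillingAdaptiveSpanningRecordDeserializer`):

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.function.Function;

/**
 * Hypothetical length-prefixed record framing with a consumed-bytes check.
 * Not Flink's SpillingAdaptiveSpanningRecordDeserializer.
 */
final class FramedRecords {

    /** Writes one record as [4-byte length][payload]. */
    static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    /** Reads one framed record and verifies that the field reader used exactly `length` bytes. */
    static <T> T readChecked(ByteBuffer stream, Function<ByteBuffer, T> fieldReader) {
        int length = stream.getInt();
        ByteBuffer record = stream.slice();
        record.limit(length);  // the field reader can only see this record's bytes
        T value;
        try {
            value = fieldReader.apply(record);
        } catch (BufferUnderflowException e) {
            throw new IllegalStateException("reader consumed more bytes than the record had", e);
        }
        if (record.position() != length) {
            throw new IllegalStateException("reader consumed fewer bytes than the record had");
        }
        stream.position(stream.position() + length);  // move on to the next record
        return value;
    }
}
```

A field reader that calls `getInt()` on a 2-byte record hits the record's limit and is rejected. Without the limit it would silently read into the next record, which is how a single broken serializer can corrupt everything after it.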
>>>>>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <> wrote:
>>>>>>>>>> In fact, the old problem was a missing initialization of the
>>>>>>>>>> KryoSerializer when the exception that triggers spilling to disk was
>>>>>>>>>> thrown. This left a dirty serialization buffer that would eventually
>>>>>>>>>> break the program. Till worked on it by debugging the source code that
>>>>>>>>>> generated the error. Perhaps someone could do the same this time, if
>>>>>>>>>> Flavio can make the problem reproducible with a shareable program and
>>>>>>>>>> data.
>>>>>>>>>> Stefano
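Stefano's earlier bug, a serializer whose state is not reset when the exception that triggers spilling is thrown, can be illustrated with a hypothetical serializer that reuses a scratch buffer across records. The capacity check stands in for the "sort buffer is full" condition that makes a sorter spill and retry; this is not Kryo's or Flink's API:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Hypothetical serializer that reuses an internal scratch buffer across records.
 * Not Kryo's or Flink's API; the capacity check stands in for the "buffer full"
 * condition that makes a sorter spill and retry the record.
 */
final class ReusableSerializer {

    private final ByteArrayOutputStream scratch = new ByteArrayOutputStream();
    private final boolean resetOnFailure;

    ReusableSerializer(boolean resetOnFailure) {
        this.resetOnFailure = resetOnFailure;
    }

    byte[] serialize(List<String> fields, int capacity) {
        boolean completed = false;
        try {
            for (String field : fields) {
                byte[] bytes = field.getBytes(StandardCharsets.UTF_8);
                if (scratch.size() + bytes.length > capacity) {
                    throw new IllegalStateException("buffer full, caller should spill and retry");
                }
                scratch.write(bytes, 0, bytes.length);
            }
            completed = true;
            return scratch.toByteArray();
        } finally {
            // Success always clears the buffer; failure clears it only if resetOnFailure is set.
            // Without that reset, the bytes of the aborted record leak into the next one.
            if (completed || resetOnFailure) {
                scratch.reset();
            }
        }
    }
}
```

After a failed attempt with capacity 3, the non-resetting instance emits `ababcd` on retry instead of `abcd`: the aborted record's bytes leak into the next record. Clearing per-record state on every failure path (here in `finally`) avoids that.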
>>>>>>>>>> *From:* Stephan Ewen []
>>>>>>>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>>>>>>>> *To:* user <>
>>>>>>>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>>>>>>> In the past, these errors were most often caused by bugs in the
>>>>>>>>>> serializers, not in the sorter.
>>>>>>>>>> What types are you using at that point? The stack trace reveals
>>>>>>>>>> Row and StringValue; are any other types involved?
>>>>>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <> wrote:
>>>>>>>>>> As suggested by Fabian, I set taskmanager.memory.size = 1024 (to
>>>>>>>>>> force spilling to disk) and the job failed almost immediately.
>>>>>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <> wrote:
>>>>>>>>>> I debugged the process a bit by repeating the job on sub-slices of
>>>>>>>>>> the entire data (using the id value to filter data with Parquet
>>>>>>>>>> push-down filters), and all slices completed successfully :(
>>>>>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4)
>>>>>>>>>> to see whether load was somehow a factor, but it didn't cause any
>>>>>>>>>> error.
>>>>>>>>>> Then I almost doubled the number of rows to process, and finally
>>>>>>>>>> the error showed up again.
>>>>>>>>>> It seems somehow related to spilling to disk, but I can't really
>>>>>>>>>> understand what's going on :(
>>>>>>>>>> This is a summary of my debug attempts:
>>>>>>>>>> 4 TaskManagers with 6 GB and 1 slot each, parallelism = 4
>>>>>>>>>> id <  => 1.857.365 rows => OK
>>>>>>>>>> id >= && id < => 20.057.714 rows => OK
>>>>>>>>>> id >= && id < 99.945.000.000 => 20.926.903 rows => OK
>>>>>>>>>> id >= 99.945.000.000 && id < 99.960.000.000 => 23.888.750 rows => OK
>>>>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>>>> 4 TaskManagers with 8 GB and 4 slots each, parallelism = 16
>>>>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>>>> id >= 99.945.000.000 => 56.825.172 rows => ERROR
>>>>>>>>>> Any help is appreciated.
>>>>>>>>>> Best,
>>>>>>>>>> Flavio
>>>>>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <> wrote:
>>>>>>>>>> I could, but only if there's a good probability that it fixes the
>>>>>>>>>> confident are you about it?
>>>>>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <> wrote:
>>>>>>>>>> Looking at the git log of , there have been
>>>>>>>>>> some recent changes.
>>>>>>>>>> If you have time, maybe try the 1.2.1 RC and see whether the error is
>>>>>>>>>> still reproducible?
>>>>>>>>>> Cheers
>>>>>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <> wrote:
>>>>>>>>>> Hi all,
>>>>>>>>>> I think I'm hitting the weird exception with the
>>>>>>>>>> SpillingAdaptiveSpanningRecordDeserializer again...
>>>>>>>>>> I'm using Flink 1.2.0 and the error seems to arise when Flink
>>>>>>>>>> spills to disk, but the exception thrown is not very helpful. Any
>>>>>>>>>> ideas?
>>>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an
>>>>>>>>>> exception: null
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(
>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(
>>>>>>>>>> at ava:460)
>>>>>>>>>> ... 3 more
>>>>>>>>>> Caused by: Thread 'SortMerger Reading Thread' terminated due to an
>>>>>>>>>> exception: null
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
>>>>>>>>>> Caused by:
>>>>>>>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(
>>>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(
>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(
>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(
>>>>>>>>>> at serialize(
>>>>>>>>>> at serialize(
>>>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>>>>>>>>>> at ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>>>>>>>> at dReader.getNextRecord(
>>>>>>>>>> at
>>>>>>>>>> at
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
>>>>>>>>>> Best,
>>>>>>>>>> Flavio
>>>> --
>>> Best,
>>> Kurt
