Hi, I have found the bug: https://issues.apache.org/jira/browse/FLINK-6398
I will open a PR soon.

Best,
Kurt

On Thu, Apr 27, 2017 at 8:23 PM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Thanks a lot Kurt!
>
> On Thu, Apr 27, 2017 at 2:12 PM, Kurt Young <ykt...@gmail.com> wrote:
>
>> Thanks for the test case, I will take a look at it.
>>
>> On Thu, Apr 27, 2017 at 03:55, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> I've created a repository with a unit test to reproduce the error at
>>> https://github.com/fpompermaier/flink-batch-bug/blob/master/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java
>>> (this error is probably also related to FLINK-4719).
>>>
>>> The exception is thrown only when there are null strings and multiple
>>> slots per TM. I don't know whether UnilateralSortMerger is involved or
>>> not (but I think so).
>>> A quick fix for this problem would be much appreciated because it's
>>> blocking a production deployment.
>>>
>>> Thanks in advance to all,
>>> Flavio
>>>
>>> On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
>>>> After digging into the code and tests, I think that the problem is almost
>>>> certainly in the UnilateralSortMerger: there must be a missing
>>>> synchronization on some shared object somewhere. Right now I'm trying to
>>>> understand whether this section of code creates shared objects (like
>>>> queues) that are accessed in a bad way when there's spilling to disk:
>>>>
>>>> // start the thread that reads the input channels
>>>> this.readThread = getReadingThread(exceptionHandler, input,
>>>>     circularQueues, largeRecordHandler, parentTask, serializer,
>>>>     ((long) (startSpillingFraction * sortMemory)));
>>>>
>>>> // start the thread that sorts the buffers
>>>> this.sortThread = getSortingThread(exceptionHandler, circularQueues,
>>>>     parentTask);
>>>>
>>>> // start the thread that handles spilling to secondary storage
>>>> this.spillThread = getSpillingThread(exceptionHandler, circularQueues,
>>>>     parentTask, memoryManager, ioManager, serializerFactory, comparator,
>>>>     this.sortReadMemory, this.writeMemory, maxNumFileHandles);
>>>> ....
>>>> startThreads();
>>>>
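For readers unfamiliar with this read/sort/spill arrangement, the snippet above can be pictured with a minimal sketch. This is not Flink code: the class `ThreeStagePipeline`, the `END` sentinel, and the use of `LinkedBlockingQueue` are assumptions made for illustration, standing in for the reading, sorting, and spilling threads and the `circularQueues` they share. Each stage only touches a batch after taking it from a queue, which is the kind of hand-off discipline the question about shared objects is probing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy three-stage pipeline (read -> sort -> "spill"). Hypothetical, not Flink code. */
class ThreeStagePipeline {

    // Sentinel batch telling a downstream stage that no more data will arrive.
    private static final List<Integer> END = new ArrayList<>();

    /** Cuts the input into batches, sorts each on a second thread, collects the runs on a third. */
    static List<List<Integer>> run(List<Integer> input, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        BlockingQueue<List<Integer>> toSort = new LinkedBlockingQueue<>();
        BlockingQueue<List<Integer>> toSpill = new LinkedBlockingQueue<>();
        // Written only by the spiller thread and read only after join(), so no extra locking.
        List<List<Integer>> runs = new ArrayList<>();

        Thread reader = new Thread(() -> {
            for (int i = 0; i < input.size(); i += batchSize) {
                int end = Math.min(i + batchSize, input.size());
                // Each batch is a fresh copy, so no buffer is shared between stages.
                toSort.add(new ArrayList<>(input.subList(i, end)));
            }
            toSort.add(END);
        }, "reader");

        Thread sorter = new Thread(() -> {
            try {
                for (List<Integer> b = toSort.take(); b != END; b = toSort.take()) {
                    Collections.sort(b);
                    toSpill.add(b);
                }
                toSpill.add(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sorter");

        Thread spiller = new Thread(() -> {
            try {
                for (List<Integer> r = toSpill.take(); r != END; r = toSpill.take()) {
                    runs.add(r); // a real sorter would write the sorted run to disk here
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "spiller");

        Thread[] stages = {reader, sorter, spiller};
        for (Thread t : stages) {
            t.start();
        }
        try {
            for (Thread t : stages) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the pipeline", e);
        }
        return runs;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(5, 3, 1, 4, 2, 6), 3));
    }
}
```

In this sketch a batch is owned by exactly one thread at a time. A bug of the kind suspected in this thread would correspond to two stages (or two task slots) reusing the same mutable buffer or helper object without such a hand-off.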
>>>>
>>>> The problem is that the unit tests of GroupReduceDriver use Record, and
>>>> testing Rows is not very straightforward. I'm still trying to reproduce
>>>> the problem in a local env.
>>>>
>>>> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> Thanks for the explanation. Is there a way to force this behaviour in
>>>>> a local environment (to try to debug the problem)?
>>>>>
>>>>> On 21 Apr 2017 21:49, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>>
>>>>>> These files are used for spilling data to disk; in your case, sorted
>>>>>> runs of records.
>>>>>> Later, all of these sorted runs (up to a fan-out threshold) are read and
>>>>>> merged to get a completely sorted record stream.
>>>>>>
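The merge step described above can be sketched as a k-way merge over sorted runs. This is a minimal illustration, not Flink's implementation: the class `SortedRunMerger`, its method names, and the `maxFanIn` parameter are hypothetical, and in-memory lists stand in for the spilled files. When there are more runs than the fan-in limit allows, the sketch merges them in several passes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Merges sorted runs with a bounded fan-in. Hypothetical sketch, not Flink code. */
class SortedRunMerger {

    /** Cursor over one sorted run: its current head element plus the remaining elements. */
    private static final class Cursor implements Comparable<Cursor> {
        int head;
        final Iterator<Integer> rest;

        Cursor(Iterator<Integer> it) {
            this.rest = it;
            this.head = it.next();
        }

        @Override
        public int compareTo(Cursor other) {
            return Integer.compare(head, other.head);
        }
    }

    /** Merges all given sorted runs in a single pass using a min-heap of cursors. */
    static List<Integer> mergeRuns(List<List<Integer>> runs) {
        PriorityQueue<Cursor> heap = new PriorityQueue<>();
        for (List<Integer> run : runs) {
            if (!run.isEmpty()) {
                heap.add(new Cursor(run.iterator()));
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Cursor c = heap.poll();      // run with the smallest head element
            out.add(c.head);
            if (c.rest.hasNext()) {      // advance the cursor and put it back
                c.head = c.rest.next();
                heap.add(c);
            }
        }
        return out;
    }

    /** Merges at most maxFanIn runs per pass and repeats until a single run remains. */
    static List<Integer> mergeWithFanIn(List<List<Integer>> runs, int maxFanIn) {
        if (maxFanIn < 2) {
            throw new IllegalArgumentException("maxFanIn must be >= 2");
        }
        List<List<Integer>> current = new ArrayList<>(runs);
        while (current.size() > 1) {
            List<List<Integer>> next = new ArrayList<>();
            for (int i = 0; i < current.size(); i += maxFanIn) {
                int end = Math.min(i + maxFanIn, current.size());
                next.add(mergeRuns(current.subList(i, end)));
            }
            current = next;
        }
        return current.isEmpty() ? new ArrayList<>() : current.get(0);
    }

    public static void main(String[] args) {
        List<List<Integer>> runs = Arrays.asList(
                Arrays.asList(1, 4, 9), Arrays.asList(2, 3),
                Arrays.asList(0, 8), Arrays.asList(5));
        System.out.println(mergeWithFanIn(runs, 2));
    }
}
```

The fan-in bound plays the role of a limit on how many spill files are open at once; the `maxNumFileHandles` argument in the quoted sorter code suggests a similar limit exists there, but the mapping is an assumption.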
>>>>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>
>>>>>>> The error appears as soon as some taskmanager generates some
>>>>>>> inputchannel file.
>>>>>>> What are those files used for?
>>>>>>>
>>>>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <
>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> In another run of the job I had another Exception. Could it be
>>>>>>>> helpful?
>>>>>>>>
>>>>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading
>>>>>>>> Thread' terminated due to an exception: Serializer consumed more bytes
>>>>>>>> than the record had. This indicates broken serialization. If you are
>>>>>>>> using custom serialization types (Value or Writable), check their
>>>>>>>> serialization methods. If you are using a Kryo-serialized type, check
>>>>>>>> the corresponding Kryo serializer.
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an
>>>>>>>> exception: Serializer consumed more bytes than the record had. This
>>>>>>>> indicates broken serialization. If you are using custom serialization
>>>>>>>> types (Value or Writable), check their serialization methods. If you
>>>>>>>> are using a Kryo-serialized type, check the corresponding Kryo
>>>>>>>> serializer.
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>>> ... 3 more
>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>>>>> record had. This indicates broken serialization. If you are using
>>>>>>>> custom serialization types (Value or Writable), check their
>>>>>>>> serialization methods. If you are using a Kryo-serialized type, check
>>>>>>>> the corresponding Kryo serializer.
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>> Caused by: java.io.IOException: Serializer consumed more bytes than
>>>>>>>> the record had. This indicates broken serialization. If you are using
>>>>>>>> custom serialization types (Value or Writable), check their
>>>>>>>> serialization methods. If you are using a Kryo-serialized type, check
>>>>>>>> the corresponding Kryo serializer.
>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>>>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>>>>>>>> ... 5 more
>>>>>>>>
>>>>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
>>>>>>>> stefano.bort...@huawei.com> wrote:
>>>>>>>>
>>>>>>>>> In fact, the old problem was that the KryoSerializer missed an
>>>>>>>>> initialization on the exception that triggers spilling to disk.
>>>>>>>>> This led to a dirty serialization buffer that would eventually
>>>>>>>>> break the program. Till worked on it by debugging the source code
>>>>>>>>> that generated the error. Perhaps someone could try the same this
>>>>>>>>> time, if Flavio can make the problem reproducible in a shareable
>>>>>>>>> program plus data.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Stefano
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *From:* Stephan Ewen [mailto:se...@apache.org]
>>>>>>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>>>>>>> *To:* user <user@flink.apache.org>
>>>>>>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> In the past, these errors were most often caused by bugs in the
>>>>>>>>> serializers, not in the sorter.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> What types are you using at that point? The stack trace reveals
>>>>>>>>> Row and StringValue; are any other types involved?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <
>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>> As suggested by Fabian I set taskmanager.memory.size = 1024 (to
>>>>>>>>> force spilling to disk) and the job failed almost immediately..
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>> I debugged the process a bit by repeating the job on sub-slices of
>>>>>>>>> the entire data (using the id value to filter data with Parquet
>>>>>>>>> push-down filters) and all slices completed successfully :(
>>>>>>>>>
>>>>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4)
>>>>>>>>> to see if this was somehow a stress factor, but it didn't cause any
>>>>>>>>> error.
>>>>>>>>>
>>>>>>>>> Then I almost doubled the number of rows to process and finally
>>>>>>>>> the error showed up again.
>>>>>>>>>
>>>>>>>>> It seems somehow related to spilling to disk but I can't really
>>>>>>>>> understand what's going on :(
>>>>>>>>>
>>>>>>>>> This is a summary of my debug attempts:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 4 Task managers with 6 GB and 1 slot each, parallelism = 4
>>>>>>>>>
>>>>>>>>> id < 10.000.000.000                         =>  1.857.365 rows => OK
>>>>>>>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>>>>>>>>> id >= 10.010.000.000 && id < 99.945.000.000 => 20.926.903 rows => OK
>>>>>>>>> id >= 99.945.000.000 && id < 99.960.000.000 => 23.888.750 rows => OK
>>>>>>>>> id >= 99.960.000.000                        => 32.936.422 rows => OK
>>>>>>>>>
>>>>>>>>> 4 Task managers with 8 GB and 4 slots each, parallelism = 16
>>>>>>>>>
>>>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>>> id >= 99.945.000.000 => 56.825.172 rows => ERROR
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Any help is appreciated..
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>> I could, but only if there's a good probability that it fixes the
>>>>>>>>> problem. How confident are you about it?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Looking at the git log of DataInputDeserializer.java, there have
>>>>>>>>> been some recent changes.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> If you have time, maybe try with the 1.2.1 RC and see if the error
>>>>>>>>> is reproducible?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Cheers
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>> Hi to all,
>>>>>>>>>
>>>>>>>>> I think I've hit the weird exception with the
>>>>>>>>> SpillingAdaptiveSpanningRecordDeserializer again...
>>>>>>>>>
>>>>>>>>> I'm using Flink 1.2.0 and the error seems to arise when Flink
>>>>>>>>> spills to disk, but the exception thrown is not very helpful. Any idea?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an
>>>>>>>>> exception: null
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>>>> ... 3 more
>>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>>>> terminated due to an exception: null
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>>> Caused by: java.io.EOFException
>>>>>>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>>
>>> --
>> Best,
>> Kurt
>>
>
>
