Great! Thanks a lot, Kurt

On Thu, Apr 27, 2017 at 5:31 PM, Kurt Young <ykt...@gmail.com> wrote:

> Hi, i have found the bug: https://issues.apache.org/jira/browse/FLINK-6398,
> will open a PR soon.
>
> Best,
> Kurt
>
> On Thu, Apr 27, 2017 at 8:23 PM, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Thanks a lot Kurt!
>>
>> On Thu, Apr 27, 2017 at 2:12 PM, Kurt Young <ykt...@gmail.com> wrote:
>>
>>> Thanks for the test case, i will take a look at it.
>>>
>>> On Thu, Apr 27, 2017 at 03:55, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>>> I've created a repository with a unit test to reproduce the error at
>>>> https://github.com/fpompermaier/flink-batch-bug/blob/master/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java
>>>> (probably this error is related also to FLINK-4719).
>>>>
>>>> The exception is thrown only when there are null strings and multiple
>>>> slots per TM; I don't know whether UnilateralSortMerger is involved or
>>>> not (but I think so..).
>>>> A quick fix for this problem would be much appreciated because it's
>>>> blocking a production deployment..
>>>>
>>>> Thanks in advance to all,
>>>> Flavio
>>>>
>>>> On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> After digging into the code and tests I think that the problem is
>>>>> almost certainly in the UnilateralSortMerger; there must be a missing
>>>>> synchronization on some shared object somewhere... Right now I'm trying to
>>>>> understand whether this section of code creates some shared objects (like
>>>>> queues) that are accessed incorrectly when there's spilling to disk:
>>>>>
>>>>> // start the thread that reads the input channels
>>>>> this.readThread = getReadingThread(exceptionHandler, input, circularQueues,
>>>>>     largeRecordHandler, parentTask, serializer,
>>>>>     ((long) (startSpillingFraction * sortMemory)));
>>>>>
>>>>> // start the thread that sorts the buffers
>>>>> this.sortThread = getSortingThread(exceptionHandler, circularQueues, parentTask);
>>>>>
>>>>> // start the thread that handles spilling to secondary storage
>>>>> this.spillThread = getSpillingThread(exceptionHandler, circularQueues, parentTask,
>>>>>     memoryManager, ioManager, serializerFactory, comparator,
>>>>>     this.sortReadMemory, this.writeMemory, maxNumFileHandles);
>>>>> ....
>>>>> startThreads();
>>>>>
>>>>>
>>>>> The problem is that the unit tests of GroupReduceDriver use Record, and
>>>>> testing Rows is not very straightforward, so I'm still trying to reproduce
>>>>> the problem in a local env..
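The hand-off pattern in the snippet above can be sketched with plain JDK threads and blocking queues (illustrative names only, not Flink code): a reading thread cuts the stream into buffers, a sorting thread sorts each one, and a final stage collects the sorted runs, which is where a missing synchronization on the shared queues would corrupt data.

```java
import java.util.*;
import java.util.concurrent.*;

// Minimal sketch (NOT Flink code) of the UnilateralSortMerger hand-off:
// a reading thread fills buffers, a sorting thread sorts them, and
// blocking queues carry the buffers between the threads.
public class SortPipelineDemo {

    public static List<Integer> run(List<Integer> input) {
        BlockingQueue<List<Integer>> toSort = new LinkedBlockingQueue<>();
        BlockingQueue<List<Integer>> sortedRuns = new LinkedBlockingQueue<>();
        List<Integer> END = new ArrayList<>(); // end-of-input marker (empty buffer)

        // "reading thread": cut the input stream into fixed-size buffers
        Thread reader = new Thread(() -> {
            List<Integer> buf = new ArrayList<>();
            for (int v : input) {
                buf.add(v);
                if (buf.size() == 4) { toSort.add(buf); buf = new ArrayList<>(); }
            }
            if (!buf.isEmpty()) toSort.add(buf);
            toSort.add(END);
        });

        // "sorting thread": sort each buffer and pass it on
        Thread sorter = new Thread(() -> {
            try {
                while (true) {
                    List<Integer> buf = toSort.take();
                    if (buf.isEmpty()) { sortedRuns.add(buf); return; }
                    Collections.sort(buf);
                    sortedRuns.add(buf);
                }
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });

        reader.start();
        sorter.start();

        // "spilling/merging" stage: the real merger spills runs to disk files
        // and k-way merges them; here we just collect and sort in memory
        List<Integer> result = new ArrayList<>();
        try {
            while (true) {
                List<Integer> run = sortedRuns.take();
                if (run.isEmpty()) break;
                result.addAll(run);
            }
            reader.join();
            sorter.join();
        } catch (InterruptedException e) { throw new IllegalStateException(e); }
        Collections.sort(result); // stand-in for the k-way merge
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(5, 3, 8, 1, 9, 2, 7, 4, 6)));
    }
}
```

The point of the sketch: correctness depends entirely on the queue hand-off being thread-safe; if the buffers were exchanged through an unsynchronized structure instead, exactly the kind of sporadic corruption described here would appear under load.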
>>>>>
>>>>> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Thanks for the explanation. Is there a way to force this behaviour
>>>>>> in a local environment (to try to debug the problem)?
>>>>>>
>>>>>> On 21 Apr 2017 21:49, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Flavio,
>>>>>>>
>>>>>>> these files are used for spilling data to disk, in your case sorted
>>>>>>> runs of records.
>>>>>>> Later, all these sorted runs (up to a fan-out threshold) are read and
>>>>>>> merged to get a completely sorted record stream.
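Fabian's description corresponds to a classic external merge: each spilled run is already sorted, and a priority queue repeatedly takes the smallest head across runs. A hedged in-memory sketch (the real code streams the runs from the spill files rather than holding them as lists):

```java
import java.util.*;

// k-way merge of sorted runs, the in-memory analogue of merging spilled
// run files into one fully sorted record stream.
public class MergeRunsDemo {

    public static List<Integer> merge(List<List<Integer>> sortedRuns) {
        // each queue entry is {head value, run index, position within run}
        PriorityQueue<int[]> pq = new PriorityQueue<>(Comparator.comparingInt((int[] a) -> a[0]));
        for (int r = 0; r < sortedRuns.size(); r++)
            if (!sortedRuns.get(r).isEmpty())
                pq.add(new int[]{sortedRuns.get(r).get(0), r, 0});

        List<Integer> out = new ArrayList<>();
        while (!pq.isEmpty()) {
            int[] top = pq.poll();
            out.add(top[0]);
            int run = top[1], next = top[2] + 1;
            if (next < sortedRuns.get(run).size())
                pq.add(new int[]{sortedRuns.get(run).get(next), run, next});
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(merge(Arrays.asList(
            Arrays.asList(1, 4, 7),
            Arrays.asList(2, 5),
            Arrays.asList(3, 6, 8)))); // [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```

The fan-out threshold Fabian mentions bounds how many runs are merged at once; with more runs than the threshold, intermediate merge passes are needed.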
>>>>>>>
>>>>>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>
>>>>>>> :
>>>>>>>
>>>>>>>> The error appears as soon as some taskmanager generates some
>>>>>>>> inputchannel file.
>>>>>>>> What are those files used for?
>>>>>>>>
>>>>>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <
>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> In another run of the job I had another Exception. Could it be
>>>>>>>>> helpful?
>>>>>>>>>
>>>>>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>>>> ... 3 more
>>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>>> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>>>>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>>>>>>>>> ... 5 more
>>>>>>>>>
>>>>>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
>>>>>>>>> stefano.bort...@huawei.com> wrote:
>>>>>>>>>
>>>>>>>>>> In fact, the old problem was that the KryoSerializer missed
>>>>>>>>>> re-initialization on the exception that would trigger the spilling to
>>>>>>>>>> disk. This would leave a dirty serialization buffer that would
>>>>>>>>>> eventually break the program. Till worked on it by debugging the
>>>>>>>>>> source code generating the error. Perhaps someone could try the same
>>>>>>>>>> also this time, if Flavio can make the problem reproducible in a
>>>>>>>>>> shareable program+data.
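The failure mode Stefano describes can be reproduced in miniature: a serializer that keeps buffered internal state and only resets it on the success path will emit corrupted records after the first mid-record exception. The class below is purely illustrative (it is not Flink's KryoSerializer; the class and method names are assumptions for the sketch):

```java
import java.io.*;

// Illustrative "dirty buffer" bug: the internal buffer is reset only when
// serialization succeeds, so an aborted record leaves stale bytes that
// corrupt every record written afterwards.
public class DirtyBufferDemo {

    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    public byte[] serialize(String value, boolean failMidway) {
        try {
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeInt(value.length()); // length prefix
            if (failMidway)
                throw new IllegalStateException("spill triggered mid-record");
            out.writeBytes(value);
            byte[] record = buffer.toByteArray();
            buffer.reset(); // reset happens only on the success path -- the bug
            return record;
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    public static void main(String[] args) {
        DirtyBufferDemo s = new DirtyBufferDemo();
        try {
            s.serialize("abc", true); // fails after writing the length prefix
        } catch (IllegalStateException ignored) { }
        // The aborted record's 4-byte length prefix is still in the buffer,
        // so the next record is 10 bytes instead of the expected 6.
        System.out.println(s.serialize("xy", false).length); // 10
    }
}
```

A downstream reader trusting the stale length prefix would then consume the wrong number of bytes, which is consistent with the exceptions seen in this thread.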
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Stefano
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *From:* Stephan Ewen [mailto:se...@apache.org]
>>>>>>>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>>>>>>>> *To:* user <user@flink.apache.org>
>>>>>>>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> In the past, these errors were most often caused by bugs in the
>>>>>>>>>> serializers, not in the sorter.
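The "Serializer consumed more bytes than the record had" message reflects a sanity check of this kind: each record travels in a length-delimited frame, and after the type serializer runs, the reader verifies it stayed within the frame. A hedged sketch of that check (the names and the exact mechanics are assumptions for illustration, not Flink's actual API):

```java
import java.io.*;

// Sketch of a frame-boundary check: a record is framed with a declared
// length; after deserialization, the reader compares bytes actually
// consumed against that length and fails loudly on over-consumption.
public class FrameCheckDemo {

    public static byte[] frame(String value) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            new DataOutputStream(bos).writeUTF(value);
            return bos.toByteArray();
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    public static String readRecord(byte[] data, int declaredLength) {
        try {
            ByteArrayInputStream bin = new ByteArrayInputStream(data);
            String value = new DataInputStream(bin).readUTF(); // the "type serializer"
            int consumed = data.length - bin.available();
            if (consumed > declaredLength)
                throw new IllegalStateException(
                    "Serializer consumed more bytes than the record had");
            return value;
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    public static void main(String[] args) {
        byte[] f = frame("hello");
        System.out.println(readRecord(f, f.length)); // within bounds: "hello"
        try {
            readRecord(f, f.length - 1); // declared length one byte short
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A broken serializer (or a corrupted length prefix) makes the consumed count exceed the declared record length, which is exactly how the error in this thread surfaces.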
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> What types are you using at that point? The stack trace reveals
>>>>>>>>>> Row and StringValue; are any other types involved?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <
>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>> As suggested by Fabian I set taskmanager.memory.size = 1024 (to
>>>>>>>>>> force spilling to disk) and the job failed almost immediately..
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>> I debugged the process a bit by repeating the job on sub-slices of
>>>>>>>>>> the entire data (using the id value to filter data with Parquet
>>>>>>>>>> push-down filters), and all slices completed successfully :(
>>>>>>>>>>
>>>>>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to
>>>>>>>>>> see if this was somehow a stress factor, but it didn't cause any
>>>>>>>>>> error.
>>>>>>>>>>
>>>>>>>>>> Then I almost doubled the number of rows to process and finally the
>>>>>>>>>> error showed up again.
>>>>>>>>>>
>>>>>>>>>> It seems somehow related to spilling to disk, but I can't really
>>>>>>>>>> understand what's going on :(
>>>>>>>>>>
>>>>>>>>>> This is a summary of my debug attempts:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 4 Task managers with 6 GB  and 1 slot each, parallelism = 4
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> id < 10.000.000.000 => 1.857.365 rows => OK
>>>>>>>>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>>>>>>>>>> id >= 10.010.000.000 && id < 99.945.000.000 => 20.926.903 rows => OK
>>>>>>>>>> id >= 99.945.000.000 && id < 99.960.000.000 => 23.888.750 rows => OK
>>>>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 4 TM with 8 GB and 4 slot each, parallelism 16
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>>>>
>>>>>>>>>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Any help is appreciated..
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>>
>>>>>>>>>> Flavio
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>> I could, but only if there's a good probability that it fixes the
>>>>>>>>>> problem... How confident are you about it?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Looking at the git log of DataInputDeserializer.java, there have
>>>>>>>>>> been some recent changes.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> If you have time, maybe try with the 1.2.1 RC and see if the error
>>>>>>>>>> is reproducible?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Cheers
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi to all,
>>>>>>>>>>
>>>>>>>>>> I think I'm hitting the weird Exception with the
>>>>>>>>>> SpillingAdaptiveSpanningRecordDeserializer again...
>>>>>>>>>>
>>>>>>>>>> I'm using Flink 1.2.0, and the error seems to arise when Flink
>>>>>>>>>> spills to disk but the Exception thrown is not very helpful. Any 
>>>>>>>>>> idea?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>>>>> ... 3 more
>>>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>>>> Caused by: java.io.EOFException
>>>>>>>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>>
>>>>>>>>>> Flavio
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>> --
>>> Best,
>>> Kurt
>>>
>>
>>
>
