I debugged the process a bit, repeating the job on sub-slices of the entire
dataset (using the id value to filter data with Parquet push-down filters),
and all slices completed successfully :(
So I tried increasing the parallelism (from 1 slot per TM to 4) to see
whether this was somehow a stress factor, but it didn't cause any error.
Then I almost doubled the number of rows to process, and the error finally
showed up again.
It seems somehow related to spilling to disk, but I can't really understand
what's going on :(
This is a summary of my debug attempts:

4 Task Managers with 6 GB and 1 slot each, parallelism = 4

id < 10.000.000.000                         =>  1.857.365 rows => OK
id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
id >= 10.010.000.000 && id < 99.945.000.000 => 20.926.903 rows => OK
id >= 99.945.000.000 && id < 99.960.000.000 => 23.888.750 rows => OK
id >= 99.960.000.000                        => 32.936.422 rows => OK

4 TMs with 8 GB and 4 slots each, parallelism = 16

id >= 99.960.000.000 => 32.936.422 rows => OK
id >= 99.945.000.000 => 56.825.172 rows => ERROR
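For reference, the slicing scheme above can be sketched as a simple partition of the id space. This is a minimal illustration in plain Java, not the actual job: the real job applies these predicates as Parquet push-down filters, and only the boundary values are taken from the summary above (note the thread writes longs with '.' as a thousands separator, so 10.000.000.000 == 10000000000L).

```java
// Minimal sketch of the id-range slicing used in the debug runs above.
// The real job pushes these predicates down into the Parquet reader;
// here they are plain Java so the partition can be checked in isolation.
public class IdSlices {

    // Slice boundaries taken from the debug summary above.
    static final long[] BOUNDS = {
        10_000_000_000L, 10_010_000_000L, 99_945_000_000L, 99_960_000_000L
    };

    // Returns the index of the slice an id falls into (0..BOUNDS.length).
    // Because each slice is [previous bound, next bound), every id lands
    // in exactly one slice: the slices are disjoint and cover everything.
    static int sliceOf(long id) {
        for (int i = 0; i < BOUNDS.length; i++) {
            if (id < BOUNDS[i]) {
                return i;
            }
        }
        return BOUNDS.length; // id >= 99.960.000.000
    }

    public static void main(String[] args) {
        System.out.println(sliceOf(9_999_999_999L));  // 0
        System.out.println(sliceOf(10_005_000_000L)); // 1
        System.out.println(sliceOf(50_000_000_000L)); // 2
        System.out.println(sliceOf(99_950_000_000L)); // 3
        System.out.println(sliceOf(99_960_000_000L)); // 4
    }
}
```

Since the slices are disjoint and exhaustive, the per-slice runs together process the full dataset, which is what makes the "each slice OK, union fails" result point at data volume rather than a bad record.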

Any help is appreciated.
Best,
Flavio

On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> I could, but only if there's a good probability that it fixes the
> problem... how confident are you about it?
>
> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Looking at the git log of DataInputDeserializer.java, there have been some
>> recent changes.
>>
>> If you have time, maybe try with the 1.2.1 RC and see if the error is
>> reproducible?
>>
>> Cheers
>>
>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>>> Hi to all,
>>> I think I'm hitting the weird Exception with the
>>> SpillingAdaptiveSpanningRecordDeserializer again...
>>> I'm using Flink 1.2.0 and the error seems to arise when Flink spills to
>>> disk, but the Exception thrown is not very helpful. Any idea?
>>>
>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>> Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>> ... 3 more
>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>> terminated due to an exception: null
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>> Caused by: java.io.EOFException
>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>
>
