As suggested by Fabian, I set taskmanager.memory.size = 1024 (to force
spilling to disk) and the job failed almost immediately.
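
For reference, this is the relevant bit of flink-conf.yaml (a minimal sketch;
the value is in megabytes and pins the managed memory to a fixed size instead
of the default fraction of the heap, so the sorter runs out of in-memory
buffers sooner and spills):

# flink-conf.yaml
# Fix the managed memory to 1024 MB so that sorting spills to disk earlier.
taskmanager.memory.size: 1024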

On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> I debugged the process a bit by repeating the job on sub-slices of the
> entire data (using the id value to filter data with Parquet push-down
> filters, as sketched below the summary), and all slices completed
> successfully :(
> So I tried to increase the parallelism (from 1 slot per TM to 4) to see
> whether the extra stress was a factor, but it didn't cause any error.
> Then I almost doubled the number of rows to process, and the error finally
> showed up again.
> It seems somehow related to spilling to disk, but I can't really understand
> what's going on :(
> This is a summary of my debug attempts:
>
> 4 Task managers with 6 GB and 1 slot each, parallelism = 4
>
> id < 10.000.000.000                          =>  1.857.365 rows => OK
> id >= 10.000.000.000 && id < 10.010.000.000  => 20.057.714 rows => OK
> id >= 10.010.000.000 && id < 99.945.000.000  => 20.926.903 rows => OK
> id >= 99.945.000.000 && id < 99.960.000.000  => 23.888.750 rows => OK
> id >= 99.960.000.000                         => 32.936.422 rows => OK
>
> 4 TMs with 8 GB and 4 slots each, parallelism = 16
>
> id >= 99.960.000.000                         => 32.936.422 rows => OK
> id >= 99.945.000.000                         => 56.825.172 rows => ERROR
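>
> For reference, this is roughly how the id slices above can be expressed with
> the parquet-mr push-down filter API (a minimal sketch, assuming parquet-mr
> 1.7+ package names; the column name "id" and the Hadoop Configuration wiring
> are just illustrative, the actual job has its own ParquetInputFormat setup):
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.parquet.filter2.predicate.FilterApi;
> import org.apache.parquet.filter2.predicate.FilterPredicate;
> import org.apache.parquet.hadoop.ParquetInputFormat;
>
> public class IdSliceFilter {
>     public static void main(String[] args) {
>         // Bounded slice, e.g. id >= 10.000.000.000 && id < 10.010.000.000
>         FilterPredicate bounded = FilterApi.and(
>                 FilterApi.gtEq(FilterApi.longColumn("id"), 10_000_000_000L),
>                 FilterApi.lt(FilterApi.longColumn("id"), 10_010_000_000L));
>
>         // Open-ended slice, e.g. id >= 99.945.000.000 (the failing one)
>         FilterPredicate openEnded =
>                 FilterApi.gtEq(FilterApi.longColumn("id"), 99_945_000_000L);
>
>         // The predicate is passed to the Parquet input format through the
>         // Hadoop configuration, so non-matching row groups are skipped.
>         Configuration conf = new Configuration();
>         ParquetInputFormat.setFilterPredicate(conf, openEnded);
>     }
> }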
>
> Any help is appreciated..
> Best,
> Flavio
>
> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> I could, but only if there's a good probability that it fixes the
>> problem... how confident are you about it?
>>
>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Looking at the git log of DataInputDeserializer.java, there have been some
>>> recent changes.
>>>
>>> If you have time, maybe try with the 1.2.1 RC and see if the error is
>>> reproducible?
>>>
>>> Cheers
>>>
>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
>>>> Hi to all,
>>>> I think I'm hitting the weird Exception with the
>>>> SpillingAdaptiveSpanningRecordDeserializer again...
>>>> I'm using Flink 1.2.0 and the error seems to arise when Flink spills to
>>>> disk, but the Exception thrown is not very helpful. Any idea?
>>>>
>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>> ... 3 more
>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>> Caused by: java.io.EOFException
>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>
>>>
>>
>>
>
