Over the last week I've been able to run the job several times without any
error. Then I just recompiled it and the error reappeared :(
This time I had:

java.lang.Exception: The data preparation for task 'CHAIN CoGroup
(CoGroup at main(DataInference.java:372)) -> Map (Map at
writeEntitonPojos(ParquetThriftEntitons.java:170))' , caused an error:
Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
terminated due to an exception: Serializer consumed more bytes than
the record had. This indicates broken serialization. If you are using
custom serialization types (Value or Writable), check their
serialization methods. If you are using a Kryo-serialized type, check
the corresponding Kryo serializer.
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted
input: Thread 'SortMerger Reading Thread' terminated due to an
exception: Serializer consumed more bytes than the record had. This
indicates broken serialization. If you are using custom serialization
types (Value or Writable), check their serialization methods. If you
are using a Kryo-serialized type, check the corresponding Kryo
serializer.
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
        at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
terminated due to an exception: Serializer consumed more bytes than
the record had. This indicates broken serialization. If you are using
custom serialization types (Value or Writable), check their
serialization methods. If you are using a Kryo-serialized type, check
the corresponding Kryo serializer.
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.io.IOException: Serializer consumed more bytes than
the record had. This indicates broken serialization. If you are using
custom serialization types (Value or Writable), check their
serialization methods. If you are using a Kryo-serialized type, check
the corresponding Kryo serializer.
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
        at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
        at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
        at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
        at java.util.ArrayList.elementData(ArrayList.java:418)
        at java.util.ArrayList.get(ArrayList.java:431)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
        at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
        ... 5 more


I really can't figure out what is causing the error :(
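
The only concrete idea I've come up with is a standalone round-trip check:
serialize a single record with the same TypeSerializer the job uses,
deserialize it again, and verify that every written byte is consumed (the
same symmetry the SortMerger message complains about). Below is just a
sketch of what I mean, not code from the job; RoundTripCheck and the record
argument are placeholders:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

public class RoundTripCheck {

    // Serialize and deserialize one record, failing if the deserializer
    // does not consume exactly the bytes the serializer wrote.
    public static <T> void check(T record) throws Exception {
        TypeInformation<T> typeInfo = TypeExtractor.getForObject(record);

        ExecutionConfig config = new ExecutionConfig();
        // NB: mirror the job's registrations here, e.g.
        // config.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
        TypeSerializer<T> serializer = typeInfo.createSerializer(config);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        serializer.serialize(record, new DataOutputViewStreamWrapper(bytes));

        ByteArrayInputStream in = new ByteArrayInputStream(bytes.toByteArray());
        T copy = serializer.deserialize(new DataInputViewStreamWrapper(in));

        if (in.available() != 0) {
            throw new IllegalStateException(
                    "Deserializer left " + in.available() + " unread bytes");
        }
        System.out.println("round-trip ok: " + copy);
    }
}

If a record ever fails this check locally, that should point straight at the
broken serializer.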

On Mon, May 30, 2016 at 12:25 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Flavio,
>
> can you privately share the source code of your Flink job with me?
>
> I'm wondering whether the issue might be caused by a version mixup between
> different versions on the cluster (different JVM versions? or different
> files in the lib/ folder?). How are you deploying the Flink job?
>
> Regards,
> Robert
>
>
> On Mon, May 30, 2016 at 11:33 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> I tried to reproduce the error on a subset of the data, and reducing the
>> available memory while drastically increasing GC pressure (by creating a
>> lot of useless objects in one of the first UDFs, sketched below after the
>> trace) actually caused this error:
>>
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>> terminated due to an exception: / by zero
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>> Caused by: java.lang.ArithmeticException: / by zero
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.getSegmentsForReaders(UnilateralSortMerger.java:1651)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.mergeChannelList(UnilateralSortMerger.java:1565)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1417)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>
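>> For the record, the "useless objects" trick was just a pass-through UDF
>> allocating lots of short-lived garbage, roughly like the sketch below
>> (MyPojo stands in for the real type; the real function does actual work):
>>
>> import org.apache.flink.api.common.functions.RichMapFunction;
>>
>> public class GcPressureMap extends RichMapFunction<MyPojo, MyPojo> {
>>     @Override
>>     public MyPojo map(MyPojo value) {
>>         // Allocate useless objects so the garbage collector runs
>>         // far more often than in the normal job.
>>         byte[][] junk = new byte[64][];
>>         for (int i = 0; i < junk.length; i++) {
>>             junk[i] = new byte[1024];
>>         }
>>         return value;
>>     }
>> }
>>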
>> I hope this helps to narrow down the debugging area :)
>>
>> Best,
>> Flavio
>>
>> On Fri, May 27, 2016 at 8:21 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi!
>>>
>>> That is a pretty thing indeed :-) Will try to look into this in a few
>>> days...
>>>
>>> Stephan
>>>
>>>
>>> On Fri, May 27, 2016 at 12:10 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>>> Running the job with the log level set to DEBUG made the job run
>>>> successfully... Is this meaningful? Maybe slowing the threads down a
>>>> little bit helps serialization?
>>>>
>>>>
>>>> On Thu, May 26, 2016 at 12:34 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> Still not able to reproduce the error locally, only remotely :)
>>>>> Any suggestions on how to reproduce it locally on a subset of the data?
>>>>> This time I had:
>>>>>
>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
>>>>>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>>>>         at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>         at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>         at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>>>         at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
>>>>>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>>
>>>>> On Tue, May 24, 2016 at 5:47 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Do you have any suggestions on how to reproduce the error on a subset
>>>>>> of the data?
>>>>>> I'm trying to change the following, but I can't find a configuration
>>>>>> that causes the error :(
>>>>>>
>>>>>> private static ExecutionEnvironment getLocalExecutionEnv() {
>>>>>>     org.apache.flink.configuration.Configuration c =
>>>>>>         new org.apache.flink.configuration.Configuration();
>>>>>>     c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, "/tmp");
>>>>>>     c.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, "/tmp");
>>>>>>     c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.9f);
>>>>>>     c.setLong(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
>>>>>>     c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
>>>>>>     c.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10000 s");
>>>>>>     c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048 * 12);
>>>>>>     ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);
>>>>>>     env.setParallelism(16);
>>>>>>     env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
>>>>>>     return env;
>>>>>> }
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>>
>>>>>> On Tue, May 24, 2016 at 11:13 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>
>>>>>>> The error looks really strange. Flavio, could you compile a test
>>>>>>> program with example data and configuration to reproduce the problem?
>>>>>>> Given that, we could try to debug it.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
