This time I had the following exception (obviously
it.okkam.flinj.model.pojo.TipoSoggetto should be
it.okkam.flink.model.pojo.TipoSoggetto).

java.lang.RuntimeException: Cannot instantiate class.
        at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
        at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
        at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
        at 
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
        at 
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
        at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
        at 
org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
        at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException:
it.okkam.flinj.model.pojo.TipoSoggetto
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
        ... 10 more
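
As a quick sanity check on the mangled class name, here is a minimal sketch that compares the name from the ClassNotFoundException with the expected one, character by character. It is not part of the job: ClassNameDiff and its methods are made-up names for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Flink job: compares the class name
// reported in the ClassNotFoundException with the expected one.
class ClassNameDiff {

    // Returns the indices at which the two strings differ
    // (only the common length of the two strings is compared).
    static List<Integer> diffPositions(String a, String b) {
        List<Integer> positions = new ArrayList<>();
        int n = Math.min(a.length(), b.length());
        for (int i = 0; i < n; i++) {
            if (a.charAt(i) != b.charAt(i)) {
                positions.add(i);
            }
        }
        return positions;
    }

    // XOR of the two characters at index i; a power of two means
    // the characters differ in exactly one bit.
    static int xorAt(String a, String b, int i) {
        return a.charAt(i) ^ b.charAt(i);
    }

    public static void main(String[] args) {
        String found = "it.okkam.flinj.model.pojo.TipoSoggetto";
        String expected = "it.okkam.flink.model.pojo.TipoSoggetto";
        for (int i : diffPositions(found, expected)) {
            System.out.println("index " + i + ": '" + found.charAt(i) + "' vs '"
                    + expected.charAt(i) + "', xor = 0x"
                    + Integer.toHexString(xorAt(found, expected, i)));
        }
    }
}
```

The two names differ in a single character, and 'j' (0x6A) and 'k' (0x6B) differ only in the lowest bit. That would be consistent with corrupted serialized bytes rather than a classpath problem, although this alone doesn't prove it.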



On Wed, Jun 1, 2016 at 5:44 PM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Over the last week I was able to run the job several times without any
> error. Then I just recompiled it and the error reappeared :(
> This time I had:
>
> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at 
> main(DataInference.java:372)) -> Map (Map at 
> writeEntitonPojos(ParquetThriftEntitons.java:170))' , caused an error: Error 
> obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due 
> to an exception: Serializer consumed more bytes than the record had. This 
> indicates broken serialization. If you are using custom serialization types 
> (Value or Writable), check their serialization methods. If you are using a 
> Kryo-serialized type, check the corresponding Kryo serializer.
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>       at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
> Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer 
> consumed more bytes than the record had. This indicates broken serialization. 
> If you are using custom serialization types (Value or Writable), check their 
> serialization methods. If you are using a Kryo-serialized type, check the 
> corresponding Kryo serializer.
>       at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>       at 
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>       at 
> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>       ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated 
> due to an exception: Serializer consumed more bytes than the record had. This 
> indicates broken serialization. If you are using custom serialization types 
> (Value or Writable), check their serialization methods. If you are using a 
> Kryo-serialized type, check the corresponding Kryo serializer.
>       at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: java.io.IOException: Serializer consumed more bytes than the 
> record had. This indicates broken serialization. If you are using custom 
> serialization types (Value or Writable), check their serialization methods. 
> If you are using a Kryo-serialized type, check the corresponding Kryo 
> serializer.
>       at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>       at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>       at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>       at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>       at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>       at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>       at java.util.ArrayList.elementData(ArrayList.java:418)
>       at java.util.ArrayList.get(ArrayList.java:431)
>       at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>       at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>       at 
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>       at 
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>       at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>       at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>       at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
>       at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>       at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>       at 
> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>       at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>       ... 5 more
>
>
> I can't really find a way to understand what is causing the error :(
>
>
> On Mon, May 30, 2016 at 12:25 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Flavio,
>>
>> can you privately share the source code of your Flink job with me?
>>
>> I'm wondering whether the issue might be caused by a version mix-up
>> on the cluster (different JVM versions, or different files in the
>> lib/ folder?). How are you deploying the Flink job?
>>
>> Regards,
>> Robert
>>
>>
>> On Mon, May 30, 2016 at 11:33 AM, Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>>> I tried to reproduce the error on a subset of the data. Reducing the
>>> available memory and greatly increasing the GC pressure (by creating a lot
>>> of useless objects in one of the first UDFs) caused this error:
>>>
>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>>> terminated due to an exception: / by zero
>>> at
>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>> Caused by: java.lang.ArithmeticException: / by zero
>>> at
>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.getSegmentsForReaders(UnilateralSortMerger.java:1651)
>>> at
>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.mergeChannelList(UnilateralSortMerger.java:1565)
>>> at
>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1417)
>>> at
>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>
>>> I hope this helps to narrow down the debugging area :)
>>>
>>> Best,
>>> Flavio
>>>
>>> On Fri, May 27, 2016 at 8:21 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Hi!
>>>>
>>>> That is a pretty thing indeed :-) Will try to look into this in a few
>>>> days...
>>>>
>>>> Stephan
>>>>
>>>>
>>>> On Fri, May 27, 2016 at 12:10 PM, Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> Running the job with the log level set to DEBUG made the job run
>>>>> successfully... Is this meaningful? Maybe slowing down the threads a
>>>>> little bit helps serialization?
>>>>>
>>>>>
>>>>> On Thu, May 26, 2016 at 12:34 PM, Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Still not able to reproduce the error locally, only remotely :)
>>>>>> Any suggestions about how to reproduce it locally on a subset
>>>>>> of the data?
>>>>>> This time I had:
>>>>>>
>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>         at
>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>>         at
>>>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>>>>>         at
>>>>>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>         at
>>>>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>>         at
>>>>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>>         at
>>>>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>>         at
>>>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>>>>         at
>>>>>> org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
>>>>>>         at
>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>         at
>>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>         at
>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>>
>>>>>> On Tue, May 24, 2016 at 5:47 PM, Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> Do you have any suggestions about how to reproduce the error on a
>>>>>>> subset of the data?
>>>>>>> I've been changing the following settings, but I can't find a
>>>>>>> configuration that causes the error :(
>>>>>>>
>>>>>>>     private static ExecutionEnvironment getLocalExecutionEnv() {
>>>>>>>         org.apache.flink.configuration.Configuration c =
>>>>>>>             new org.apache.flink.configuration.Configuration();
>>>>>>>         c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, "/tmp");
>>>>>>>         c.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, "/tmp");
>>>>>>>         c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.9f);
>>>>>>>         c.setLong(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
>>>>>>>         c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
>>>>>>>         c.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10000 s");
>>>>>>>         c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048 * 12);
>>>>>>>         ExecutionEnvironment env =
>>>>>>>             ExecutionEnvironment.createLocalEnvironment(c);
>>>>>>>         env.setParallelism(16);
>>>>>>>         env.registerTypeWithKryoSerializer(DateTime.class,
>>>>>>>             JodaDateTimeSerializer.class);
>>>>>>>         return env;
>>>>>>>     }
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>>
>>>>>>> On Tue, May 24, 2016 at 11:13 AM, Till Rohrmann <
>>>>>>> trohrm...@apache.org> wrote:
>>>>>>>
>>>>>>>> The error looks really strange. Flavio, could you put together a test
>>>>>>>> program with example data and configuration that reproduces the
>>>>>>>> problem? Given that, we could try to debug it.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
