Hi,

the error means that you are grouping on a field which contains null
values. We can not compare elements against null, that's why we throw the
exception.
Are you sure that you're not having any null elements inside the DataSet
you're comparing against?


I'm not 100% sure that my fix is correct .. maybe the tests will uncover
that I've overseen something (they are still running).



On Fri, May 15, 2015 at 2:26 PM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Hi Robert,
> I applied your fix but still I get one error (not in the same point at
> least..)
> Basically what I do is:
>
> DataSet<Tuple2<String, DateTime>> someDates; //this is empty in my test
> DataSet<Tuple3<String, String, DateTime>> someEvents;
> DataSet<Tuple4<String, String, DateTime, DateTime>> res =
> someEvents.coGroup(someDates).where(0).equalTo(0).with(
>  new myCoGroupFunction<Tuple3<String, String, DateTime>, Tuple2<String,
> DateTime>, Tuple4<String, String, DateTime, DateTime>> (...));
> res.print();
>
> in myCoGroupFunction I declare a Tuple4<String, String, DateTime,
> DateTime> reuse = new Tuple4<>() and I collect reuse tuples with t.f3 =
> null.
>
> Then I get this stackTrace:
>
> Caused by: org.apache.flink.types.NullKeyFieldException
> at
> org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:76)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:30)
> at
> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.nextKey(NonReusingKeyGroupedIterator.java:115)
> at
> org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.sortAndCombine(SynchronousChainedCombineDriver.java:233)
> at
> org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.close(SynchronousChainedCombineDriver.java:194)
> at
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:504)
> at
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> at java.lang.Thread.run(Thread.java:745)
>
> On Fri, May 15, 2015 at 1:57 PM, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Thanks a lot Robert! Don't mention it ;)
>>
>>
>> On Fri, May 15, 2015 at 1:54 PM, Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>>> Hey,
>>>
>>> the patch is in my branch "flink2019".
>>> Its really good that you've found the bug. We were using the wrong kryo
>>> instance to create copies of generic types.
>>>
>>> Once travis validates that everything is good, I'll push it to master.
>>>
>>> On Fri, May 15, 2015 at 12:41 PM, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
>>>> So do you think you could release a path soon? I need it to continue my
>>>> work..otherwise if it's very simple you could send me the snippet of code
>>>> to change my local flink version ;)
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Fri, May 15, 2015 at 11:22 AM, Robert Metzger <rmetz...@apache.org>
>>>> wrote:
>>>>
>>>>> Yes ;)
>>>>>
>>>>> On Fri, May 15, 2015 at 11:10 AM, Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Do you think it's comething easy to fix..?
>>>>>>
>>>>>> On Fri, May 15, 2015 at 10:51 AM, Robert Metzger <rmetz...@apache.org
>>>>>> > wrote:
>>>>>>
>>>>>>> No problem ;)
>>>>>>>
>>>>>>> I was able to reproduce the issue and filed a JIRA for it:
>>>>>>> https://issues.apache.org/jira/browse/FLINK-2019
>>>>>>>
>>>>>>> On Fri, May 15, 2015 at 10:36 AM, Flavio Pompermaier <
>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> Unfortunately it's really difficult for me to extract the code..I'm
>>>>>>>> using joda shipped with Flink 0.9-SNAPSHOT (i.e. 2.5) and before today 
>>>>>>>> I've
>>>>>>>> never seen this error..als o because DateTime is Serializable :)
>>>>>>>>
>>>>>>>> On Fri, May 15, 2015 at 10:25 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Is there a chance that the version of JodaTime changed?
>>>>>>>>>
>>>>>>>>> 2015-05-15 10:22 GMT+02:00 Robert Metzger <rmetz...@apache.org>:
>>>>>>>>>
>>>>>>>>>> Can you share the Flink program?
>>>>>>>>>> Or at least the definition of the Tuple?
>>>>>>>>>>
>>>>>>>>>> I'll look into this issue in a few minutes.
>>>>>>>>>>
>>>>>>>>>> On Fri, May 15, 2015 at 10:13 AM, Flavio Pompermaier <
>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm using Flink 0.9-SNAPSHOT and I've never seen this error
>>>>>>>>>>> before today (the job haven't changed..)
>>>>>>>>>>>
>>>>>>>>>>> On Fri, May 15, 2015 at 10:09 AM, Robert Metzger <
>>>>>>>>>>> rmetz...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Flavio,
>>>>>>>>>>>>
>>>>>>>>>>>> which version of Flink are you using?
>>>>>>>>>>>> If you are using 0.9 something, then this should actually work
>>>>>>>>>>>>  ;)
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, May 15, 2015 at 10:06 AM, Flavio Pompermaier <
>>>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> this morning I run my Flink job and I got the following
>>>>>>>>>>>>> exception serializing a DateTime Tuple..could you help me to 
>>>>>>>>>>>>> understand
>>>>>>>>>>>>> what's happening here?
>>>>>>>>>>>>>
>>>>>>>>>>>>> com.esotericsoftware.kryo.KryoException: Class cannot be
>>>>>>>>>>>>> created (missing no-arg constructor): 
>>>>>>>>>>>>> org.joda.time.chrono.ISOChronology
>>>>>>>>>>>>> Serialization trace:
>>>>>>>>>>>>> iChronology (org.joda.time.DateTime)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:624)
>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.ObjectField.copy(ObjectField.java:140)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:634)
>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator.setReference(GenericTypeComparator.java:77)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator.setReference(GenericTypeComparator.java:1)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:72)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:1)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.nextKey(NonReusingKeyGroupedIterator.java:115)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.sortAndCombine(SynchronousChainedCombineDriver.java:233)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.close(SynchronousChainedCombineDriver.java:194)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:504)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to