I think distinct() is failing on null values because it's using a reduce
operation internally.
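
To illustrate what I mean, here is a rough plain-Java sketch. It is not Flink
code: the class NullKeyDistinctSketch, the Event type and both methods are
made up for this mail. I am assuming that distinct() ends up comparing key
fields during a sort-and-combine step, as the stack trace suggests. A
natural-order comparison cannot handle a null key field, so one possible
workaround is to replace null with a sentinel value before deduplicating.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class NullKeyDistinctSketch {

    /** Hypothetical stand-in for a tuple with a nullable field (not a Flink type). */
    public static final class Event {
        public final String id;
        public final Long endMillis; // may be null, like t.f3 in the job

        public Event(String id, Long endMillis) {
            this.id = id;
            this.endMillis = endMillis;
        }
    }

    // Natural-order comparison on both fields; throws NullPointerException
    // as soon as a null endMillis has to be compared.
    private static final Comparator<Event> KEY_ORDER =
            Comparator.comparing((Event e) -> e.id)
                      .thenComparing((Event e) -> e.endMillis);

    /** Sort-based distinct: sort by key, then keep the first element of each run. */
    public static List<Event> sortBasedDistinct(List<Event> input) {
        List<Event> sorted = new ArrayList<>(input);
        sorted.sort(KEY_ORDER);
        List<Event> out = new ArrayList<>();
        for (Event e : sorted) {
            if (out.isEmpty() || KEY_ORDER.compare(out.get(out.size() - 1), e) != 0) {
                out.add(e);
            }
        }
        return out;
    }

    /** Workaround sketch: map a null endMillis to a sentinel before deduplicating. */
    public static List<Event> replaceNullEnd(List<Event> input, long sentinel) {
        List<Event> out = new ArrayList<>();
        for (Event e : input) {
            out.add(e.endMillis == null ? new Event(e.id, sentinel) : e);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", null), new Event("a", 5L), new Event("a", null));
        try {
            sortBasedDistinct(events);
        } catch (NullPointerException e) {
            System.out.println("distinct failed on a null key field");
        }
        List<Event> cleaned = sortBasedDistinct(replaceNullEnd(events, Long.MIN_VALUE));
        System.out.println("distinct elements after replacing null: " + cleaned.size());
    }
}
```

In a real job the equivalent would be a map (or filter) step in front of
distinct(). The sentinel has to be a value that cannot occur in the data.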

On Fri, May 15, 2015 at 2:46 PM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> I found that the problem is caused by a distinct() that I perform before
> printing. Are null values not handled by distinct()?
>
> On Fri, May 15, 2015 at 2:38 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi,
>>
>> the error means that you are grouping on a field which contains null
>> values. We cannot compare elements against null, that's why we throw the
>> exception.
>> Are you sure that there are no null elements inside the DataSet
>> you're comparing against?
>>
>>
>> I'm not 100% sure that my fix is correct. Maybe the tests will uncover
>> something I've overlooked (they are still running).
>>
>>
>>
>> On Fri, May 15, 2015 at 2:26 PM, Flavio Pompermaier <pomperma...@okkam.it
>> > wrote:
>>
>>> Hi Robert,
>>> I applied your fix but I still get an error (at least not at the same
>>> point).
>>> Basically what I do is:
>>>
>>> DataSet<Tuple2<String, DateTime>> someDates; //this is empty in my test
>>> DataSet<Tuple3<String, String, DateTime>> someEvents;
>>> DataSet<Tuple4<String, String, DateTime, DateTime>> res =
>>>     someEvents.coGroup(someDates).where(0).equalTo(0).with(
>>>         new myCoGroupFunction<Tuple3<String, String, DateTime>,
>>>             Tuple2<String, DateTime>,
>>>             Tuple4<String, String, DateTime, DateTime>>(...));
>>> res.print();
>>>
>>> In myCoGroupFunction I declare a Tuple4<String, String, DateTime,
>>> DateTime> reuse = new Tuple4<>() and I collect reuse tuples with t.f3 =
>>> null.
>>>
>>> Then I get this stack trace:
>>>
>>> Caused by: org.apache.flink.types.NullKeyFieldException
>>> at org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:76)
>>> at org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:30)
>>> at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.nextKey(NonReusingKeyGroupedIterator.java:115)
>>> at org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.sortAndCombine(SynchronousChainedCombineDriver.java:233)
>>> at org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.close(SynchronousChainedCombineDriver.java:194)
>>> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:504)
>>> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> On Fri, May 15, 2015 at 1:57 PM, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
>>>> Thanks a lot Robert! Don't mention it ;)
>>>>
>>>>
>>>> On Fri, May 15, 2015 at 1:54 PM, Robert Metzger <rmetz...@apache.org>
>>>> wrote:
>>>>
>>>>> Hey,
>>>>>
>>>>> the patch is in my branch "flink2019".
>>>>> It's really good that you've found the bug. We were using the wrong
>>>>> Kryo instance to create copies of generic types.
>>>>>
>>>>> Once travis validates that everything is good, I'll push it to master.
>>>>>
>>>>> On Fri, May 15, 2015 at 12:41 PM, Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>>> So do you think you could release a patch soon? I need it to continue
>>>>>> my work. Otherwise, if it's very simple, you could send me the snippet
>>>>>> of code to change in my local Flink version ;)
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>> On Fri, May 15, 2015 at 11:22 AM, Robert Metzger <rmetz...@apache.org
>>>>>> > wrote:
>>>>>>
>>>>>>> Yes ;)
>>>>>>>
>>>>>>> On Fri, May 15, 2015 at 11:10 AM, Flavio Pompermaier <
>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> Do you think it's something easy to fix?
>>>>>>>>
>>>>>>>> On Fri, May 15, 2015 at 10:51 AM, Robert Metzger <
>>>>>>>> rmetz...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> No problem ;)
>>>>>>>>>
>>>>>>>>> I was able to reproduce the issue and filed a JIRA for it:
>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-2019
>>>>>>>>>
>>>>>>>>> On Fri, May 15, 2015 at 10:36 AM, Flavio Pompermaier <
>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> Unfortunately it's really difficult for me to extract the code.
>>>>>>>>>> I'm using the Joda-Time shipped with Flink 0.9-SNAPSHOT (i.e. 2.5)
>>>>>>>>>> and before today I've never seen this error, also because DateTime
>>>>>>>>>> is Serializable :)
>>>>>>>>>>
>>>>>>>>>> On Fri, May 15, 2015 at 10:25 AM, Fabian Hueske <
>>>>>>>>>> fhue...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Is there a chance that the version of JodaTime changed?
>>>>>>>>>>>
>>>>>>>>>>> 2015-05-15 10:22 GMT+02:00 Robert Metzger <rmetz...@apache.org>:
>>>>>>>>>>>
>>>>>>>>>>>> Can you share the Flink program?
>>>>>>>>>>>> Or at least the definition of the Tuple?
>>>>>>>>>>>>
>>>>>>>>>>>> I'll look into this issue in a few minutes.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, May 15, 2015 at 10:13 AM, Flavio Pompermaier <
>>>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I'm using Flink 0.9-SNAPSHOT and I've never seen this error
>>>>>>>>>>>>> before today (the job hasn't changed).
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, May 15, 2015 at 10:09 AM, Robert Metzger <
>>>>>>>>>>>>> rmetz...@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Flavio,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> which version of Flink are you using?
>>>>>>>>>>>>>> If you are using 0.9 something, then this should actually
>>>>>>>>>>>>>> work  ;)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, May 15, 2015 at 10:06 AM, Flavio Pompermaier <
>>>>>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> this morning I ran my Flink job and got the following
>>>>>>>>>>>>>>> exception while serializing a DateTime Tuple. Could you help
>>>>>>>>>>>>>>> me understand what's happening here?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): org.joda.time.chrono.ISOChronology
>>>>>>>>>>>>>>> Serialization trace:
>>>>>>>>>>>>>>> iChronology (org.joda.time.DateTime)
>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620)
>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:624)
>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.ObjectField.copy(ObjectField.java:140)
>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:634)
>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>>>>>>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator.setReference(GenericTypeComparator.java:77)
>>>>>>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator.setReference(GenericTypeComparator.java:1)
>>>>>>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:72)
>>>>>>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:1)
>>>>>>>>>>>>>>> at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.nextKey(NonReusingKeyGroupedIterator.java:115)
>>>>>>>>>>>>>>> at org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.sortAndCombine(SynchronousChainedCombineDriver.java:233)
>>>>>>>>>>>>>>> at org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.close(SynchronousChainedCombineDriver.java:194)
>>>>>>>>>>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:504)
>>>>>>>>>>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>>>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>>>
