Is it a bug or is it a feature? :)

On Fri, May 15, 2015 at 2:56 PM, Robert Metzger <rmetz...@apache.org> wrote:
I think distinct() is failing on "null" values because it's using a reduce operation internally.

On Fri, May 15, 2015 at 2:46 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

I found that the problem is caused by a distinct() that I perform before printing... are null values something not handled by distinct?

On Fri, May 15, 2015 at 2:38 PM, Robert Metzger <rmetz...@apache.org> wrote:

Hi,

the error means that you are grouping on a field which contains null values. We cannot compare elements against null; that's why we throw the exception.
Are you sure that you don't have any null elements inside the DataSet you're comparing against?

I'm not 100% sure that my fix is correct... maybe the tests will uncover that I've overlooked something (they are still running).

On Fri, May 15, 2015 at 2:26 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Hi Robert,
I applied your fix, but I still get one error (not in the same place, at least...).
Basically what I do is:

    DataSet<Tuple2<String, DateTime>> someDates; // this is empty in my test
    DataSet<Tuple3<String, String, DateTime>> someEvents;
    DataSet<Tuple4<String, String, DateTime, DateTime>> res =
        someEvents.coGroup(someDates).where(0).equalTo(0).with(
            new myCoGroupFunction<Tuple3<String, String, DateTime>, Tuple2<String, DateTime>, Tuple4<String, String, DateTime, DateTime>>(...));
    res.print();

In myCoGroupFunction I declare a Tuple4<String, String, DateTime, DateTime> reuse = new Tuple4<>() and I collect reuse tuples with t.f3 = null.

Then I get this stack trace:

    Caused by: org.apache.flink.types.NullKeyFieldException
        at org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:76)
        at org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:30)
        at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.nextKey(NonReusingKeyGroupedIterator.java:115)
        at org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.sortAndCombine(SynchronousChainedCombineDriver.java:233)
        at org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.close(SynchronousChainedCombineDriver.java:194)
        at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:504)
        at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.lang.Thread.run(Thread.java:745)

On Fri, May 15, 2015 at 1:57 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Thanks a lot Robert! Don't mention it ;)

On Fri, May 15, 2015 at 1:54 PM, Robert Metzger <rmetz...@apache.org> wrote:

Hey,

the patch is in my branch "flink2019".
It's really good that you've found the bug. We were using the wrong Kryo instance to create copies of generic types.

Once Travis validates that everything is good, I'll push it to master.
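For anyone hitting the same NullKeyFieldException before a fixed build is available: one possible workaround (not discussed in this thread; the field names and types below are made up for illustration) is to replace null key fields with a sentinel value before calling distinct() or grouping on them, since Flink's tuple comparators cannot compare against null. A minimal sketch against the Flink Java API:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class NullKeyWorkaround {

        // Sentinel that stands in for null so the tuple comparator never sees a null key.
        private static final String NULL_MARKER = "__NULL__";

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<String, String>> events = env.fromElements(
                    new Tuple2<String, String>("a", "2015-05-15"),
                    new Tuple2<String, String>("b", null));

            // Replace null fields with the sentinel before distinct(); without this,
            // the combiner's comparator throws NullKeyFieldException.
            DataSet<Tuple2<String, String>> deduped = events
                    .map(new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                        @Override
                        public Tuple2<String, String> map(Tuple2<String, String> t) {
                            if (t.f1 == null) {
                                t.f1 = NULL_MARKER;
                            }
                            return t;
                        }
                    })
                    .distinct();

            deduped.print();
        }
    }

Downstream consumers would then have to map the sentinel back to null after the distinct().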
On Fri, May 15, 2015 at 12:41 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

So do you think you could release a patch soon? I need it to continue my work... otherwise, if it's very simple, you could send me the snippet of code to change in my local Flink version ;)

Best,
Flavio

On Fri, May 15, 2015 at 11:22 AM, Robert Metzger <rmetz...@apache.org> wrote:

Yes ;)

On Fri, May 15, 2015 at 11:10 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Do you think it's something easy to fix?

On Fri, May 15, 2015 at 10:51 AM, Robert Metzger <rmetz...@apache.org> wrote:

No problem ;)

I was able to reproduce the issue and filed a JIRA for it:
https://issues.apache.org/jira/browse/FLINK-2019

On Fri, May 15, 2015 at 10:36 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Unfortunately it's really difficult for me to extract the code... I'm using the Joda version shipped with Flink 0.9-SNAPSHOT (i.e. 2.5), and before today I've never seen this error... also because DateTime is Serializable :)

On Fri, May 15, 2015 at 10:25 AM, Fabian Hueske <fhue...@gmail.com> wrote:

Is there a chance that the version of JodaTime changed?

2015-05-15 10:22 GMT+02:00 Robert Metzger <rmetz...@apache.org>:

Can you share the Flink program? Or at least the definition of the Tuple?

I'll look into this issue in a few minutes.

On Fri, May 15, 2015 at 10:13 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

I'm using Flink 0.9-SNAPSHOT and I've never seen this error before today (the job hasn't changed..)

On Fri, May 15, 2015 at 10:09 AM, Robert Metzger <rmetz...@apache.org> wrote:

Hi Flavio,

which version of Flink are you using? If you are using 0.9 something, then this should actually work ;)

On Fri, May 15, 2015 at 10:06 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Hi to all,

this morning I ran my Flink job and got the following exception serializing a DateTime Tuple... could you help me understand what's happening here?
    com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): org.joda.time.chrono.ISOChronology
    Serialization trace:
    iChronology (org.joda.time.DateTime)
        at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
        at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:624)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
        at com.esotericsoftware.kryo.serializers.ObjectField.copy(ObjectField.java:140)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:634)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
        at org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator.setReference(GenericTypeComparator.java:77)
        at org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator.setReference(GenericTypeComparator.java:1)
        at org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:72)
        at org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:1)
        at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.nextKey(NonReusingKeyGroupedIterator.java:115)
        at org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.sortAndCombine(SynchronousChainedCombineDriver.java:233)
        at org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.close(SynchronousChainedCombineDriver.java:194)
        at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:504)
        at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.lang.Thread.run(Thread.java:745)
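The root cause discussed above was tracked as FLINK-2019 (the wrong Kryo instance being used to copy generic types). Independently of that fix, a common way to avoid Kryo's "missing no-arg constructor" error for Joda-Time types is to register a dedicated serializer with Flink's ExecutionConfig. A minimal sketch, assuming the de.javakaffee kryo-serializers artifact is on the classpath (this workaround is not part of the thread):

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.joda.time.DateTime;

    import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;

    public class JodaKryoRegistration {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Tell Flink's Kryo instance how to serialize and copy DateTime values,
            // so Kryo never tries to instantiate org.joda.time.chrono.ISOChronology
            // through a (non-existent) no-arg constructor.
            env.getConfig().registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);

            // ... define sources, coGroup/distinct, etc. as usual; DateTime fields in
            // tuples now go through the registered serializer instead of Kryo's
            // default FieldSerializer.
        }
    }

Registering the serializer only changes how DateTime is handled by Kryo; null key fields would still need the separate handling discussed earlier in the thread.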