I've pushed the fix to master. Thank you again for reporting the issue. On Fri, May 15, 2015 at 2:59 PM, Robert Metzger <rmetz...@apache.org> wrote:
> ;) Its simply not supported, sorry. > > On Fri, May 15, 2015 at 2:57 PM, Flavio Pompermaier <pomperma...@okkam.it> > wrote: > >> Is it a bug or is it a feature? :) >> >> On Fri, May 15, 2015 at 2:56 PM, Robert Metzger <rmetz...@apache.org> >> wrote: >> >>> I think distinct() is failing on "null" values because its using a >>> reduce operation internally. >>> >>> On Fri, May 15, 2015 at 2:46 PM, Flavio Pompermaier < >>> pomperma...@okkam.it> wrote: >>> >>>> I found that the problem is caused by a distinct() that I perform >>>> before printing..are null values something not managed by distinct?? >>>> >>>> On Fri, May 15, 2015 at 2:38 PM, Robert Metzger <rmetz...@apache.org> >>>> wrote: >>>> >>>>> Hi, >>>>> >>>>> the error means that you are grouping on a field which contains null >>>>> values. We can not compare elements against null, that's why we throw the >>>>> exception. >>>>> Are you sure that you're not having any null elements inside the >>>>> DataSet you're comparing against? >>>>> >>>>> >>>>> I'm not 100% sure that my fix is correct .. maybe the tests will >>>>> uncover that I've overseen something (they are still running). >>>>> >>>>> >>>>> >>>>> On Fri, May 15, 2015 at 2:26 PM, Flavio Pompermaier < >>>>> pomperma...@okkam.it> wrote: >>>>> >>>>>> Hi Robert, >>>>>> I applied your fix but still I get one error (not in the same point >>>>>> at least..) >>>>>> Basically what I do is: >>>>>> >>>>>> DataSet<Tuple2<String, DateTime>> someDates; //this is empty in my >>>>>> test >>>>>> DataSet<Tuple3<String, String, DateTime>> someEvents; >>>>>> DataSet<Tuple4<String, String, DateTime, DateTime>> res = >>>>>> someEvents.coGroup(someDates).where(0).equalTo(0).with( >>>>>> new myCoGroupFunction<Tuple3<String, String, >>>>>> DateTime>, Tuple2<String, DateTime>, Tuple4<String, String, DateTime, >>>>>> DateTime>> (...)); >>>>>> res.print(); >>>>>> >>>>>> in myCoGroupFunction I declare a Tuple4<String, String, DateTime, >>>>>> DateTime> reuse = new Tuple4<>() and I collect reuse tuples with t.f3 = >>>>>> null. >>>>>> >>>>>> Then I get this stackTrace: >>>>>> >>>>>> Caused by: org.apache.flink.types.NullKeyFieldException >>>>>> at >>>>>> org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:76) >>>>>> at >>>>>> org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:30) >>>>>> at >>>>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.nextKey(NonReusingKeyGroupedIterator.java:115) >>>>>> at >>>>>> org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.sortAndCombine(SynchronousChainedCombineDriver.java:233) >>>>>> at >>>>>> org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.close(SynchronousChainedCombineDriver.java:194) >>>>>> at >>>>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:504) >>>>>> at >>>>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) >>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) >>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>> >>>>>> On Fri, May 15, 2015 at 1:57 PM, Flavio Pompermaier < >>>>>> pomperma...@okkam.it> wrote: >>>>>> >>>>>>> Thanks a lot Robert! Don't mention it ;) >>>>>>> >>>>>>> >>>>>>> On Fri, May 15, 2015 at 1:54 PM, Robert Metzger <rmetz...@apache.org >>>>>>> > wrote: >>>>>>> >>>>>>>> Hey, >>>>>>>> >>>>>>>> the patch is in my branch "flink2019". >>>>>>>> Its really good that you've found the bug. We were using the wrong >>>>>>>> kryo instance to create copies of generic types. >>>>>>>> >>>>>>>> Once travis validates that everything is good, I'll push it to >>>>>>>> master. >>>>>>>> >>>>>>>> On Fri, May 15, 2015 at 12:41 PM, Flavio Pompermaier < >>>>>>>> pomperma...@okkam.it> wrote: >>>>>>>> >>>>>>>>> So do you think you could release a path soon? I need it to >>>>>>>>> continue my work..otherwise if it's very simple you could send me the >>>>>>>>> snippet of code to change my local flink version ;) >>>>>>>>> >>>>>>>>> Best, >>>>>>>>> Flavio >>>>>>>>> >>>>>>>>> On Fri, May 15, 2015 at 11:22 AM, Robert Metzger < >>>>>>>>> rmetz...@apache.org> wrote: >>>>>>>>> >>>>>>>>>> Yes ;) >>>>>>>>>> >>>>>>>>>> On Fri, May 15, 2015 at 11:10 AM, Flavio Pompermaier < >>>>>>>>>> pomperma...@okkam.it> wrote: >>>>>>>>>> >>>>>>>>>>> Do you think it's comething easy to fix..? >>>>>>>>>>> >>>>>>>>>>> On Fri, May 15, 2015 at 10:51 AM, Robert Metzger < >>>>>>>>>>> rmetz...@apache.org> wrote: >>>>>>>>>>> >>>>>>>>>>>> No problem ;) >>>>>>>>>>>> >>>>>>>>>>>> I was able to reproduce the issue and filed a JIRA for it: >>>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-2019 >>>>>>>>>>>> >>>>>>>>>>>> On Fri, May 15, 2015 at 10:36 AM, Flavio Pompermaier < >>>>>>>>>>>> pomperma...@okkam.it> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Unfortunately it's really difficult for me to extract the >>>>>>>>>>>>> code..I'm using joda shipped with Flink 0.9-SNAPSHOT (i.e. 2.5) >>>>>>>>>>>>> and before >>>>>>>>>>>>> today I've never seen this error..als o because DateTime is >>>>>>>>>>>>> Serializable :) >>>>>>>>>>>>> >>>>>>>>>>>>> On Fri, May 15, 2015 at 10:25 AM, Fabian Hueske < >>>>>>>>>>>>> fhue...@gmail.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Is there a chance that the version of JodaTime changed? >>>>>>>>>>>>>> >>>>>>>>>>>>>> 2015-05-15 10:22 GMT+02:00 Robert Metzger < >>>>>>>>>>>>>> rmetz...@apache.org>: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Can you share the Flink program? >>>>>>>>>>>>>>> Or at least the definition of the Tuple? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I'll look into this issue in a few minutes. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Fri, May 15, 2015 at 10:13 AM, Flavio Pompermaier < >>>>>>>>>>>>>>> pomperma...@okkam.it> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I'm using Flink 0.9-SNAPSHOT and I've never seen this error >>>>>>>>>>>>>>>> before today (the job haven't changed..) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Fri, May 15, 2015 at 10:09 AM, Robert Metzger < >>>>>>>>>>>>>>>> rmetz...@apache.org> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi Flavio, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> which version of Flink are you using? >>>>>>>>>>>>>>>>> If you are using 0.9 something, then this should actually >>>>>>>>>>>>>>>>> work ;) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Fri, May 15, 2015 at 10:06 AM, Flavio Pompermaier < >>>>>>>>>>>>>>>>> pomperma...@okkam.it> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Hi to all, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> this morning I run my Flink job and I got the following >>>>>>>>>>>>>>>>>> exception serializing a DateTime Tuple..could you help me to >>>>>>>>>>>>>>>>>> understand >>>>>>>>>>>>>>>>>> what's happening here? >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.KryoException: Class cannot be >>>>>>>>>>>>>>>>>> created (missing no-arg constructor): >>>>>>>>>>>>>>>>>> org.joda.time.chrono.ISOChronology >>>>>>>>>>>>>>>>>> Serialization trace: >>>>>>>>>>>>>>>>>> iChronology (org.joda.time.DateTime) >>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228) >>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049) >>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058) >>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620) >>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:624) >>>>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862) >>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.ObjectField.copy(ObjectField.java:140) >>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:634) >>>>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862) >>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator.setReference(GenericTypeComparator.java:77) >>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator.setReference(GenericTypeComparator.java:1) >>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:72) >>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:1) >>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.nextKey(NonReusingKeyGroupedIterator.java:115) >>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>> org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.sortAndCombine(SynchronousChainedCombineDriver.java:233) >>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>> org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.close(SynchronousChainedCombineDriver.java:194) >>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:504) >>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) >>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) >>>>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>>> >>> >> >> >