;) Its simply not supported, sorry. On Fri, May 15, 2015 at 2:57 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> Is it a bug or is it a feature? :) > > On Fri, May 15, 2015 at 2:56 PM, Robert Metzger <rmetz...@apache.org> > wrote: > >> I think distinct() is failing on "null" values because its using a reduce >> operation internally. >> >> On Fri, May 15, 2015 at 2:46 PM, Flavio Pompermaier <pomperma...@okkam.it >> > wrote: >> >>> I found that the problem is caused by a distinct() that I perform before >>> printing..are null values something not managed by distinct?? >>> >>> On Fri, May 15, 2015 at 2:38 PM, Robert Metzger <rmetz...@apache.org> >>> wrote: >>> >>>> Hi, >>>> >>>> the error means that you are grouping on a field which contains null >>>> values. We can not compare elements against null, that's why we throw the >>>> exception. >>>> Are you sure that you're not having any null elements inside the >>>> DataSet you're comparing against? >>>> >>>> >>>> I'm not 100% sure that my fix is correct .. maybe the tests will >>>> uncover that I've overseen something (they are still running). >>>> >>>> >>>> >>>> On Fri, May 15, 2015 at 2:26 PM, Flavio Pompermaier < >>>> pomperma...@okkam.it> wrote: >>>> >>>>> Hi Robert, >>>>> I applied your fix but still I get one error (not in the same point at >>>>> least..) >>>>> Basically what I do is: >>>>> >>>>> DataSet<Tuple2<String, DateTime>> someDates; //this is empty in my test >>>>> DataSet<Tuple3<String, String, DateTime>> someEvents; >>>>> DataSet<Tuple4<String, String, DateTime, DateTime>> res = >>>>> someEvents.coGroup(someDates).where(0).equalTo(0).with( >>>>> new myCoGroupFunction<Tuple3<String, String, >>>>> DateTime>, Tuple2<String, DateTime>, Tuple4<String, String, DateTime, >>>>> DateTime>> (...)); >>>>> res.print(); >>>>> >>>>> in myCoGroupFunction I declare a Tuple4<String, String, DateTime, >>>>> DateTime> reuse = new Tuple4<>() and I collect reuse tuples with t.f3 = >>>>> null. >>>>> >>>>> Then I get this stackTrace: >>>>> >>>>> Caused by: org.apache.flink.types.NullKeyFieldException >>>>> at >>>>> org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:76) >>>>> at >>>>> org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:30) >>>>> at >>>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.nextKey(NonReusingKeyGroupedIterator.java:115) >>>>> at >>>>> org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.sortAndCombine(SynchronousChainedCombineDriver.java:233) >>>>> at >>>>> org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.close(SynchronousChainedCombineDriver.java:194) >>>>> at >>>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:504) >>>>> at >>>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) >>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) >>>>> at java.lang.Thread.run(Thread.java:745) >>>>> >>>>> On Fri, May 15, 2015 at 1:57 PM, Flavio Pompermaier < >>>>> pomperma...@okkam.it> wrote: >>>>> >>>>>> Thanks a lot Robert! Don't mention it ;) >>>>>> >>>>>> >>>>>> On Fri, May 15, 2015 at 1:54 PM, Robert Metzger <rmetz...@apache.org> >>>>>> wrote: >>>>>> >>>>>>> Hey, >>>>>>> >>>>>>> the patch is in my branch "flink2019". >>>>>>> Its really good that you've found the bug. We were using the wrong >>>>>>> kryo instance to create copies of generic types. >>>>>>> >>>>>>> Once travis validates that everything is good, I'll push it to >>>>>>> master. >>>>>>> >>>>>>> On Fri, May 15, 2015 at 12:41 PM, Flavio Pompermaier < >>>>>>> pomperma...@okkam.it> wrote: >>>>>>> >>>>>>>> So do you think you could release a path soon? I need it to >>>>>>>> continue my work..otherwise if it's very simple you could send me the >>>>>>>> snippet of code to change my local flink version ;) >>>>>>>> >>>>>>>> Best, >>>>>>>> Flavio >>>>>>>> >>>>>>>> On Fri, May 15, 2015 at 11:22 AM, Robert Metzger < >>>>>>>> rmetz...@apache.org> wrote: >>>>>>>> >>>>>>>>> Yes ;) >>>>>>>>> >>>>>>>>> On Fri, May 15, 2015 at 11:10 AM, Flavio Pompermaier < >>>>>>>>> pomperma...@okkam.it> wrote: >>>>>>>>> >>>>>>>>>> Do you think it's comething easy to fix..? >>>>>>>>>> >>>>>>>>>> On Fri, May 15, 2015 at 10:51 AM, Robert Metzger < >>>>>>>>>> rmetz...@apache.org> wrote: >>>>>>>>>> >>>>>>>>>>> No problem ;) >>>>>>>>>>> >>>>>>>>>>> I was able to reproduce the issue and filed a JIRA for it: >>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-2019 >>>>>>>>>>> >>>>>>>>>>> On Fri, May 15, 2015 at 10:36 AM, Flavio Pompermaier < >>>>>>>>>>> pomperma...@okkam.it> wrote: >>>>>>>>>>> >>>>>>>>>>>> Unfortunately it's really difficult for me to extract the >>>>>>>>>>>> code..I'm using joda shipped with Flink 0.9-SNAPSHOT (i.e. 2.5) >>>>>>>>>>>> and before >>>>>>>>>>>> today I've never seen this error..als o because DateTime is >>>>>>>>>>>> Serializable :) >>>>>>>>>>>> >>>>>>>>>>>> On Fri, May 15, 2015 at 10:25 AM, Fabian Hueske < >>>>>>>>>>>> fhue...@gmail.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Is there a chance that the version of JodaTime changed? >>>>>>>>>>>>> >>>>>>>>>>>>> 2015-05-15 10:22 GMT+02:00 Robert Metzger <rmetz...@apache.org >>>>>>>>>>>>> >: >>>>>>>>>>>>> >>>>>>>>>>>>>> Can you share the Flink program? >>>>>>>>>>>>>> Or at least the definition of the Tuple? >>>>>>>>>>>>>> >>>>>>>>>>>>>> I'll look into this issue in a few minutes. >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Fri, May 15, 2015 at 10:13 AM, Flavio Pompermaier < >>>>>>>>>>>>>> pomperma...@okkam.it> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> I'm using Flink 0.9-SNAPSHOT and I've never seen this error >>>>>>>>>>>>>>> before today (the job haven't changed..) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Fri, May 15, 2015 at 10:09 AM, Robert Metzger < >>>>>>>>>>>>>>> rmetz...@apache.org> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi Flavio, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> which version of Flink are you using? >>>>>>>>>>>>>>>> If you are using 0.9 something, then this should actually >>>>>>>>>>>>>>>> work ;) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Fri, May 15, 2015 at 10:06 AM, Flavio Pompermaier < >>>>>>>>>>>>>>>> pomperma...@okkam.it> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi to all, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> this morning I run my Flink job and I got the following >>>>>>>>>>>>>>>>> exception serializing a DateTime Tuple..could you help me to >>>>>>>>>>>>>>>>> understand >>>>>>>>>>>>>>>>> what's happening here? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.KryoException: Class cannot be >>>>>>>>>>>>>>>>> created (missing no-arg constructor): >>>>>>>>>>>>>>>>> org.joda.time.chrono.ISOChronology >>>>>>>>>>>>>>>>> Serialization trace: >>>>>>>>>>>>>>>>> iChronology (org.joda.time.DateTime) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:624) >>>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.ObjectField.copy(ObjectField.java:140) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:634) >>>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator.setReference(GenericTypeComparator.java:77) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator.setReference(GenericTypeComparator.java:1) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:72) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:1) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.nextKey(NonReusingKeyGroupedIterator.java:115) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.sortAndCombine(SynchronousChainedCombineDriver.java:233) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.close(SynchronousChainedCombineDriver.java:194) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:504) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) >>>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >>> >> > >