Your solution works well, many thanks. This solves the exception that I described previously.
However, in a different part of the script I have run into another problem, this time related to reusing data sets. For example, given the script at https://gist.github.com/GEOFBOT/d670f567f8c886572c8715a6058f8b34, I get an exception saying that a data type does not have a corresponding deserializer for a broadcast variable, even though no data sets in my plan are broadcast. I also noticed that in the operations diagram generated by Flink, the data set 'S' is created twice, once for each place it is used in the plan, even though I intended to define it once and reuse it. One of the two creations involves a broadcast. Presumably this is related to an optimization built into Flink? I would appreciate any insights or help you may have with this problem.
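To make the pattern concrete, here is a stripped-down, hypothetical sketch of what I mean by reusing a data set (this is not the actual gist; the values and the two helper functions are made up purely for illustration). A single data set 's' is defined once and then consumed by two operators, one of them a join, which is presumably where the optimizer could decide to broadcast one of the two uses:

from flink.plan.Environment import get_environment
from flink.functions.MapFunction import MapFunction
from flink.functions.JoinFunction import JoinFunction


class Increment(MapFunction):
    def map(self, value):
        # bump the second field of a (key, value) tuple
        return (value[0], value[1] + 1)


class SumJoin(JoinFunction):
    def join(self, left, right):
        # combine matching tuples from the two inputs
        return (left[0], left[1] + right[1])


env = get_environment()

s = env.from_elements((1, 10), (2, 20), (3, 30))  # defined once, meant to be reused
t = env.from_elements((1, 1), (2, 2), (3, 3))

a = s.map(Increment())                               # first use of 's'
b = t.join(s).where(0).equal_to(0).using(SumJoin())  # second use of 's'

a.output()
b.output()

env.execute(local=True)

In a sketch like this I would expect the plan to show a single source for 's' feeding both operators, but instead each use appears to get its own copy.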
Thanks,
Geoffrey

On Fri, Oct 14, 2016 at 7:24 AM Chesnay Schepler <ches...@apache.org> wrote:

In this branch: https://github.com/zentol/flink/tree/new-iterations
you can find a more fine-grained fix for chaining with iterations.

relevant commit: ac2305d9589a5c6ab9e94d04c870fba52716d695

On 13.10.2016 23:11, Chesnay Schepler wrote:
> The chaining code is definitely related; I also have a pretty clear
> idea how to fix it.
>
> The odd thing is that the Java API doesn't catch this type mismatch;
> the data types are known when the plan is generated. This kind of
> error shouldn't even happen.
>
> On 13.10.2016 21:15, Geoffrey Mon wrote:
>> Thank you very much. Disabling chaining with the Python API allows
>> my actual script to run properly. The division by zero must be an
>> issue with the job that I posted on gist.
>>
>> Does that mean that the issue must be in the chaining part of the
>> API? From the way I understand it, chaining is an optimization that
>> would be important for the performance comparison I wish to make in
>> my project.
>>
>> Cheers,
>> Geoffrey
>>
>> On Thu, Oct 13, 2016 at 9:11 AM Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> A temporary workaround appears to be disabling chaining, which you
>>> can do by commenting out L215 "self._find_chains()" in
>>> Environment.py. Note that you then run into a division by zero
>>> error, but I can't tell whether that is a problem of the job or not.
>>>
>>> On 13.10.2016 13:41, Chesnay Schepler wrote:
>>>> Hey Geoffrey,
>>>>
>>>> I was able to reproduce the error and will look into it in more
>>>> detail tomorrow.
>>>>
>>>> Regards,
>>>> Chesnay
>>>>
>>>> On 12.10.2016 23:09, Geoffrey Mon wrote:
>>>>> Hello,
>>>>>
>>>>> Has anyone had a chance to look into this? I am currently working
>>>>> on the problem, but I have minimal understanding of how the
>>>>> internal Flink Python API works; any expertise would be greatly
>>>>> appreciated.
>>>>>
>>>>> Thank you very much!
>>>>>
>>>>> Geoffrey
>>>>>
>>>>> On Sat, Oct 8, 2016 at 1:27 PM Geoffrey Mon <geof...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Chesnay,
>>>>>>
>>>>>> Heh, I have discovered that if I do not restart Flink after
>>>>>> running my original problematic script, then similar issues will
>>>>>> manifest themselves in other otherwise working scripts. I haven't
>>>>>> been able to completely narrow down the problem, but I promise
>>>>>> this new script will have a ClassCastException that is completely
>>>>>> reproducible. :)
>>>>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
>>>>>>
>>>>>> Thanks,
>>>>>> Geoffrey
>>>>>>
>>>>>> On Wed, Sep 28, 2016 at 9:16 AM Chesnay Schepler <ches...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>> Hello Geoffrey,
>>>>>>
>>>>>> this one works for me as well :D
>>>>>>
>>>>>> Regards,
>>>>>> Chesnay
>>>>>>
>>>>>> On 28.09.2016 05:38, Geoffrey Mon wrote:
>>>>>>> Hello Chesnay,
>>>>>>>
>>>>>>> Thank you for your help. After receiving your message I
>>>>>>> recompiled my version of Flink completely, and both the
>>>>>>> NullPointerException listed in the TODO and the
>>>>>>> ClassCastException with the join operation went away.
>>>>>>> Previously, to save time, I had been recompiling only the
>>>>>>> modules of Flink that had been changed, using "mvn clean install
>>>>>>> -pl :module", and apparently that may have been causing some of
>>>>>>> my issues.
>>>>>>>
>>>>>>> Now the problem is clearer: when a specific group reduce
>>>>>>> function in my research project plan file is used within an
>>>>>>> iteration, I get a ClassCastException:
>>>>>>>
>>>>>>> Caused by: java.lang.ClassCastException:
>>>>>>> org.apache.flink.api.java.tuple.Tuple2 cannot be cast to [B
>>>>>>> at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.serialize(BytePrimitiveArraySerializer.java:31)
>>>>>>> at org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector.collect(WorksetUpdateOutputCollector.java:58)
>>>>>>> at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>>>>>>> at org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer(PythonReceiver.java:96)
>>>>>>> at org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:272)
>>>>>>> at org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
>>>>>>> at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>>>>>>> at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
>>>>>>> at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
>>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:591)
>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> I'm not sure why this is causing an exception, and I would
>>>>>>> greatly appreciate any assistance. I've revised the barebones
>>>>>>> error-causing plan file to focus on this new error source:
>>>>>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
>>>>>>> The group reduce function in question seems to work just fine
>>>>>>> outside of iterations.
>>>>>>> I have organized the commits and pushed them to a new branch to
>>>>>>> make it easier to test and hopefully review soon:
>>>>>>> https://github.com/GEOFBOT/flink/tree/new-iterations
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Geoffrey
>>>>>>>
>>>>>>> On Mon, Sep 26, 2016 at 6:32 AM Chesnay Schepler <ches...@apache.org>
>>>>>>> wrote:
>>>>>>>> Hello Geoffrey,
>>>>>>>>
>>>>>>>> I could not reproduce this issue with the commits and plan you
>>>>>>>> provided.
>>>>>>>>
>>>>>>>> I tried out both the FLINK-4098 and bulk-iterations branches
>>>>>>>> (and reverted to the specified commits) and built Flink from
>>>>>>>> scratch.
>>>>>>>>
>>>>>>>> Could you double-check that the code you provided produces the
>>>>>>>> error? Also, which OS/Python version are you using?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Chesnay
>>>>>>>>
>>>>>>>> On 20.09.2016 11:13, Chesnay Schepler wrote:
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I'll try to take a look this week.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Chesnay
>>>>>>>>>
>>>>>>>>> On 20.09.2016 02:38, Geoffrey Mon wrote:
>>>>>>>>>> Hello all,
>>>>>>>>>>
>>>>>>>>>> I have recently been working on adding bulk iterations to the
>>>>>>>>>> Python API of Flink in order to facilitate a research project
>>>>>>>>>> I am working on. The current changes can be seen in this
>>>>>>>>>> GitHub diff:
>>>>>>>>>> https://github.com/apache/flink/compare/master...GEOFBOT:e8c9833b43675af66ce897da9880c4f8cd16aad0
>>>>>>>>>>
>>>>>>>>>> This implementation seems to work for at least simple
>>>>>>>>>> examples, such as incrementing numbers in a data set.
>>>>>>>>>> However, with the transformations required for my project, I
>>>>>>>>>> get an exception "java.lang.ClassCastException: [B cannot be
>>>>>>>>>> cast to org.apache.flink.api.java.tuple.Tuple" thrown from
>>>>>>>>>> the deserializers called by
>>>>>>>>>> org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer.
>>>>>>>>>>
>>>>>>>>>> I've created the following simplified Python plan by
>>>>>>>>>> stripping down my research project code to the
>>>>>>>>>> problem-causing parts:
>>>>>>>>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
>>>>>>>>>>
>>>>>>>>>> I have been working on this issue, but I don't have any ideas
>>>>>>>>>> on what might be the problem. Perhaps someone more
>>>>>>>>>> knowledgeable about the interior of the Python API could
>>>>>>>>>> kindly help?
>>>>>>>>>>
>>>>>>>>>> Thank you very much.
>>>>>>>>>>
>>>>>>>>>> Geoffrey Mon
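P.S. For anyone who wants the shape of the failing plan without digging through the gists, it is roughly a group reduce applied inside a bulk iteration, as in the sketch below. The data and the reduce function are made up, and the iteration entry points (iterate / close_with) are only my guess at how the work-in-progress branch exposes them; the real plan is in the gist linked above.

from flink.plan.Environment import get_environment
from flink.functions.GroupReduceFunction import GroupReduceFunction


class SumPerKey(GroupReduceFunction):
    def reduce(self, iterator, collector):
        # sum the second field of all tuples sharing the same key
        key = None
        total = 0
        for k, v in iterator:
            key = k
            total += v
        collector.collect((key, total))


env = get_environment()
data = env.from_elements((0, 1), (0, 2), (1, 3))

# Outside an iteration this works fine:
# data.group_by(0).reduce_group(SumPerKey()).output()

# Hypothetical bulk-iteration API from the new-iterations branch; using
# the same group reduce inside the iteration is what triggers the
# ClassCastException in the deserializers.
iteration = data.iterate(5)
step = iteration.group_by(0).reduce_group(SumPerKey())
result = iteration.close_with(step)

result.output()
env.execute(local=True)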