Thank you very much. Disabling chaining in the Python API allows my actual script to run properly. The division by zero must be an issue with the job that I posted in the gist.
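Some background on why chaining matters for performance: chaining fuses consecutive operators so that a record produced by one function is handed directly to the next. Without it, the record is serialized and shipped between separate tasks. In the Python API, the unchained path presumably also means extra round trips between the Java runtime and the Python process. The toy sketch below shows only that general idea in plain Python. `chain`, `run_chained`, `run_unchained` and the sample operators are hypothetical and are not Flink's `_find_chains()` implementation. The pickle round trip stands in, by assumption, for the cost of moving data between stages.

```python
import pickle
from functools import reduce
from typing import Any, Callable, Iterable, List

# Hypothetical per-record operators (stand-ins for user functions in a plan).
Op = Callable[[Any], Any]


def run_unchained(data: Iterable[Any], ops: List[Op]) -> List[Any]:
    """Run each operator as its own stage, serializing records between stages.

    The pickle round trip is a stand-in for shipping data between tasks.
    """
    records = list(data)
    for op in ops:
        payload = pickle.dumps([op(r) for r in records])  # "send" to next stage
        records = pickle.loads(payload)                    # "receive" in next stage
    return records


def chain(ops: List[Op]) -> Op:
    """Fuse operators into one function so records flow by direct calls."""
    return lambda record: reduce(lambda acc, op: op(acc), ops, record)


def run_chained(data: Iterable[Any], ops: List[Op]) -> List[Any]:
    """Apply the fused function once per record, with no intermediate serialization."""
    fused = chain(ops)
    return [fused(r) for r in data]


if __name__ == "__main__":
    sample_ops = [lambda x: x + 1, lambda x: x * 2, lambda x: (x, x % 3)]
    # Both print [(2, 2), (4, 1), (6, 0), (8, 2), (10, 1)]
    print(run_unchained(range(5), sample_ops))
    print(run_chained(range(5), sample_ops))
```

Both paths return the same records. Chaining changes where the work happens, not the result. So in principle, turning it off is a way to sidestep a chaining bug without changing a job's output, at the cost of the extra serialization between operators.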
Does that mean that the issue must be in the chaining part of the API? Chaining, as I understand it, is an important optimization, and it would matter for the performance comparison I wish to make in my project.

Cheers,
Geoffrey

On Thu, Oct 13, 2016 at 9:11 AM Chesnay Schepler <ches...@apache.org> wrote:
> A temporary workaround appears to be disabling chaining, which you can
> do by commenting out L215 "self._find_chains()" in Environment.py.
> Note that you then run into a division by zero error, but I can't tell
> whether that is a problem of the job or not.
>
> On 13.10.2016 13:41, Chesnay Schepler wrote:
> > Hey Geoffrey,
> >
> > I was able to reproduce the error and will look into it in more detail
> > tomorrow.
> >
> > Regards,
> > Chesnay
> >
> > On 12.10.2016 23:09, Geoffrey Mon wrote:
> >> Hello,
> >>
> >> Has anyone had a chance to look into this? I am currently working on the
> >> problem, but I have minimal understanding of how the internal Flink Python
> >> API works; any expertise would be greatly appreciated.
> >>
> >> Thank you very much!
> >>
> >> Geoffrey
> >>
> >> On Sat, Oct 8, 2016 at 1:27 PM Geoffrey Mon <geof...@gmail.com> wrote:
> >>
> >>> Hi Chesnay,
> >>>
> >>> Heh, I have discovered that if I do not restart Flink after running my
> >>> original problematic script, then similar issues will manifest themselves
> >>> in other otherwise working scripts. I haven't been able to completely
> >>> narrow down the problem, but I promise this new script will have a
> >>> ClassCastException that is completely reproducible. :)
> >>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
> >>>
> >>> Thanks,
> >>> Geoffrey
> >>>
> >>> On Wed, Sep 28, 2016 at 9:16 AM Chesnay Schepler <ches...@apache.org>
> >>> wrote:
> >>>
> >>> Hello Geoffrey,
> >>>
> >>> this one works for me as well :D
> >>>
> >>> Regards,
> >>> Chesnay
> >>>
> >>> On 28.09.2016 05:38, Geoffrey Mon wrote:
> >>>> Hello Chesnay,
> >>>>
> >>>> Thank you for your help. After receiving your message I recompiled my
> >>>> version of Flink completely, and both the NullPointerException listed in
> >>>> the TODO and the ClassCastException with the join operation went away.
> >>>> Previously, to save time, I had been recompiling only the modules of Flink
> >>>> that had changed, using "mvn clean install -pl :module", and apparently
> >>>> that may have been causing some of my issues.
> >>>>
> >>>> Now the problem is clearer: when a specific group reduce function in my
> >>>> research project plan file is used within an iteration, I get a
> >>>> ClassCastException:
> >>>>
> >>>> Caused by: java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to [B
> >>>>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.serialize(BytePrimitiveArraySerializer.java:31)
> >>>>     at org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector.collect(WorksetUpdateOutputCollector.java:58)
> >>>>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> >>>>     at org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer(PythonReceiver.java:96)
> >>>>     at org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:272)
> >>>>     at org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
> >>>>     at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
> >>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> >>>>     at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> >>>>     at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
> >>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:591)
> >>>>     at java.lang.Thread.run(Thread.java:745)
> >>>>
> >>>> I'm not sure why this is causing an exception, and I would greatly
> >>>> appreciate any assistance. I've revised the barebones error-causing plan
> >>>> file to focus on this new error source:
> >>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
> >>>> The group reduce function in question seems to work just fine outside of
> >>>> iterations. I have organized the commits and pushed them to a new branch
> >>>> to make them easier to test and, hopefully, to review soon:
> >>>> https://github.com/GEOFBOT/flink/tree/new-iterations
> >>>>
> >>>> Cheers,
> >>>> Geoffrey
> >>>>
> >>>> On Mon, Sep 26, 2016 at 6:32 AM Chesnay Schepler <ches...@apache.org> wrote:
> >>>>> Hello Geoffrey,
> >>>>>
> >>>>> I could not reproduce this issue with the commits and plan you provided.
> >>>>>
> >>>>> I tried out both the FLINK-4098 and bulk-iterations branches (and
> >>>>> reverted back to the specified commits) and built Flink from scratch.
> >>>>>
> >>>>> Could you double-check that the code you provided produces the error?
> >>>>> Also, which OS/Python version are you using?
> >>>>>
> >>>>> Regards,
> >>>>> Chesnay
> >>>>>
> >>>>> On 20.09.2016 11:13, Chesnay Schepler wrote:
> >>>>>> Hello,
> >>>>>>
> >>>>>> I'll try to take a look this week.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Chesnay
> >>>>>>
> >>>>>> On 20.09.2016 02:38, Geoffrey Mon wrote:
> >>>>>>> Hello all,
> >>>>>>>
> >>>>>>> I have recently been working on adding bulk iterations to the Python
> >>>>>>> API of Flink in order to facilitate a research project I am working on.
> >>>>>>> The current changes can be seen in this GitHub diff:
> >>>>>>> https://github.com/apache/flink/compare/master...GEOFBOT:e8c9833b43675af66ce897da9880c4f8cd16aad0
> >>>>>>>
> >>>>>>> This implementation seems to work for, at least, simple examples, such
> >>>>>>> as incrementing numbers in a data set. However, with the
> >>>>>>> transformations required for my project, I get an exception
> >>>>>>> "java.lang.ClassCastException: [B cannot be cast to
> >>>>>>> org.apache.flink.api.java.tuple.Tuple" thrown from the deserializers
> >>>>>>> called by
> >>>>>>> org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer.
> >>>>>>>
> >>>>>>> I've created the following simplified Python plan by stripping down my
> >>>>>>> research project code to the problem-causing parts:
> >>>>>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
> >>>>>>>
> >>>>>>> I have been working on this issue, but I don't have any idea what the
> >>>>>>> problem might be. Perhaps someone more knowledgeable about the
> >>>>>>> internals of the Python API could kindly help?
> >>>>>>>
> >>>>>>> Thank you very much.
> >>>>>>>
> >>>>>>> Geoffrey Mon
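For readers new to the feature discussed in this thread: a bulk iteration repeatedly applies a step function to the whole data set. Each round's output is fed back in as the next round's input, and this continues for a fixed number of rounds or until a termination criterion holds. The sketch below models only those semantics in plain Python, using the "incrementing numbers in a data set" example from the first message. `bulk_iterate` and `increment_all` are hypothetical helpers, not the API added in the branch.

The sketch also includes a record-type check. This reflects an assumption carried over from Flink's Java API, where `IterativeDataSet<T>.closeWith(DataSet<T>)` expects the step result to have the same type as the iteration input. Whether that constraint is what the `[B` (JVM notation for `byte[]`) versus `Tuple2` mismatch above runs into is only a guess.

```python
from typing import Callable, List, Optional, TypeVar

T = TypeVar("T")


def bulk_iterate(
    initial: List[T],
    step: Callable[[List[T]], List[T]],
    max_iterations: int,
    converged: Optional[Callable[[List[T], List[T]], bool]] = None,
) -> List[T]:
    """Apply `step` to the whole data set up to `max_iterations` times.

    Each round's output (the "partial solution") becomes the next round's input.
    The optional `converged(previous, current)` predicate stops the loop early.
    """
    current = list(initial)
    for _ in range(max_iterations):
        nxt = step(current)
        # Assumed constraint: the feedback edge carries records of the input's
        # type, so fail loudly here rather than deep inside a serializer.
        if current and any(type(r) is not type(current[0]) for r in nxt):
            raise TypeError(
                f"step emitted a record that is not {type(current[0]).__name__}"
            )
        if converged is not None and converged(current, nxt):
            return nxt
        current = nxt
    return current


def increment_all(data: List[int]) -> List[int]:
    """The simple example from the thread: increment every number."""
    return [x + 1 for x in data]


if __name__ == "__main__":
    print(bulk_iterate([1, 2, 3], increment_all, max_iterations=10))  # [11, 12, 13]
```

A step that turned each number into a tuple, for example, would raise `TypeError` in the first round here.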