Hello,

Has anyone had a chance to look into this? I am currently working on the problem, but I have minimal understanding of how the Flink Python API works internally; any expertise would be greatly appreciated.

Thank you very much!
Geoffrey

On Sat, Oct 8, 2016 at 1:27 PM Geoffrey Mon <geof...@gmail.com> wrote:

> Hi Chesnay,
>
> Heh, I have discovered that if I do not restart Flink after running my
> original problematic script, then similar issues will manifest themselves
> in other otherwise working scripts. I haven't been able to completely
> narrow down the problem, but I promise this new script will have a
> ClassCastException that is completely reproducible. :)
> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
>
> Thanks,
> Geoffrey
>
> On Wed, Sep 28, 2016 at 9:16 AM Chesnay Schepler <ches...@apache.org>
> wrote:
>
> Hello Geoffrey,
>
> this one works for me as well :D
>
> Regards,
> Chesnay
>
> On 28.09.2016 05:38, Geoffrey Mon wrote:
> > Hello Chesnay,
> >
> > Thank you for your help. After receiving your message I recompiled my
> > version of Flink completely, and both the NullPointerException listed in
> > the TODO and the ClassCastException with the join operation went away.
> > Previously, I had been only recompiling the modules of Flink that had been
> > changed to save time using "mvn clean install -pl :module" and apparently
> > that may have been causing some of my issues.
> >
> > Now, the problem is more clear: when a specific group reduce function in my
> > research project plan file is used within an iteration, I get a
> > ClassCastException:
> >
> > Caused by: java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to [B
> >     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.serialize(BytePrimitiveArraySerializer.java:31)
> >     at org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector.collect(WorksetUpdateOutputCollector.java:58)
> >     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> >     at org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer(PythonReceiver.java:96)
> >     at org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:272)
> >     at org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
> >     at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
> >     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> >     at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> >     at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
> >     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:591)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> > I'm not sure why this is causing an exception, and I would greatly
> > appreciate any assistance. I've revised the barebones error-causing plan
> > file to focus on this new error source:
> > https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
> > The group reduce function in question seems to work just fine outside of
> > iterations.
> > I have organized the commits and pushed to a new branch to make
> > it easier to test and hopefully review soon:
> > https://github.com/GEOFBOT/flink/tree/new-iterations
> >
> > Cheers,
> > Geoffrey
> >
> > On Mon, Sep 26, 2016 at 6:32 AM Chesnay Schepler <ches...@apache.org> wrote:
> >
> >> Hello Geoffrey,
> >>
> >> I could not reproduce this issue with the commits and plan you provided.
> >>
> >> I tried out both the FLINK-4098 and bulk-iterations branches (and
> >> reverted back to the specified commits) and built Flink from scratch.
> >>
> >> Could you double check that the code you provided produces the error?
> >> Also, which OS/Python version are you using?
> >>
> >> Regards,
> >> Chesnay
> >>
> >> On 20.09.2016 11:13, Chesnay Schepler wrote:
> >>> Hello,
> >>>
> >>> I'll try to take a look this week.
> >>>
> >>> Regards,
> >>> Chesnay
> >>>
> >>> On 20.09.2016 02:38, Geoffrey Mon wrote:
> >>>> Hello all,
> >>>>
> >>>> I have recently been working on adding bulk iterations to the Python
> >>>> API of Flink in order to facilitate a research project I am working on.
> >>>> The current changes can be seen in this GitHub diff:
> >>>> https://github.com/apache/flink/compare/master...GEOFBOT:e8c9833b43675af66ce897da9880c4f8cd16aad0
> >>>>
> >>>> This implementation seems to work for, at least, simple examples, such
> >>>> as incrementing numbers in a data set. However, with the transformations
> >>>> required for my project, I get an exception
> >>>> "java.lang.ClassCastException: [B cannot be cast to
> >>>> org.apache.flink.api.java.tuple.Tuple" thrown from the deserializers
> >>>> called by
> >>>> org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer.
> >>>> I've created the following simplified Python plan by stripping down my
> >>>> research project code to the problem-causing parts:
> >>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
> >>>>
> >>>> I have been working on this issue but I don't have any ideas on what
> >>>> might be the problem. Perhaps someone more knowledgeable about the
> >>>> interior of the Python API could kindly help?
> >>>>
> >>>> Thank you very much.
> >>>>
> >>>> Geoffrey Mon