Your solution works well, many thanks. This solves the exception that I
described previously.

However, in a different part of the script I have come across another
problem, this time with reusing data sets. For example, given the script at
https://gist.github.com/GEOFBOT/d670f567f8c886572c8715a6058f8b34, I get an
exception about a data type not having a corresponding deserializer for a
broadcast variable, even though no data sets in my plan are explicitly
broadcast. I noticed that in the plan diagram generated by Flink, the data
set 'S' is created twice, once for each place it is used in the plan, even
though I intended to reuse the same data set. One of the two creations
involves a broadcast. Presumably this is related to an optimization built
into Flink?
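To illustrate what I mean by reusing a data set, here is a minimal,
hypothetical sketch (not the actual gist; the data and function names are
made up) in which the same data set S feeds two different operators:

from flink.plan.Environment import get_environment
from flink.functions.MapFunction import MapFunction
from flink.functions.JoinFunction import JoinFunction

class Scale(MapFunction):
    def map(self, value):
        # value is assumed to be a (key, number) tuple
        return (value[0], value[1] * 2)

class Combine(JoinFunction):
    def join(self, first, second):
        # pair the scaled value with the original value for the same key
        return (first[0], first[1], second[1])

env = get_environment()
S = env.from_elements((1, 1.0), (2, 2.0), (3, 3.0))  # intended to be reused

A = S.map(Scale())                                    # first use of S
B = A.join(S).where(0).equal_to(0).using(Combine())  # second use of S

B.output()
env.execute(local=True)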

I appreciate any insights or help you may have about this problem.

Thanks,
Geoffrey

On Fri, Oct 14, 2016 at 7:24 AM Chesnay Schepler <ches...@apache.org> wrote:

In this branch: https://github.com/zentol/flink/tree/new-iterations you
can find a more fine-grained fix for chaining with iterations. The
relevant commit is ac2305d9589a5c6ab9e94d04c870fba52716d695.

On 13.10.2016 23:11, Chesnay Schepler wrote:
> The chaining code is definitely related, and I have a pretty clear
> idea of how to fix it.
>
> The odd thing is that the Java API doesn't catch this type mismatch;
> the data types are
> known when the plan is generated. This kind of error shouldn't even
> happen.
>
> On 13.10.2016 21:15, Geoffrey Mon wrote:
>> Thank you very much. Disabling chaining with the Python API allows my
>> actual script to run properly. The division by zero must be an issue
>> with the job I posted in the gist.
>>
>> Does that mean that the issue must be in the chaining part of the API?
>> From the way I understand it, chaining is an important optimization, and
>> it matters for the performance comparison I wish to make in my
>> project.
>>
>> Cheers,
>> Geoffrey
>>
>> On Thu, Oct 13, 2016 at 9:11 AM Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> A temporary workaround appears to be disabling chaining, which you can
>>> do by commenting out L215 "self._find_chains()" in Environment.py.
>>> Note that you then run into a division by zero error, but I can't tell
>>> whether that is a problem with the job or not.
>>>
>>> On 13.10.2016 13:41, Chesnay Schepler wrote:
>>>> Hey Geoffrey,
>>>>
>>>> I was able to reproduce the error and will look into it in more detail
>>>> tomorrow.
>>>>
>>>> Regards,
>>>> Chesnay
>>>>
>>>> On 12.10.2016 23:09, Geoffrey Mon wrote:
>>>>> Hello,
>>>>>
>>>>> Has anyone had a chance to look into this? I am currently working on
>>>>> the problem, but I have minimal understanding of how the Flink Python
>>>>> API works internally; any expertise would be greatly appreciated.
>>>>>
>>>>> Thank you very much!
>>>>>
>>>>> Geoffrey
>>>>>
>>>>> On Sat, Oct 8, 2016 at 1:27 PM Geoffrey Mon <geof...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Chesnay,
>>>>>>
>>>>>> Heh, I have discovered that if I do not restart Flink after running
>>>>>> my original problematic script, then similar issues manifest
>>>>>> themselves in other scripts that otherwise work. I haven't been able
>>>>>> to completely narrow down the problem, but I promise this new script
>>>>>> will have a ClassCastException that is completely reproducible. :)
>>>>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
>>>>>>
>>>>>> Thanks,
>>>>>> Geoffrey
>>>>>>
>>>>>> On Wed, Sep 28, 2016 at 9:16 AM Chesnay Schepler
>>>>>> <ches...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>> Hello Geoffrey,
>>>>>>
>>>>>> this one works for me as well :D
>>>>>>
>>>>>> Regards,
>>>>>> Chesnay
>>>>>>
>>>>>> On 28.09.2016 05:38, Geoffrey Mon wrote:
>>>>>>> Hello Chesnay,
>>>>>>>
>>>>>>> Thank you for your help. After receiving your message I recompiled
>>>>>>> my version of Flink completely, and both the NullPointerException
>>>>>>> listed in the TODO and the ClassCastException with the join
>>>>>>> operation went away. Previously, to save time, I had been
>>>>>>> recompiling only the Flink modules that had changed, using
>>>>>>> "mvn clean install -pl :module", and apparently that may have been
>>>>>>> causing some of my issues.
>>>>>>>
>>>>>>> Now, the problem is clearer: when a specific group reduce function
>>>>>>> in my research project plan file is used within an iteration, I get
>>>>>>> a ClassCastException:
>>>>>>> Caused by: java.lang.ClassCastException:
>>>>>>> org.apache.flink.api.java.tuple.Tuple2 cannot be cast to [B
>>>>>>>   at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.serialize(BytePrimitiveArraySerializer.java:31)
>>>>>>>   at org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector.collect(WorksetUpdateOutputCollector.java:58)
>>>>>>>   at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>>>>>>>   at org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer(PythonReceiver.java:96)
>>>>>>>   at org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:272)
>>>>>>>   at org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
>>>>>>>   at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>>>>>>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>>>>>>>   at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
>>>>>>>   at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
>>>>>>>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>>>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:591)
>>>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> I'm not sure why this is causing an exception, and I would greatly
>>>>>>> appreciate any assistance. I've revised the barebones error-causing
>>>>>>> plan file to focus on this new error source:
>>>>>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
>>>>>>> The group reduce function in question seems to work just fine outside
>>>>>>> of iterations. I have organized the commits and pushed to a new branch
>>>>>>> to make it easier to test and hopefully review soon:
>>>>>>> https://github.com/GEOFBOT/flink/tree/new-iterations
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Geoffrey
>>>>>>>
>>>>>>> On Mon, Sep 26, 2016 at 6:32 AM Chesnay Schepler
>>>>>>> <ches...@apache.org>
>>>>>> wrote:
>>>>>>>> Hello Geoffrey,
>>>>>>>>
>>>>>>>> I could not reproduce this issue with the commits and plan you
>>>>>>>> provided.
>>>>>>>>
>>>>>>>> I tried out both the FLINK-4098 and bulk-iterations branches (and
>>>>>>>> reverted back to the specified commits) and built Flink from
>>>>>>>> scratch.
>>>>>>>>
>>>>>>>> Could you double check that the code you provided produces the
>>>>>>>> error?
>>>>>>>> Also, which OS and Python version are you using?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Chesnay
>>>>>>>>
>>>>>>>> On 20.09.2016 11:13, Chesnay Schepler wrote:
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I'll try to take a look this week.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Chesnay
>>>>>>>>>
>>>>>>>>> On 20.09.2016 02:38, Geoffrey Mon wrote:
>>>>>>>>>> Hello all,
>>>>>>>>>>
>>>>>>>>>> I have recently been working on adding bulk iterations to the
>>>>>>>>>> Python API of Flink in order to facilitate a research project I am
>>>>>>>>>> working on. The current changes can be seen in this GitHub diff:
>>>>>>>>>> https://github.com/apache/flink/compare/master...GEOFBOT:e8c9833b43675af66ce897da9880c4f8cd16aad0
>>>>>>>>>>
>>>>>>>>>> This implementation seems to work for at least simple examples,
>>>>>>>>>> such as incrementing numbers in a data set. However, with the
>>>>>>>>>> transformations required for my project, I get an exception
>>>>>>>>>> "java.lang.ClassCastException: [B cannot be cast to
>>>>>>>>>> org.apache.flink.api.java.tuple.Tuple" thrown from the
>>>>>>>>>> deserializers called by
>>>>>>>>>> org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer.
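>>>>>>>>>>
>>>>>>>>>> For reference, the simple working case looks roughly like this
>>>>>>>>>> (a hypothetical sketch: iterate()/close_with() are assumed here to
>>>>>>>>>> mirror the Java API's iterate()/closeWith()):
>>>>>>>>>>
>>>>>>>>>> from flink.plan.Environment import get_environment
>>>>>>>>>> from flink.functions.MapFunction import MapFunction
>>>>>>>>>>
>>>>>>>>>> class Increment(MapFunction):
>>>>>>>>>>     def map(self, value):
>>>>>>>>>>         # add one to each element in every iteration
>>>>>>>>>>         return value + 1
>>>>>>>>>>
>>>>>>>>>> env = get_environment()
>>>>>>>>>> data = env.from_elements(1, 2, 3)
>>>>>>>>>>
>>>>>>>>>> iteration = data.iterate(10)   # assumed method name, per above
>>>>>>>>>> result = iteration.close_with(iteration.map(Increment()))
>>>>>>>>>>
>>>>>>>>>> result.output()
>>>>>>>>>> env.execute(local=True)
>>>>>>>>>>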
>>>>>>>>>> I've created the following simplified Python plan by stripping
>>>>>>>>>> down my
>>>>>>>>>> research project code to the problem-causing parts:
>>>>>>>>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
>>>>>>>>>>
>>>>>>>>>> I have been working on this issue but I don't have any ideas on
>>>>>>>>>> what might be the problem. Perhaps someone more knowledgeable about
>>>>>>>>>> the internals of the Python API could kindly help?
>>>>>>>>>>
>>>>>>>>>> Thank you very much.
>>>>>>>>>>
>>>>>>>>>> Geoffrey Mon
>>>>>>>>>>
>>>>
>>>
>
>
