Awesome, thanks Chesnay!

On Wed, Aug 17, 2016 at 2:58 AM, Chesnay Schepler <ches...@apache.org> wrote:
> Found the issue, there was a missing tab in the chaining method...
>
>
> On 16.08.2016 12:12, Chesnay Schepler wrote:
>
>> Looks like a bug, will look into it. :)
>>
>> On 16.08.2016 10:29, Ufuk Celebi wrote:
>>
>>> I think that this is actually a bug in Flink. I'm cc'ing Chesnay, who
>>> originally contributed the Python API. He can probably tell whether
>>> this is a bug on the Python API or the Flink operator side of things. ;)
>>>
>>> On Mon, Aug 15, 2016 at 10:14 PM, davis k <forclazzproje...@gmail.com>
>>> wrote:
>>>
>>>> I've got an issue performing joins using the Python API in flink-1.1.1.
>>>> With this example code I get an NPE (below). However, the NPE disappears
>>>> when the filter is removed. Is there an error I'm making in this brief
>>>> example, or is this a Flink bug?
>>>>
>>>>
>>>> env = get_environment()
>>>> env.set_parallelism(1)
>>>>
>>>> input1 = env.from_elements("1|0","1|2") \
>>>>     .map(lambda x: x.split("|"))
>>>>
>>>> input2 = env.from_elements("1|b") \
>>>>     .map(lambda x: x.split("|")) \
>>>>     .filter(lambda x: x[0] != "0")
>>>>
>>>>
>>>> joined = input1 \
>>>>     .join(input2) \
>>>>     .where(0) \
>>>>     .equal_to(0) \
>>>>     .write_text("output.txt", write_mode=WriteMode.OVERWRITE)
>>>>
>>>> env.execute(local=True)
>>>>
>>>>
>>>> ------------------------------------------------------------
>>>> The program finished with the following exception:
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:524)
>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
>>>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
>>>> Caused by: java.lang.NullPointerException
>>>>     at org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:64)
>>>>     at org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:59)
>>>>     at org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:55)
>>>>     at org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.<init>(JoinOperator.java:850)
>>>>     at org.apache.flink.api.java.DataSet.join(DataSet.java:742)
>>>>     at org.apache.flink.python.api.PythonPlanBinder.createDefaultJoin(PythonPlanBinder.java:599)
>>>>     at org.apache.flink.python.api.PythonPlanBinder.createJoinOperation(PythonPlanBinder.java:591)
>>>>     at org.apache.flink.python.api.PythonPlanBinder.receiveOperations(PythonPlanBinder.java:360)
>>>>     at org.apache.flink.python.api.PythonPlanBinder.receivePlan(PythonPlanBinder.java:235)
>>>>     at org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:139)
>>>>     at org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:112)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>>>>     ... 6 more
>>>>
>>>
>>
>>
>