[ https://issues.apache.org/jira/browse/FLINK-23020 ]
Dian Fu updated FLINK-23020:
----------------------------
    Component/s:     (was: API / Python)
                     Table SQL / Planner

> NullPointerException when running collect twice from Python API
> ---------------------------------------------------------------
>
>                 Key: FLINK-23020
>                 URL: https://issues.apache.org/jira/browse/FLINK-23020
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.1
>            Reporter: Maciej Bryński
>            Priority: Major
>
> Hi,
> I'm trying to use PyFlink from a Jupyter Notebook and I'm getting an NPE in the following scenario.
> 1. I'm creating a datagen table.
> {code:java}
> from pyflink.table import EnvironmentSettings, TableEnvironment, StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
> from pyflink.common import Configuration, Row
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.java_gateway import get_gateway
>
> conf = Configuration()
> env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
> table_env.get_config().get_configuration().set_integer("parallelism.default", 1)
> table_env.execute_sql("DROP TABLE IF EXISTS datagen")
> table_env.execute_sql("""
> CREATE TABLE datagen (
>     id INT
> ) WITH (
>     'connector' = 'datagen'
> )
> """)
> {code}
> 2. Then I'm running collect.
> {code:java}
> try:
>     result = table_env.sql_query("select * from datagen limit 1").execute()
>     for r in result.collect():
>         print(r)
> except KeyboardInterrupt:
>     result.get_job_client().cancel()
> {code}
> 3. I'm using the "interrupt the kernel" button. This is handled by the try/except above and cancels the query (a more defensive variant of this handler is sketched below).
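> For reference, here is that more defensive variant. This is only a sketch: it assumes that collect() returns a closeable iterator usable as a context manager and that the future returned by JobClient.cancel() exposes a blocking result() method. It closes the iterator deterministically and waits for the cancellation to complete; I have not verified that this avoids the problem.
> {code:java}
> try:
>     result = table_env.sql_query("select * from datagen limit 1").execute()
>     # Close the result iterator deterministically, even when interrupted.
>     with result.collect() as results:
>         for r in results:
>             print(r)
> except KeyboardInterrupt:
>     # Assumption: cancel() returns a future whose result() blocks
>     # until the cancellation has actually completed on the cluster.
>     result.get_job_client().cancel().result()
> {code}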
> 4. I'm running collect from point 2 one more time. Result:
> {code:java}
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-5-98ef93c07bdb> in <module>
>       1 try:
> ----> 2     result = table_env.sql_query("select * from datagen limit 1").execute()
>       3     for r in result.collect():
>       4         print(r)
>       5 except KeyboardInterrupt:
>
> /usr/local/lib/python3.8/dist-packages/pyflink/table/table.py in execute(self)
>    1070         """
>    1071         self._t_env._before_execute()
> -> 1072         return TableResult(self._j_table.execute())
>    1073
>    1074     def explain(self, *extra_details: ExplainDetail) -> str:
>
> /usr/local/lib/python3.8/dist-packages/py4j/java_gateway.py in __call__(self, *args)
>    1283
>    1284         answer = self.gateway_client.send_command(command)
> -> 1285         return_value = get_return_value(
>    1286             answer, self.gateway_client, self.target_id, self.name)
>    1287
>
> /usr/local/lib/python3.8/dist-packages/pyflink/util/exceptions.py in deco(*a, **kw)
>     144     def deco(*a, **kw):
>     145         try:
> --> 146             return f(*a, **kw)
>     147         except Py4JJavaError as e:
>     148             from pyflink.java_gateway import get_gateway
>
> /usr/local/lib/python3.8/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
>     325             if answer[1] == REFERENCE_TYPE:
> --> 326                 raise Py4JJavaError(
>     327                     "An error occurred while calling {0}{1}{2}.\n".
>     328                     format(target_id, ".", name), value)
>
> Py4JJavaError: An error occurred while calling o69.execute.
> : java.lang.NullPointerException
> 	at java.base/java.util.Objects.requireNonNull(Objects.java:221)
> 	at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:144)
> 	at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:108)
> 	at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.<init>(FlinkRelMetadataQuery.java:73)
> 	at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:54)
> 	at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
> 	at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
> 	at org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
> 	at org.apache.calcite.rel.metadata.RelMdUtil.clearCache(RelMdUtil.java:965)
> 	at org.apache.calcite.plan.hep.HepPlanner.clearCache(HepPlanner.java:879)
> 	at org.apache.calcite.plan.hep.HepPlanner.contractVertices(HepPlanner.java:858)
> 	at org.apache.calcite.plan.hep.HepPlanner.applyTransformationResults(HepPlanner.java:745)
> 	at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:545)
> 	at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
> 	at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:271)
> 	at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
> 	at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
> 	at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> 	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> 	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
> 	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
> 	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:791)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1225)
> 	at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577)
> 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> 	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> 	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> 	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> 	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> 	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> 	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> 	at java.base/java.lang.Thread.run(Thread.java:829)
> {code}
>
> PS. When I cancel the job from the Web UI, I'm able to run collect twice. The problem exists only when cancelling the job from code.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)