Wei Zhong created FLINK-14591:
---------------------------------

             Summary: Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method
                 Key: FLINK-14591
                 URL: https://issues.apache.org/jira/browse/FLINK-14591
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: Wei Zhong

In the current implementation of the blink planner, the "PlannerBase#translate" method calls "PlannerBase#mergeParameters" to merge the configuration inside TableConfig into the global job parameters:

{code:scala}
override def translate(
    modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
  if (modifyOperations.isEmpty) {
    return List.empty[Transformation[_]]
  }
  mergeParameters()
  val relNodes = modifyOperations.map(translateToRel)
  val optimizedRelNodes = optimize(relNodes)
  val execNodes = translateToExecNodePlan(optimizedRelNodes)
  translateToPlan(execNodes)
}
{code}

The translate method is called at every key point, e.g. execute, toDataStream, insertInto, etc. But as shown above, when "modifyOperations" is empty the method returns early and never calls "mergeParameters". As a result, any configuration set between the "Table#insertInto" call and the "TableEnvironment#execute" call is not merged into the global job parameters:

{code:scala}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance.useBlinkPlanner.build)
...
...
val result = ...
val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink)
tEnv.registerTableSink("MySink", sink)

tEnv.getConfig.getConfiguration.setString("jobparam1", "value1")
result.insertInto("MySink")

// the "jobparam2" configuration will be lost
tEnv.getConfig.getConfiguration.setString("jobparam2", "value2")
tEnv.execute("test")

val jobConfig = env.getConfig.getGlobalJobParameters.toMap

assertTrue(jobConfig.get("jobparam1")=="value1")
// this assertion will fail:
assertTrue(jobConfig.get("jobparam2")=="value2")
{code}

This is confusing for users. It would be great if we could fix this problem.
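
One possible direction, in line with the issue summary, is to call "mergeParameters" before the empty-list check so that it runs on every "translate" call. The following is a minimal, self-contained sketch of that ordering, not the actual Flink code: `SketchPlanner`, `SketchPlannerDemo`, the string-based "operations", and the two mutable maps standing in for TableConfig and the global job parameters are all hypothetical. It also assumes that the final "execute" call reaches "translate" with an empty operation list.

```scala
import scala.collection.mutable

// Hypothetical stand-in for PlannerBase. Only the names translate and
// mergeParameters come from the issue; everything else is illustrative.
class SketchPlanner(
    tableConfig: mutable.Map[String, String],
    globalJobParameters: mutable.Map[String, String]) {

  // Copies every TableConfig entry into the global job parameters.
  private def mergeParameters(): Unit =
    globalJobParameters ++= tableConfig

  // Merge first, so the early return below can no longer skip it.
  def translate(modifyOperations: List[String]): List[String] = {
    mergeParameters()
    if (modifyOperations.isEmpty) {
      return List.empty[String]
    }
    modifyOperations.map(op => s"transformation($op)")
  }
}

object SketchPlannerDemo {
  // Replays the scenario from the issue against the sketch planner.
  def run(): Map[String, String] = {
    val tableConfig = mutable.Map.empty[String, String]
    val jobParams = mutable.Map.empty[String, String]
    val planner = new SketchPlanner(tableConfig, jobParams)

    tableConfig("jobparam1") = "value1"
    planner.translate(List("insertInto(MySink)")) // stands in for Table#insertInto

    tableConfig("jobparam2") = "value2"
    planner.translate(Nil) // assumed: execute translates an empty list

    jobParams.toMap
  }
}
```

With this ordering, both "jobparam1" and "jobparam2" end up in the merged parameters in the sketch. Whether the real fix should reorder the call inside "translate" or merge the parameters elsewhere (for example directly in "execute") is a design choice left to the planner maintainers.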