[ https://issues.apache.org/jira/browse/FLINK-18545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17156567#comment-17156567 ]
Timo Walther commented on FLINK-18545:
--------------------------------------

It would be good to avoid too many overloaded methods in the future. Is it likely that we will add more parameters later? If so, we should think about {{execute(ExecuteOptions)}} and {{executeSql(String sql, ExecuteOptions)}}.

How about we just add a config option to the `TableConfig` that allows setting the name of the submitted job? By default, the job should have a meaningful name that fully qualifies it, e.g. one that includes all SQL statements of the StatementSet?

> Sql api cannot special flink job name
> -------------------------------------
>
>                 Key: FLINK-18545
>                 URL: https://issues.apache.org/jira/browse/FLINK-18545
>             Project: Flink
>          Issue Type: Improvement
>          Components: Client / Job Submission, Table SQL / API
>    Affects Versions: 1.11.0
>         Environment: execute sql :
> StreamTableEnvironment.executeSql("insert into user_log_sink select user_id, item_id, category_id, behavior, ts from user_log")
> current job name : org.apache.flink.table.api.internal.TableEnvironmentImpl
> {code:java}
> public TableResult executeInternal(List<ModifyOperation> operations) {
>     List<Transformation<?>> transformations = translate(operations);
>     List<String> sinkIdentifierNames = extractSinkIdentifierNames(operations);
>     String jobName = "insert-into_" + String.join(",", sinkIdentifierNames);
>     Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
>     try {
>         JobClient jobClient = execEnv.executeAsync(pipeline);
>         TableSchema.Builder builder = TableSchema.builder();
>         Object[] affectedRowCounts = new Long[operations.size()];
>         for (int i = 0; i < operations.size(); ++i) {
>             // use sink identifier name as field name
>             builder.field(sinkIdentifierNames.get(i), DataTypes.BIGINT());
>             affectedRowCounts[i] = -1L;
>         }
>         return TableResultImpl.builder()
>                 .jobClient(jobClient)
>                 .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
>                 .tableSchema(builder.build())
>                 .data(Collections.singletonList(Row.of(affectedRowCounts)))
>                 .build();
>     } catch (Exception e) {
>         throw new TableException("Failed to execute sql", e);
>     }
> }
> {code}
>            Reporter: venn wu
>            Priority: Critical
>
> In Flink 1.11.0, StreamTableEnvironment.executeSql(sql) explains and executes the job immediately. The job name is set to "insert-into_sink-table-name". However, we have multiple SQL jobs that insert into the same sink table, so they all end up with the same job name, which is not very friendly.
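
To make the config-option idea concrete, here is a minimal, self-contained sketch of how a job name could be resolved: use a user-supplied name if one is configured, otherwise fall back to the "insert-into_" default built from the sink identifiers, as in the quoted {{executeInternal}}. This is only an illustration under assumptions: the class {{JobNameSketch}}, the method {{resolveJobName}}, and the option key {{table.job-name}} are hypothetical and not part of Flink's API, and a plain {{Map}} stands in for {{TableConfig}}.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class JobNameSketch {

    // Hypothetical option key, invented for this illustration only.
    static final String JOB_NAME_KEY = "table.job-name";

    // Default name in the style of the quoted executeInternal:
    // "insert-into_" followed by the comma-joined sink identifiers.
    static String defaultJobName(List<String> sinkIdentifierNames) {
        return "insert-into_" + String.join(",", sinkIdentifierNames);
    }

    // Prefer a configured, non-blank job name; otherwise use the default.
    // The Map is a stand-in for a real configuration object.
    static String resolveJobName(Map<String, String> config, List<String> sinkIdentifierNames) {
        String configured = config.get(JOB_NAME_KEY);
        if (configured != null && !configured.trim().isEmpty()) {
            return configured;
        }
        return defaultJobName(sinkIdentifierNames);
    }

    public static void main(String[] args) {
        List<String> sinks = Arrays.asList("user_log_sink");
        Map<String, String> config = new HashMap<>();

        // No name configured: falls back to the sink-based default.
        System.out.println(resolveJobName(config, sinks));

        // Name configured: two jobs writing to the same sink can now be told apart.
        config.put(JOB_NAME_KEY, "user-log-etl-eu");
        System.out.println(resolveJobName(config, sinks));
    }
}
```

With a lookup like this, no new {{executeSql}} overload is needed: callers who care about the job name set the option once on the configuration, and everyone else keeps the current default.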