venn wu created FLINK-18545:
-------------------------------

             Summary: SQL API cannot specify the Flink job name
                 Key: FLINK-18545
                 URL: https://issues.apache.org/jira/browse/FLINK-18545
             Project: Flink
          Issue Type: Improvement
          Components: Client / Job Submission, Table SQL / API
    Affects Versions: 1.11.0
         Environment: execute sql:
StreamTableEnvironment.executeSql("insert into user_log_sink select user_id, item_id, category_id, behavior, ts from user_log")

The current job name is built in org.apache.flink.table.api.internal.TableEnvironmentImpl:
{code:java}
public TableResult executeInternal(List<ModifyOperation> operations) {
    List<Transformation<?>> transformations = translate(operations);
    List<String> sinkIdentifierNames = extractSinkIdentifierNames(operations);
    // the job name is derived only from the sink identifiers
    String jobName = "insert-into_" + String.join(",", sinkIdentifierNames);
    Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
    try {
        JobClient jobClient = execEnv.executeAsync(pipeline);
        TableSchema.Builder builder = TableSchema.builder();
        Object[] affectedRowCounts = new Long[operations.size()];
        for (int i = 0; i < operations.size(); ++i) {
            // use sink identifier name as field name
            builder.field(sinkIdentifierNames.get(i), DataTypes.BIGINT());
            affectedRowCounts[i] = -1L;
        }

        return TableResultImpl.builder()
                .jobClient(jobClient)
                .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                .tableSchema(builder.build())
                .data(Collections.singletonList(Row.of(affectedRowCounts)))
                .build();
    } catch (Exception e) {
        throw new TableException("Failed to execute sql", e);
    }
}
{code}

            Reporter: venn wu


In Flink 1.11.0, StreamTableEnvironment.executeSql(sql) plans and executes the job immediately, and the job name is always set to "insert-into_" followed by the sink table name. The SQL API offers no way to specify a different name. We have multiple SQL jobs that insert into the same sink table, so they all get the same job name and are hard to tell apart, which is not very friendly.
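
To make the collision concrete, here is a minimal standalone sketch that reproduces only the naming line from the excerpt above. It does not use any Flink classes. The class name JobNameSketch, the method resolveJobName, and the example name "user-log-etl" are hypothetical and only illustrate what a user-supplied name with a fallback to the current default could look like; the bare sink identifier "user_log_sink" is also an assumption, since the real identifier may be fully qualified.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class JobNameSketch {

    // Mirrors the naming line in the executeInternal excerpt:
    // "insert-into_" + String.join(",", sinkIdentifierNames)
    public static String defaultJobName(List<String> sinkIdentifierNames) {
        return "insert-into_" + String.join(",", sinkIdentifierNames);
    }

    // Hypothetical helper (not a Flink API): prefer a non-blank user-supplied
    // name and fall back to the default derived from the sink identifiers.
    public static String resolveJobName(Optional<String> userJobName,
                                        List<String> sinkIdentifierNames) {
        return userJobName
                .filter(name -> !name.trim().isEmpty())
                .orElseGet(() -> defaultJobName(sinkIdentifierNames));
    }

    public static void main(String[] args) {
        // Assumed sink identifier; two different INSERT statements that write
        // to this sink would both resolve to the same list.
        List<String> sinks = Collections.singletonList("user_log_sink");

        // Two unrelated jobs writing to the same sink get identical names.
        String jobA = defaultJobName(sinks);
        String jobB = defaultJobName(sinks);
        System.out.println(jobA + " / " + jobB + " / same=" + jobA.equals(jobB));

        // With a user-supplied name the jobs can be told apart.
        System.out.println(resolveJobName(Optional.of("user-log-etl"), sinks));
        System.out.println(resolveJobName(Optional.empty(), sinks));
    }
}
```

Since the default name depends only on the sink identifiers, any two statements writing to the same sink produce the same name, which is the behavior this issue asks to make configurable.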