Marios Trivyzas created FLINK-26453:
---------------------------------------
Summary: execution.allow-client-job-configurations not checked for
executeAsync
Key: FLINK-26453
URL: https://issues.apache.org/jira/browse/FLINK-26453
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.15.0
Reporter: Marios Trivyzas
Assignee: Fabian Paul
* *checkNotAllowedConfigurations()* should be called by
*StreamContextEnvironment#executeAsync()*
* The description of the *DeploymentOption* should be clearer: the option is
not only checked in application mode.
* When using a combination of the Table API and the DataStream API, the check
for overriding config options is not applied.
The following modified version of StreamSQLExample passes an extra config option:
{noformat}
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
Configuration conf = new Configuration();
conf.set(
        ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER,
        ExecutionConfigOptions.NotNullEnforcer.DROP);
// set up the Java Table API
final StreamTableEnvironment tableEnv =
        StreamTableEnvironment.create(env,
                EnvironmentSettings.fromConfiguration(conf));
final DataStream<Order> orderA =
        env.fromCollection(
                Arrays.asList(
                        new Order(1L, "beer", 3),
                        new Order(1L, "diaper", 4),
                        new Order(3L, "rubber", 2)));
final DataStream<Order> orderB =
        env.fromCollection(
                Arrays.asList(
                        new Order(2L, "pen", 3),
                        new Order(2L, "rubber", 3),
                        new Order(4L, "beer", 1)));
// convert the first DataStream to a Table object
// it will be used "inline" and is not registered in a catalog
final Table tableA = tableEnv.fromDataStream(orderA);
// convert the second DataStream and register it as a view
// it will be accessible under a name
tableEnv.createTemporaryView("TableB", orderB);
// union the two tables
final Table result =
        tableEnv.sqlQuery(
                "SELECT * FROM "
                        + tableA
                        + " WHERE amount > 2 UNION ALL "
                        + "SELECT * FROM TableB WHERE amount < 2");
// convert the Table back to an insert-only DataStream of type `Order`
tableEnv.toDataStream(result, Order.class).print();
// after the table program is converted to a DataStream program,
// we must use `env.execute()` to submit the job
env.execute();
{noformat}
ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER is set to ERROR in
flink-conf.yaml, yet no exception is thrown even though the job overrides it.
This is because in StreamTableEnvironmentImpl:
{noformat}
public static StreamTableEnvironment create(
        StreamExecutionEnvironment executionEnvironment,
        EnvironmentSettings settings,
        TableConfig tableConfig) {
{noformat}
we use the
{noformat}
public static Executor lookupExecutor(
        ClassLoader classLoader,
        String executorIdentifier,
        StreamExecutionEnvironment executionEnvironment) {
{noformat}
so we do not go through the code path that calls
StreamContextEnvironment#setAsContext,
which checks for overridden options depending on the new flag.
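
To illustrate the expected behavior, here is a minimal, self-contained sketch of the kind of check described above. It is not Flink's implementation: the class ClientConfigCheck and all of its method signatures are hypothetical, configurations are modeled as plain string maps, and the key table.exec.sink.not-null-enforcer is assumed to be the key behind TABLE_EXEC_SINK_NOT_NULL_ENFORCER. The point is only that both the blocking and the async submission path go through the same check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

/** Hypothetical stand-in for the client-side override check; not Flink code. */
public class ClientConfigCheck {

    /**
     * Returns, in sorted order, the keys whose client-side value differs from
     * the cluster-side value. In this sketch a key that is absent from the
     * cluster configuration also counts as an override.
     */
    public static List<String> overriddenKeys(
            Map<String, String> clusterConf, Map<String, String> clientConf) {
        List<String> overridden = new ArrayList<>();
        for (Map.Entry<String, String> e : new TreeMap<>(clientConf).entrySet()) {
            if (!Objects.equals(clusterConf.get(e.getKey()), e.getValue())) {
                overridden.add(e.getKey());
            }
        }
        return overridden;
    }

    /** Throws if overrides are present and the flag forbids them. */
    public static void checkNotAllowedConfigurations(
            boolean allowClientJobConfigurations,
            Map<String, String> clusterConf,
            Map<String, String> clientConf) {
        if (allowClientJobConfigurations) {
            return;
        }
        List<String> overridden = overriddenKeys(clusterConf, clientConf);
        if (!overridden.isEmpty()) {
            throw new IllegalStateException(
                    "Client is not allowed to override: " + overridden);
        }
    }

    /** Async path: runs the check before "submitting". */
    public static String executeAsync(
            boolean allowClientJobConfigurations,
            Map<String, String> clusterConf,
            Map<String, String> clientConf) {
        checkNotAllowedConfigurations(
                allowClientJobConfigurations, clusterConf, clientConf);
        return "submitted";
    }

    /** Blocking path: delegates to executeAsync, so the check is shared. */
    public static String execute(
            boolean allowClientJobConfigurations,
            Map<String, String> clusterConf,
            Map<String, String> clientConf) {
        return executeAsync(allowClientJobConfigurations, clusterConf, clientConf);
    }

    public static void main(String[] args) {
        Map<String, String> cluster =
                Map.of("table.exec.sink.not-null-enforcer", "ERROR");
        Map<String, String> client =
                Map.of("table.exec.sink.not-null-enforcer", "DROP");
        try {
            executeAsync(false, cluster, client);
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

Routing execute() through executeAsync() in the sketch means there is a single place where the check can be forgotten; whether the actual fix should be structured that way is left open here.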