Hi All,

We are working on migrating existing pipelines from Flink 1.10 to Flink 1.11.
We are using the Blink planner and have unified pipelines that can run in both
stream and batch mode.

Stream pipelines work as expected, but the batch ones fail on Flink 1.11 if they
contain any table aggregation transformation.

A simple example of a failing pipeline:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

TableConfig tableConfig = new TableConfig();
tableConfig.setIdleStateRetentionTime(
        org.apache.flink.api.common.time.Time.minutes(10),
        org.apache.flink.api.common.time.Time.minutes(30)
);
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();

// tEnv is created via our own workaround that ignores the settings.isStreamingMode()
// check (see the note after this listing)
StreamTableEnvironment tEnv = create(env, settings, tableConfig);

DataStreamSource<A> streamSource = env.fromCollection(asList(new A("1"), new A("2")));

Table table = tEnv.fromDataStream(streamSource);
tEnv.createTemporaryView("A", table);

String sql = "select s from A group by s";

tEnv
         .toRetractStream(tEnv.sqlQuery(sql), Row.class)
         .flatMap(new RetractFlatMap())
         .map(Row::toString)
         .addSink(new TestSinkFunction<>());

env.execute("");

values.forEach(System.out::println);
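
For reference, with the Blink planner batch mode is otherwise only reachable through
the unified TableEnvironment, which has no DataStream bridge at all. Below is a rough
sketch of the same aggregation in that form, using the 1.11 Table API
(fromValues/executeSql); the variable names and inline values are just illustrative,
and it drops exactly the fromDataStream/toRetractStream parts our pipelines rely on:

import static org.apache.flink.table.api.Expressions.row;

EnvironmentSettings batchSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();

// unified TableEnvironment: no StreamExecutionEnvironment involved
TableEnvironment batchEnv = TableEnvironment.create(batchSettings);

// inline test data instead of env.fromCollection(...)
Table input = batchEnv.fromValues(row("1"), row("2")).as("s");
batchEnv.createTemporaryView("A", input);

// executeSql submits the batch job itself, so no env.execute() is needed
TableResult result = batchEnv.executeSql("select s from A group by s");
result.print();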

Exception:
Caused by: java.lang.IllegalStateException: Trying to consume an input 
partition whose producer is not ready (result type: BLOCKING, partition 
consumable: false, producer state: DEPLOYING, partition id: 
9eb6904501e90d90797a264aeb95a7c2#0@9c8833afe58af5854324c882252c267b).
                at 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.handleConsumedPartitionShuffleDescriptorErrors(TaskDeploymentDescriptorFactory.java:242)
                …

Calling execute on the StreamTableEnvironment instead does not help either.
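
For completeness, that attempt was along these lines (the job name is arbitrary):

// submitting via the table environment instead of (or in addition to) env.execute("");
// this does not help either
tEnv.execute("batch aggregation test");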

Could you please advise what I'm missing?

