[ https://issues.apache.org/jira/browse/FLINK-18769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu closed FLINK-18769.
---------------------------
    Resolution: Fixed

Fixed in
 - master (1.12.0): 94b23885ca34927e37334fce51b930933cfd79dd
 - 1.11.2: ec5a4d3b54de535f97bb67706032eac68d0eb214

> MiniBatch doesn't work with FLIP-95 source
> ------------------------------------------
>
>                 Key: FLINK-18769
>                 URL: https://issues.apache.org/jira/browse/FLINK-18769
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.1
>            Reporter: Nico Kruber
>            Assignee: Jark Wu
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.2
>
>
> The following Table API streaming job is stuck when enabling mini batching
> {code}
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     EnvironmentSettings settings =
>         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>     StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
>     // disable mini-batching completely to get a result
>     Configuration tableConf = tableEnv.getConfig()
>         .getConfiguration();
>     tableConf.setString("table.exec.mini-batch.enabled", "true");
>     tableConf.setString("table.exec.mini-batch.allow-latency", "5 s");
>     tableConf.setString("table.exec.mini-batch.size", "5000");
>     tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
>     tableEnv.executeSql(
>         "CREATE TABLE input_table ("
>             + "location STRING, "
>             + "population INT"
>             + ") WITH ("
>             + "'connector' = 'kafka', "
>             + "'topic' = 'kafka_batching_input', "
>             + "'properties.bootstrap.servers' = 'localhost:9092', "
>             + "'format' = 'csv', "
>             + "'scan.startup.mode' = 'earliest-offset'"
>             + ")");
>     tableEnv.executeSql(
>         "CREATE TABLE result_table WITH ('connector' = 'print') LIKE input_table (EXCLUDING OPTIONS)");
>     tableEnv
>         .from("input_table")
>         .groupBy($("location"))
>         .select($("location").cast(DataTypes.CHAR(2)).as("location"),
>             $("population").sum().as("population"))
>         .executeInsert("result_table");
> {code}
> I am using a pre-populated Kafka topic called {{kafka_batching_input}} with
> these elements:
> {code}
> "Berlin",1
> "Berlin",2
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)