[ 
https://issues.apache.org/jira/browse/FLINK-18769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-18769.
---------------------------
    Resolution: Fixed

Fixed in 
- master (1.12.0): 94b23885ca34927e37334fce51b930933cfd79dd
- 1.11.2: ec5a4d3b54de535f97bb67706032eac68d0eb214

> MiniBatch doesn't work with FLIP-95 source
> ------------------------------------------
>
>                 Key: FLINK-18769
>                 URL: https://issues.apache.org/jira/browse/FLINK-18769
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.1
>            Reporter: Nico Kruber
>            Assignee: Jark Wu
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.2
>
>
> The following Table API streaming job gets stuck when mini-batching is enabled:
> {code}
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     EnvironmentSettings settings =
>         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>     StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
>     // mini-batching is enabled here and the job hangs; disable it completely to get a result
>     Configuration tableConf = tableEnv.getConfig()
>         .getConfiguration();
>     tableConf.setString("table.exec.mini-batch.enabled", "true");
>     tableConf.setString("table.exec.mini-batch.allow-latency", "5 s");
>     tableConf.setString("table.exec.mini-batch.size", "5000");
>     tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
>     tableEnv.executeSql(
>         "CREATE TABLE input_table ("
>             + "location STRING, "
>             + "population INT"
>             + ") WITH ("
>             + "'connector' = 'kafka', "
>             + "'topic' = 'kafka_batching_input', "
>             + "'properties.bootstrap.servers' = 'localhost:9092', "
>             + "'format' = 'csv', "
>             + "'scan.startup.mode' = 'earliest-offset'"
>             + ")");
>     tableEnv.executeSql(
>         "CREATE TABLE result_table WITH ('connector' = 'print') LIKE input_table (EXCLUDING OPTIONS)");
>     tableEnv
>         .from("input_table")
>         .groupBy($("location"))
>         .select(
>             $("location").cast(DataTypes.CHAR(2)).as("location"),
>             $("population").sum().as("population"))
>         .executeInsert("result_table");
> {code}
> I am using a pre-populated Kafka topic called {{kafka_batching_input}} with 
> these elements:
> {code}
> "Berlin",1
> "Berlin",2
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
