zhangzh created FLINK-25471: ------------------------------- Summary: wrong result if table toDataStream then keyBy sum Key: FLINK-25471 URL: https://issues.apache.org/jira/browse/FLINK-25471 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.14.2 Reporter: zhangzh
I have 6 rows like this: Row.of("Alice"), Row.of("alice"), Row.of("Bob"), Row.of("lily"), Row.of("lily"), Row.of("lily"). I then convert them into a table with a single column named "word": import org.apache.flink.api.common.RuntimeExecutionMode import org.apache.flink.api.common.functions.\{MapFunction, ReduceFunction} import org.apache.flink.api.scala.typeutils.Types import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.types.Row object TableToDataStreamBatchWordCount { def main(args: Array[String]) { //create env and tableEnv val env = StreamExecutionEnvironment.getExecutionEnvironment env.setRuntimeMode(RuntimeExecutionMode.STREAMING) //env.setRuntimeMode(RuntimeExecutionMode.BATCH) env.setParallelism(7) val tableEnv = StreamTableEnvironment.create(env) // make data ,3 line val resultDS2 = env.fromElements( Row.of("Alice"), Row.of("alice"), Row.of("Bob"), Row.of("lily"), Row.of("lily"), Row.of("lily") )(Types.ROW(Types.STRING)) // dataStream[Row] --> Table --> sql to upper transform table val table = tableEnv.fromDataStream(resultDS2).as("word") tableEnv.createTemporaryView(s"tmp_table",table) val resultTable = tableEnv.sqlQuery(s" select UPPER(word) as word from tmp_table ") // sql transformed table --> DataStream[String] val resultDs = tableEnv.toDataStream(resultTable).map(row => { row.getField("word").asInstanceOf[String] }) // keyby reduce val counts: DataStream[(String, Int)] = resultDs .map((_, 1)) .keyBy(_._1) .sum(1) // print result counts.print() env.execute("WordCount") } } -- This message was sent by Atlassian Jira (v8.20.1#820001)