zhengcanbin created FLINK-9460:
----------------------------------

             Summary: Redundant output in table & upsert semantics
                 Key: FLINK-9460
                 URL: https://issues.apache.org/jira/browse/FLINK-9460
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
    Affects Versions: 1.5.0
            Reporter: zhengcanbin
             Fix For: 1.6.0
         Attachments: image-2018-05-29-11-39-45-698.png, image-2018-05-29-11-51-20-671.png
The output seems incorrect in my table & upsert example. Here is the code:

{code:java}
import java.lang

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.{TableSink, UpsertStreamTableSink}
import org.apache.flink.types.Row

object VerifyUpsert {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val tEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)

    // read "record_time,user_id,page_id" lines from a netcat server
    val input = env.socketTextStream("localhost", 9099).map { x =>
      val tokens = x.split(",")
      DemoSource(tokens(0), tokens(1), tokens(2))
    }

    tEnv.registerDataStream("demoSource", input, 'record_time, 'user_id, 'page_id)

    val fieldNames = Array("record_time", "pv", "uv")
    val fieldTypes = Array(Types.STRING, Types.LONG, Types.LONG).asInstanceOf[Array[TypeInformation[_]]]
    tEnv.registerTableSink("demoSink", fieldNames, fieldTypes, MyPrintSink(fieldNames, fieldTypes))

    // per-minute pv/uv, written to an upsert sink
    tEnv.sqlUpdate(
      """
        |INSERT INTO demoSink
        |SELECT
        |  SUBSTRING(record_time, 1, 16) as record_time,
        |  count(user_id) as pv,
        |  count(DISTINCT user_id) as uv
        |FROM demoSource
        |GROUP BY SUBSTRING(record_time, 1, 16)
      """.stripMargin)

    env.execute()
  }

  case class DemoSource(record_time: String, user_id: String, page_id: String)
}

// upsert sink that simply prints every (flag, row) pair it receives
case class MyPrintSink(var fNames: Array[String], var fTypes: Array[TypeInformation[_]])
  extends UpsertStreamTableSink[Row] {

  override def setKeyFields(keys: Array[String]): Unit = {}

  override def setIsAppendOnly(isAppendOnly: lang.Boolean): Unit = {}

  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)

  override def emitDataStream(dataStream: DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit =
    dataStream.addSink(new PrintSinkFunction())

  override def getFieldNames: Array[String] = fNames

  override def getFieldTypes: Array[TypeInformation[_]] = fTypes

  override def configure(
      fieldNames: Array[String],
      fieldTypes: Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = {
    val copy = MyPrintSink(fNames, fTypes)
    copy.fNames = fieldNames
    copy.fTypes = fieldTypes
    copy
  }
}
{code}

When the application starts, I type one record at a time into the netcat client. The table below shows the output for each input record:

||input||output||
|2018-05-24 21:34:02,8,9|(true,2018-05-24 21:34,1,1)|
|2018-05-24 21:34:12,6,6|(true,2018-05-24 21:34,2,2)|
|2018-05-24 21:34:12,0,9|(true,2018-05-24 21:34,3,3)|
|2018-05-24 21:34:12,0,4|{color:#FF0000}(true,2018-05-24 21:34,2,2){color} (true,2018-05-24 21:34,4,3)|

When the fourth record is consumed, two output records are printed by the sink; the first one (marked in red) is clearly redundant.

I followed the source code and found something wrong in
{code:java}
org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction#processElement
{code}
!image-2018-05-29-11-51-20-671.png!

I think that when {{!generateRetraction && !inputC.change}} is true, we should not invoke {{out.collect}} here.

[~astephan], please take a look.
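To make the suggestion concrete, here is a minimal, standalone sketch of the emit path with the proposed guard. This is not the actual Flink source: only the names {{generateRetraction}}, {{inputC.change}}, {{prevRow}} and {{newRow}} are taken from {{GroupAggProcessFunction}}; the {{CRow}} model, the {{emit}} helper and the driver in {{main}} are made up for illustration. The driver assumes (as I read the behaviour above) that the fourth input causes a retract message followed by an accumulate message to reach the outer aggregation, presumably because of the rewrite of {{count(DISTINCT ...)}}.

{code:java}
// Simplified, standalone model of the emit path in GroupAggProcessFunction#processElement.
// It is NOT the real Flink code; it only illustrates the guard proposed above.
object EmitPathSketch {

  // stand-in for Flink's CRow: a row plus an accumulate/retract flag
  case class CRow(var row: String, var change: Boolean)

  def emit(
      generateRetraction: Boolean, // does the downstream sink need retraction messages?
      firstRow: Boolean,           // is this the first result for the current key?
      inputChange: Boolean,        // inputC.change: true = accumulate message, false = retract message
      prevRow: CRow,               // previous aggregate result
      newRow: CRow,                // updated aggregate result
      out: CRow => Unit): Unit = {

    if (!firstRow && generateRetraction) {
      // retract-stream sinks first get an explicit retraction of the previous result
      prevRow.change = false
      out(prevRow)
    }

    // Proposed guard: for an upsert sink (generateRetraction == false) a retract input
    // (inputChange == false) only produces an intermediate, soon-to-be-overwritten result,
    // so skip emitting it; the accumulate message that follows emits the final value.
    if (generateRetraction || inputChange) {
      newRow.change = true
      out(newRow)
    }
  }

  def main(args: Array[String]): Unit = {
    val printed = scala.collection.mutable.Buffer.empty[String]
    val out: CRow => Unit = r => printed += s"(${r.change},${r.row})"

    // Fourth input record from the table above: a retract message arrives first,
    // then the matching accumulate message.
    emit(generateRetraction = false, firstRow = false, inputChange = false,
      prevRow = CRow("2018-05-24 21:34,3,3", change = true),
      newRow = CRow("2018-05-24 21:34,2,2", change = true),
      out = out) // without the guard this step would print the redundant (true,2018-05-24 21:34,2,2)

    emit(generateRetraction = false, firstRow = false, inputChange = true,
      prevRow = CRow("2018-05-24 21:34,2,2", change = true),
      newRow = CRow("2018-05-24 21:34,4,3", change = true),
      out = out)

    printed.foreach(println) // prints only (true,2018-05-24 21:34,4,3)
  }
}
{code}

With such a guard, the retract step would no longer push the intermediate (true,2018-05-24 21:34,2,2) record to the upsert sink, and only the final (true,2018-05-24 21:34,4,3) would be emitted.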