[ https://issues.apache.org/jira/browse/FLINK-9460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zhengcanbin closed FLINK-9460.
------------------------------
    Resolution: Not A Problem

> Redundant output in table & upsert semantics
> --------------------------------------------
>
>                 Key: FLINK-9460
>                 URL: https://issues.apache.org/jira/browse/FLINK-9460
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: zhengcanbin
>            Priority: Minor
>              Labels: patch
>             Fix For: 1.6.0
>
>         Attachments: image-2018-05-29-11-39-45-698.png, image-2018-05-29-11-51-20-671.png
>
>
> The output of my table & upsert example looks incorrect. Here is the code:
> {code:java}
> object VerifyUpsert {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.createLocalEnvironment()
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     env.setParallelism(1)
>     val input = env.socketTextStream("localhost", 9099).map { x =>
>       val tokens = x.split(",")
>       DemoSource(tokens(0), tokens(1), tokens(2))
>     }
>     tEnv.registerDataStream("demoSource", input, 'record_time, 'user_id, 'page_id)
>     val fieldNames = Array("record_time", "pv", "uv")
>     val fieldTypes = Array(Types.STRING, Types.LONG, Types.LONG).asInstanceOf[Array[TypeInformation[_]]]
>     tEnv.registerTableSink("demoSink", fieldNames, fieldTypes, MyPrintSink(fieldNames, fieldTypes))
>     tEnv.sqlUpdate(
>       """
>         |INSERT INTO demoSink
>         |SELECT
>         |  SUBSTRING(record_time, 1, 16) as record_time,
>         |  count(user_id) as pv,
>         |  count(DISTINCT user_id) as uv
>         |FROM demoSource
>         |GROUP BY SUBSTRING(record_time, 1, 16)
>       """.stripMargin)
>     env.execute()
>   }
>   case class DemoSource(record_time: String, user_id: String, page_id: String)
> }
> case class MyPrintSink(var fNames: Array[String], var fTypes: Array[TypeInformation[_]]) extends UpsertStreamTableSink[Row] {
>   override def setKeyFields(keys: Array[String]): Unit = Seq.empty
>   override def setIsAppendOnly(isAppendOnly: lang.Boolean): Unit = {}
>   override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
>   override def emitDataStream(dataStream: DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = dataStream.addSink(new PrintSinkFunction())
>   override def getFieldNames: Array[String] = fNames
>   override def getFieldTypes: Array[TypeInformation[_]] = fTypes
>   override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = {
>     val copy = MyPrintSink(fNames, fTypes)
>     copy.fNames = fieldNames
>     copy.fTypes = fieldTypes
>     copy
>   }
> }{code}
> After the application starts, I type one record at a time into the netcat
> client. The table below shows the output for each input record:
>
> ||input||output||
> |2018-05-24 21:34:02,8,9|(true,2018-05-24 21:34,1,1)|
> |2018-05-24 21:34:12,6,6|(true,2018-05-24 21:34,2,2)|
> |2018-05-24 21:34:12,0,9|(true,2018-05-24 21:34,3,3)|
> |2018-05-24 21:34:12,0,4|{color:#ff0000}(true,2018-05-24 21:34,2,2){color}
> (true,2018-05-24 21:34,4,3)|
>
> When the fourth record is consumed, the sink prints two output records. The
> first one (shown in red) is obviously redundant. I traced through the source
> code and believe the problem is in
>
> {code:java}
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction#processElement
> {code}
> !image-2018-05-29-11-51-20-671.png!
> I think that when (!generateRetraction) && !inputC.change is true, we should
> not invoke out.collect here.
>
> [~StephanEwen] please take a look at this.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
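
For readers comparing the two rows printed for the fourth record: in an upsert stream, a message flagged `true` inserts or overwrites the row for its key, and a message flagged `false` deletes it. A consumer that materializes the stream therefore keeps only the latest row per key. The sketch below replays the five messages from the table in the report. It is a plain Scala model, not Flink code: `Msg`, `materialize`, and `observed` are hypothetical names, and it assumes the sink is keyed on `record_time` (the GROUP BY column).

```scala
object UpsertReplay {
  // One message as printed by the sink: (flag, record_time, pv, uv).
  final case class Msg(isUpsert: Boolean, key: String, pv: Long, uv: Long)

  // Apply messages in emission order:
  // true  = insert or overwrite the row for this key,
  // false = delete the row for this key.
  def materialize(msgs: Seq[Msg]): Map[String, (Long, Long)] =
    msgs.foldLeft(Map.empty[String, (Long, Long)]) { (state, m) =>
      if (m.isUpsert) state.updated(m.key, (m.pv, m.uv)) else state - m.key
    }

  // The five messages from the report's table, in the order they were printed.
  val observed: Seq[Msg] = Seq(
    Msg(isUpsert = true, "2018-05-24 21:34", 1, 1),
    Msg(isUpsert = true, "2018-05-24 21:34", 2, 2),
    Msg(isUpsert = true, "2018-05-24 21:34", 3, 3),
    Msg(isUpsert = true, "2018-05-24 21:34", 2, 2), // the row marked red in the report
    Msg(isUpsert = true, "2018-05-24 21:34", 4, 3)
  )

  def main(args: Array[String]): Unit =
    println(materialize(observed))
}
```

In this model the intermediate (2, 2) row is overwritten as soon as the next message arrives, so the materialized state for `2018-05-24 21:34` ends at pv = 4, uv = 3. A sink that prints every message, like the `PrintSinkFunction` in the report, still shows the intermediate row.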