Hi Timo,

With LAST_VALUE, the query simply groups by id and takes the latest row of data for each primary key. I was inspired by this answer: https://stackoverflow.com/questions/48554999/apache-flink-how-to-enable-upsert-mode-for-dynamic-tables
Its implementation is also very simple:

    // Imports are assumed; the original snippet omitted them.
    import java.lang.{String => JString}
    import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
    import org.apache.flink.table.functions.AggregateFunction

    // Accumulator: holds the most recent non-empty value seen for a key.
    // The annotation sets the Java serialVersionUID; a plain private val does not.
    @SerialVersionUID(3L)
    class Middle2 extends Serializable {
      var mid: String = "none"
    }

    class StringLastValueFunc extends AggregateFunction[JString, Middle2] {
      override def createAccumulator(): Middle2 = new Middle2

      // Overwrite the stored value unless the input is null or empty.
      def accumulate(acc: Middle2, iValue: String): Unit = {
        if (iValue != null && !iValue.isEmpty) {
          acc.mid = iValue
        }
      }

      override def getValue(acc: Middle2): JString = acc.mid

      override def getResultType: TypeInformation[JString] = Types.STRING
    }

I don't think I should set a state expiration time, because the data for any primary key can change at any time. I am using the RocksDB state backend with incremental checkpoints enabled.
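To make the behaviour concrete, here is a minimal Flink-free sketch of the same "last non-empty value per id" logic. The names `LastValueSketch`, `Acc`, and `lastValues` are hypothetical and exist only for this illustration; the mutable map merely stands in for the keyed state that a GROUP BY id aggregation would hold, and is not how Flink stores it.

```scala
import scala.collection.mutable

object LastValueSketch {
  // Stand-in for the Middle2 accumulator; no Flink dependency (assumption for illustration).
  final class Acc { var mid: String = "none" }

  // Same rule as accumulate() in the message: null and empty inputs leave the value untouched.
  def accumulate(acc: Acc, value: String): Unit =
    if (value != null && !value.isEmpty) acc.mid = value

  // Simulates GROUP BY id: one accumulator per distinct id, fed rows in arrival order.
  def lastValues(rows: Seq[(String, String)]): Map[String, String] = {
    val state = mutable.LinkedHashMap.empty[String, Acc]
    for ((id, value) <- rows) accumulate(state.getOrElseUpdate(id, new Acc), value)
    state.map { case (id, acc) => id -> acc.mid }.toMap
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(("a", "1"), ("b", "x"), ("a", null), ("a", "2"), ("b", ""))
    println(lastValues(rows))
  }
}
```

In this sketch, id "a" ends up with "2" and id "b" keeps "x", because the null and empty inputs are skipped. The map holds one accumulator per distinct id, so under these assumptions the state grows with the number of keys rather than with the number of rows.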