Hi,

Recently I was looking into upserts and upsert sources in Flink, and while doing so I noticed some potential room for improvement/simplification.
Currently there are 3 optimiser rules in DataStreamRetractionRules that work in three stages, followed by the UniqueKeyExtractor plan node visitor, to set the preferred updates mode and to validate that the keys are correct for upserts. First, DataStreamRetractionRules sets up UpdateAsRetractionTrait; next, another rule uses it to set up AccModeTrait. AccModeTrait has only two values: Acc (upserts) or AccRetract (retractions). This has some severe limitations and requires the additional UniqueKeyExtractor stage (implemented as a visitor) to actually verify that the keys are set correctly.

I would propose to unify those into one visitor (probably a RelShuttle implementation) that would traverse the plan from the root to the leaves. On the way down, it would collect the preferences of the nodes regarding the updates mode (including keys for upserts). On the way up, it would pick upsert(keys)/retraction/append-only modes, or fail if that was impossible [1].

I think that would simplify the code by a noticeable margin. Instead of having this logic distributed among 4 classes in two files/independent steps, it would be in one simple class.

It would also open up the possibility of further improvements. For operators that can process either upserts or retractions, we could (with the aforementioned solution, which decides upsert vs retract in the same step as validating keys) choose upserts if the keys match and fall back to retractions only if they don't. Currently that isn't possible (examples [2a], [2b]).
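To make the "on the way up" decision a bit more concrete, here is a rough sketch of how a single node's mode could be picked. All names in it (UpdateModeSketch, Preference, chooseMode) are hypothetical and are not existing Flink or Calcite classes, and it assumes that "matching keys" means the produced key set equals the key set the consumer asked for. A real implementation would live inside the plan visitor and might need a looser rule.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch only; none of these types exist in Flink.
final class UpdateModeSketch {

    enum Mode { APPEND_ONLY, UPSERT, RETRACT }

    /** What a consuming node (join input, sink, ...) is able to accept. */
    static final class Preference {
        final Set<String> upsertKeys;      // empty set means upserts are not accepted
        final boolean acceptsRetractions;

        Preference(Set<String> upsertKeys, boolean acceptsRetractions) {
            this.upsertKeys = upsertKeys;
            this.acceptsRetractions = acceptsRetractions;
        }
    }

    static Set<String> keys(String... names) {
        return new HashSet<>(Arrays.asList(names));
    }

    /**
     * Picks the mode for one input of a consumer, given what that input
     * produces and what the consumer collected as its preference.
     */
    static Mode chooseMode(boolean producesUpdates, Set<String> producedKeys, Preference consumer) {
        if (!producesUpdates) {
            return Mode.APPEND_ONLY;
        }
        // Assumption: keys "match" only when the two key sets are identical.
        if (!consumer.upsertKeys.isEmpty() && consumer.upsertKeys.equals(producedKeys)) {
            return Mode.UPSERT;
        }
        if (consumer.acceptsRetractions) {
            return Mode.RETRACT;
        }
        throw new IllegalStateException(
            "Input keyed on " + producedKeys + " cannot feed a consumer that only accepts upserts on "
                + consumer.upsertKeys);
    }

    public static void main(String[] args) {
        // Mirrors [2a]: g1 is keyed on a, and the join keys on a for that side.
        System.out.println(chooseMode(true, keys("a"), new Preference(keys("a"), true))); // UPSERT
        // Mirrors g2 in [2b]: keyed on x, but the join keys on y for that side.
        System.out.println(chooseMode(true, keys("x"), new Preference(keys("y"), true))); // RETRACT
        // Mirrors g2 in [1]: keyed on b, feeding an upsert-only sink keyed on a.
        try {
            chooseMode(true, keys("b"), new Preference(keys("a"), false));
        } catch (IllegalStateException e) {
            System.out.println("fails: " + e.getMessage());
        }
    }
}
```

The point of the sketch is only that the upsert/retract choice and the key validation happen in the same place, so falling back to retractions becomes a local decision instead of a separate validation step.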
Thanks,
Piotrek

[1] Example impossible case:

    DataStream<Tuple3<Integer, Long, String>> ds1 = JavaStreamTestData.getSmall3TupleDataSet(env);
    Table t1 = tableEnv.fromDataStream(ds1, "a,b,c").select("a.cast(LONG) as a,b,c");

    DataStream<Tuple3<Integer, Long, String>> ds2 = JavaStreamTestData.getSmall3TupleDataSet(env);
    Table t2 = tableEnv.fromDataStream(ds2, "a,b,c");

    Table g1 = t1.groupBy("a").select("a, b.count");
    Table g2 = t2.groupBy("b").select("a.count as a, b");

    g1.unionAll(g2).writeToSink(new TestUpsertSink(new String[]{"a"}, false));

[2a]

    val t1 = util.addTable[(Long, Long)]('a, 'b)
    val t2 = util.addTable[(Long, Long)]('x, 'y)

    val g1 = t1.groupBy("a").select("a, b.count")
    val g2 = t2.groupBy("y").select("x.count, y")

    val resultTable = g1.join(g2, "a=y")

`g1.join(g2, "a=y")` could accept upserts from both sides. Currently both are retractions.

[2b]

    val t1 = util.addTable[(Long, Long)]('a, 'b)
    val t2 = util.addTable[(Long, Long)]('x, 'y)

    val g1 = t1.groupBy("a").select("a, b.count")
    val g2 = t2.groupBy("x").select("x, y.count as y")

    val resultTable = g1.join(g2, "a=y")

`g1.join(g2, "a=y")` could accept upserts from g1 (same key column) but only retractions from g2 (different key columns). Currently both are retractions.