Hi,

Recently I was looking into upserts and upsert sources in Flink, and while
doing so I noticed some potential room for improvement and simplification.

Currently there are 3 optimiser rules in DataStreamRetractionRules that work in
three stages to set the preferred updates mode, followed by the
UniqueKeyExtractor plan node visitor, which validates that the keys are correct
for upserts. First, DataStreamRetractionRules sets up UpdateAsRetractionTrait;
next, another rule uses it to set up AccModeTrait. AccModeTrait has only two
values, Acc (upserts) and AccRetract (retractions), so it cannot express which
keys the upserts are on. This is a severe limitation and requires the
additional UniqueKeyExtractor stage (implemented as a visitor) to actually
verify that the keys are set correctly.

I would propose unifying those into one visitor (probably a RelShuttle
implementation) that would traverse the plan from the root to the leaves. On
the way down, it would collect the nodes' preferences regarding the updates
mode (including the keys for upserts). On the way up, it would pick upsert
(with keys), retraction or append-only mode for each node, or fail if that is
impossible [1].
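
To make the idea a bit more concrete, below is a minimal sketch of such a
single-pass visitor over a toy plan. Everything in it (UpdateModeInference,
Mode, Source, GroupBy, Union, UpsertSink, infer) is hypothetical and not
Flink's API. It assumes that aggregations only consume append-only or
retraction input, and that an upsert sink only accepts append-only input or
upserts on exactly its own keys. The preference passed down is the set of
upsert keys the parent accepts (null meaning retractions only); the value
returned on the way up is the mode the node will produce.

```java
import java.util.Set;

/**
 * Hypothetical sketch of a single-pass update-mode inference over a toy plan.
 * None of these types are Flink classes; they only illustrate the idea.
 */
public class UpdateModeInference {

    enum Kind { APPEND_ONLY, UPSERT, RETRACT }

    /** Mode chosen for a node; keys are only meaningful for UPSERT. */
    record Mode(Kind kind, Set<String> keys) {
        static final Mode APPEND = new Mode(Kind.APPEND_ONLY, Set.of());
        static final Mode RETRACT = new Mode(Kind.RETRACT, Set.of());

        static Mode upsert(Set<String> keys) {
            return new Mode(Kind.UPSERT, keys);
        }
    }

    interface Node {}
    record Source(String name) implements Node {}
    record GroupBy(Set<String> keys, Node input) implements Node {}
    record Union(Node left, Node right) implements Node {}
    record UpsertSink(Set<String> keys, Node input) implements Node {}

    /**
     * Recursing into a child is the "way down": acceptedUpsertKeys is the parent's
     * preference (upserts on exactly these keys; null means retractions only).
     * Returning is the "way up": the mode this node will actually produce.
     */
    static Mode infer(Node node, Set<String> acceptedUpsertKeys) {
        if (node instanceof Source) {
            return Mode.APPEND; // toy sources are insert-only
        }
        if (node instanceof GroupBy g) {
            // Assumption: this aggregation consumes append-only or retraction input.
            infer(g.input(), null);
            // Emit upserts only if the parent accepts exactly the grouping keys.
            return g.keys().equals(acceptedUpsertKeys) ? Mode.upsert(g.keys()) : Mode.RETRACT;
        }
        if (node instanceof Union u) {
            // A union forwards its parent's preference unchanged to both inputs.
            Mode left = infer(u.left(), acceptedUpsertKeys);
            Mode right = infer(u.right(), acceptedUpsertKeys);
            // Inputs that agree keep their mode; otherwise fall back to retractions
            // (a real implementation would re-plan both inputs for retractions).
            return left.equals(right) ? left : Mode.RETRACT;
        }
        if (node instanceof UpsertSink s) {
            Mode in = infer(s.input(), s.keys());
            boolean ok = in.kind() == Kind.APPEND_ONLY
                    || (in.kind() == Kind.UPSERT && in.keys().equals(s.keys()));
            if (!ok) {
                throw new IllegalStateException(
                        "upsert sink keyed on " + s.keys() + " cannot consume " + in);
            }
            return in;
        }
        throw new IllegalArgumentException("unknown node: " + node);
    }

    public static void main(String[] args) {
        Node t1 = new Source("t1");
        Node t2 = new Source("t2");

        // Keys match: the aggregation can emit upserts straight into the sink.
        Node ok = new UpsertSink(Set.of("a"), new GroupBy(Set.of("a"), t1));
        System.out.println("accepted: " + infer(ok, null));

        // Mirrors [1]: results keyed on {a} and {b} unioned into a sink keyed on {a}.
        Node bad = new UpsertSink(Set.of("a"),
                new Union(new GroupBy(Set.of("a"), t1), new GroupBy(Set.of("b"), t2)));
        try {
            infer(bad, null);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The first plan is accepted as an upsert stream keyed on a. The second is
rejected in the same pass, because the aggregation grouped by b can only
produce retractions for a sink keyed on a, so no separate key-validation
stage is needed.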

I think that would noticeably simplify the code. Instead of having this logic
distributed among 4 classes in two files and independent steps, it would live
in one simple class.

It would also open up the possibility of further improvements. Because the
solution above decides upsert vs. retraction in the same step as validating the
keys, operators that can process both upserts and retractions could choose
upserts when the keys match and fall back to retractions only when they don't.
Currently this is not possible (examples [2a], [2b]).
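
A minimal sketch of that per-input decision for a join, assuming the simplest
rule: an input may stay an upsert stream only if it is keyed on exactly the
columns it contributes to the join condition. JoinInputModes and choose are
hypothetical names, not existing Flink code; main replays [2a] and [2b].

```java
import java.util.Set;

/** Hypothetical sketch: pick the update mode for each input of an equi-join. */
public class JoinInputModes {

    enum InputMode { UPSERT, RETRACT }

    /**
     * @param uniqueKey columns the input is keyed on (here: its groupBy columns)
     * @param joinKey   columns of this input used in the equi-join condition
     */
    static InputMode choose(Set<String> uniqueKey, Set<String> joinKey) {
        // Conservative rule: upserts only when the input is keyed on exactly the
        // join columns, otherwise fall back to retractions.
        return uniqueKey.equals(joinKey) ? InputMode.UPSERT : InputMode.RETRACT;
    }

    public static void main(String[] args) {
        // [2a]: g1 grouped by a, g2 grouped by y, joined on a = y.
        System.out.println("[2a] g1=" + choose(Set.of("a"), Set.of("a"))
                + ", g2=" + choose(Set.of("y"), Set.of("y")));
        // [2b]: g2 is grouped by x but joined on y.
        System.out.println("[2b] g1=" + choose(Set.of("a"), Set.of("a"))
                + ", g2=" + choose(Set.of("x"), Set.of("y")));
    }
}
```

Requiring exact equality is deliberately conservative; it covers the "keys
are matching" case above without trying to handle every key relationship.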

Thanks,
Piotrek

[1] Example of an impossible case:

DataStream<Tuple3<Integer, Long, String>> ds1 =
    JavaStreamTestData.getSmall3TupleDataSet(env);
Table t1 = tableEnv.fromDataStream(ds1, "a,b,c")
    .select("a.cast(LONG) as a,b,c");

DataStream<Tuple3<Integer, Long, String>> ds2 =
    JavaStreamTestData.getSmall3TupleDataSet(env);
Table t2 = tableEnv.fromDataStream(ds2, "a,b,c");

Table g1 = t1.groupBy("a").select("a, b.count");
Table g2 = t2.groupBy("b").select("a.count as a, b");

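// g1 is keyed on a and g2 on b, so the union has no unique key on a and the
// upsert sink keyed on "a" cannot accept it.
g1.unionAll(g2).writeToSink(new TestUpsertSink(new String[]{"a"}, false));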

[2a] 

val t1 = util.addTable[(Long, Long)]('a, 'b)
val t2 = util.addTable[(Long, Long)]('x, 'y)

val g1 = t1.groupBy("a").select("a, b.count")
val g2 = t2.groupBy("y").select("x.count, y")

val resultTable = g1.join(g2, "a=y")

`g1.join(g2, "a=y")` could accept upserts from both sides, because g1 is keyed
on a and g2 on y, which are exactly the join columns. Currently both inputs
are retractions.

[2b]

val t1 = util.addTable[(Long, Long)]('a, 'b)
val t2 = util.addTable[(Long, Long)]('x, 'y)

val g1 = t1.groupBy("a").select("a, b.count")
val g2 = t2.groupBy("x").select("x, y.count as y")

val resultTable = g1.join(g2, "a=y")

`g1.join(g2, "a=y")` could accept upserts from g1 (keyed on the join column a)
but only retractions from g2 (keyed on x, not on the join column y). Currently
both inputs are retractions.
