Hi Piotr,

Thanks for bringing up this discussion. I was not involved in the design discussions at that time, but I also find the logic for upserts and retractions spread over multiple stages quite confusing. So in general +1 for simplification. However, by using a RelShuttle instead of rules we might lose the flexibility to perform further optimizations by introducing new rules in the future. Users could not change the static logic in a RelShuttle, whereas right now they can influence the behaviour using CalciteConfig and custom rules.

Regards,
Timo

On 01.06.18 at 13:26, Piotr Nowojski wrote:
Hi,

Recently I was looking into upserts and upsert sources in Flink, and while
doing so I noticed some potential room for improvement/simplification.

Currently there are three optimiser rules in DataStreamRetractionRules that work
in three stages, followed by the UniqueKeyExtractor plan node visitor, to set the
preferred update mode and validate the keys for upserts. First,
DataStreamRetractionRules sets up UpdateAsRetractionTrait; next, another rule
uses it to set up AccModeTrait. AccModeTrait has only two values: Acc (upserts)
or AccRetract (retractions). Since it carries no information about the upsert
keys, this has some severe limitations and requires an additional
UniqueKeyExtractor stage (implemented as a visitor) to actually verify that the
keys are set correctly.

I would propose unifying those into one visitor (probably a RelShuttle
implementation) that would traverse the plan from the root to the leaves. On the
way down, it would collect the nodes' preferences regarding the update mode
(including the keys for upserts). On the way up, it would pick the
upsert(keys)/retraction/append-only mode for each node, or fail if that is
impossible [1].
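To make the proposed single traversal more concrete, here is a minimal, hypothetical sketch in Java. The node model (`UpdateModeResolver`, `Node`, `Preference`, `Resolved`) is invented for illustration and is not Flink's RelNode/AccModeTrait API. Only sources and group-by aggregates are modelled, and an aggregate is assumed to ask its input for retractions rather than upserts. The preference travels down as a method argument and the chosen mode travels back up as the return value, failing when neither upserts on the required keys nor retractions are acceptable.

```java
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch of a single-pass update-mode resolver. The plan model
 * below is invented for illustration and does not mirror Flink's classes.
 */
final class UpdateModeResolver {

    enum Mode { APPEND_ONLY, UPSERT, RETRACT }

    /** What a parent would like to receive from its input. */
    static final class Preference {
        final Set<String> upsertKeys; // null means "no upsert preference"
        final boolean acceptsRetract;

        Preference(Set<String> upsertKeys, boolean acceptsRetract) {
            this.upsertKeys = upsertKeys;
            this.acceptsRetract = acceptsRetract;
        }
    }

    /** The mode chosen for a node's output; keys are only set for UPSERT. */
    static final class Resolved {
        final Mode mode;
        final Set<String> keys;

        Resolved(Mode mode, Set<String> keys) {
            this.mode = mode;
            this.keys = keys;
        }
    }

    /** Minimal plan node: a source (no group keys) or a group-by aggregate. */
    static final class Node {
        final String name;
        final Set<String> groupKeys; // null for a source
        final List<Node> inputs;

        Node(String name, Set<String> groupKeys, List<Node> inputs) {
            this.name = name;
            this.groupKeys = groupKeys;
            this.inputs = inputs;
        }

        static Node source(String name) {
            return new Node(name, null, List.of());
        }

        static Node groupBy(String name, Set<String> keys, Node input) {
            return new Node(name, keys, List.of(input));
        }
    }

    /** Preferences flow down as arguments; chosen modes flow up as results. */
    static Resolved resolve(Node node, Preference pref) {
        if (node.groupKeys == null) {
            // Sources are append-only, which this sketch treats as
            // acceptable to every consumer.
            return new Resolved(Mode.APPEND_ONLY, null);
        }
        // Way down: in this simplified model an aggregate asks its input
        // for retractions (or append-only), never for upserts.
        for (Node input : node.inputs) {
            resolve(input, new Preference(null, true));
        }
        // Way up: an aggregate's unique key is its grouping key.
        if (pref.upsertKeys != null && pref.upsertKeys.equals(node.groupKeys)) {
            return new Resolved(Mode.UPSERT, node.groupKeys);
        }
        if (pref.acceptsRetract) {
            return new Resolved(Mode.RETRACT, null);
        }
        throw new IllegalStateException(node.name + " is keyed on "
            + node.groupKeys + " but upserts on " + pref.upsertKeys
            + " are required");
    }
}
```

For example, under an upsert-only sink keyed on `a`, an aggregate grouped by `a` resolves to UPSERT, while one grouped by `b` fails, similar to the g2 branch in [1].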

I think that would simplify the code by a noticeable margin. Instead of having
this logic distributed among 4 classes in two files/independent steps, it would
be in one simple class.

It would also open up the possibility of further improvements. With the solution
mentioned above, which decides between upsert and retract in the same step as
validating the keys, operators that can process both upserts and retractions
could choose upserts if the keys match and fall back to retractions only if they
don't. With the current approach this isn't possible (examples [2a], [2b]).
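For an operator that accepts both kinds of input, such as the join in examples [2a] and [2b], the upsert-or-fallback decision could reduce to a per-input check like the hypothetical helper below (`JoinInputModes` and `chooseInputMode` are invented names, not Flink API). "Keys match" is modelled as set equality between the input's unique key and that side's join keys, which is a simplifying assumption.

```java
import java.util.Set;

/**
 * Hypothetical per-input decision for an operator that can consume
 * either upserts or retractions.
 */
final class JoinInputModes {

    enum Mode { UPSERT, RETRACT }

    /**
     * Prefer upserts when the input's unique key equals the join keys on
     * that side; otherwise (including an unknown key) fall back to retractions.
     */
    static Mode chooseInputMode(Set<String> inputUniqueKey, Set<String> joinKeys) {
        return inputUniqueKey != null && inputUniqueKey.equals(joinKeys)
            ? Mode.UPSERT
            : Mode.RETRACT;
    }
}
```

Applied to [2a], both g1 (keyed on a, joined on a) and g2 (keyed on y, joined on y) would get UPSERT; in [2b], g2 is keyed on x but joined on y, so only g1 would get UPSERT.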

Thanks Piotrek

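[1] Example of an impossible case:

// t1.a is cast to LONG so that both union inputs have matching column types
DataStream<Tuple3<Integer, Long, String>> ds1 = JavaStreamTestData.getSmall3TupleDataSet(env);
Table t1 = tableEnv.fromDataStream(ds1, "a,b,c").select("a.cast(LONG) as a, b, c");

DataStream<Tuple3<Integer, Long, String>> ds2 = JavaStreamTestData.getSmall3TupleDataSet(env);
Table t2 = tableEnv.fromDataStream(ds2, "a,b,c");

// g1 has unique key a; g2 has unique key b (its column "a" is a count)
Table g1 = t1.groupBy("a").select("a, b.count");
Table g2 = t2.groupBy("b").select("a.count as a, b");

// The sink requires upserts on key "a", but g2 can only provide upserts on b,
// so no valid update mode exists for the union
g1.unionAll(g2).writeToSink(new TestUpsertSink(new String[]{"a"}, false));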

[2a]

val t1 = util.addTable[(Long, Long)]('a, 'b)
val t2 = util.addTable[(Long, Long)]('x, 'y)

val g1 = t1.groupBy("a").select("a, b.count")   // unique key: a
val g2 = t2.groupBy("y").select("x.count, y")   // unique key: y

val resultTable = g1.join(g2, "a=y")

`g1.join(g2, "a=y")` could accept upserts from both sides (each side's unique
key is its join key). Currently both inputs are retractions.

[2b]

val t1 = util.addTable[(Long, Long)]('a, 'b)
val t2 = util.addTable[(Long, Long)]('x, 'y)

val g1 = t1.groupBy("a").select("a, b.count")        // unique key: a
val g2 = t2.groupBy("x").select("x, y.count as y")   // unique key: x

val resultTable = g1.join(g2, "a=y")

`g1.join(g2, "a=y")` could accept upserts from g1 (same key column) but only
retractions from g2 (different key columns). Currently both inputs are retractions.

