Hi Piotr,
thanks for bringing up this discussion. I was not involved in the design
discussions at that time, but I also find the logic for upserts and
retractions spread across multiple stages quite confusing. So, in general, +1 for
simplification. However, by using a RelShuttle instead of rules, we might
lose the flexibility to perform further optimizations by introducing new
rules in the future. Users could not change the static logic in a
RelShuttle, whereas right now they can influence the behaviour using
CalciteConfig and custom rules.
Regards,
Timo
On 01.06.18 at 13:26, Piotr Nowojski wrote:
Hi,
Recently I was looking into upserts and upsert sources in Flink, and while
doing so I noticed some potential room for improvement/simplification.
Currently there are three optimiser rules in DataStreamRetractionRules that work
in three stages, followed by the UniqueKeyExtractor plan node visitor; together
they set the preferred update mode and validate the keys for upserts. First, one
DataStreamRetractionRules rule sets up the UpdateAsRetractionTrait; then another
rule uses it to set up the AccModeTrait. The AccModeTrait has only two values:
Acc (upserts) and AccRetract (retractions). This is a severe limitation (the
trait carries no upsert keys) and requires an additional UniqueKeyExtractor stage
(implemented as a visitor) to actually verify that the keys are set correctly.
I would propose unifying those into one visitor (probably a RelShuttle
implementation) that would traverse the plan from the root to the leaves. On the
way down, it would collect the nodes' preferences regarding the update mode
(including the keys for upserts). On the way up, it would pick the upsert(keys),
retraction, or append-only mode, or fail if that is impossible [1].
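To make the proposed traversal more concrete, here is a minimal, hypothetical
sketch in Java (17+, for records and instanceof patterns). It does not use
Calcite's RelShuttle: the Source, GroupBy, Union and UpsertSink node types and
the resolve method are made up for illustration, and the rule that an upsert
sink accepts only append-only input or upserts on exactly its keys is an
assumption. The preferred keys flow down as a parameter and the chosen mode
flows back up as the return value; with these rules it rejects the plan from [1].

```java
import java.util.Set;

// Toy model of the proposed single-pass resolver. Node types are hypothetical
// stand-ins, not Calcite's RelNode/RelShuttle or Flink's DataStreamRel classes.
class UpdateModeResolver {

    enum Kind { APPEND_ONLY, UPSERT, RETRACT }

    // The update mode a node emits; keys are only meaningful for UPSERT.
    record Mode(Kind kind, Set<String> keys) {
        static Mode appendOnly() { return new Mode(Kind.APPEND_ONLY, Set.of()); }
        static Mode retract() { return new Mode(Kind.RETRACT, Set.of()); }
        static Mode upsert(Set<String> keys) { return new Mode(Kind.UPSERT, keys); }
    }

    interface Node {}
    record Source(String name) implements Node {}
    record GroupBy(Set<String> groupKeys, Node input) implements Node {}
    record Union(Node left, Node right) implements Node {}
    record UpsertSink(Set<String> keys, Node input) implements Node {}

    // preferredKeys: upsert keys requested by the parent on the way down
    // (null = no upsert preference). Returns the mode picked on the way up.
    static Mode resolve(Node node, Set<String> preferredKeys) {
        if (node instanceof Source) {
            return Mode.appendOnly();
        }
        if (node instanceof GroupBy g) {
            // Assumption: the aggregation consumes its input as retractions,
            // so no upsert preference is passed further down.
            resolve(g.input(), null);
            // Emit upserts only if the parent asked for exactly the grouping keys.
            return g.groupKeys().equals(preferredKeys)
                    ? Mode.upsert(g.groupKeys())
                    : Mode.retract();
        }
        if (node instanceof Union u) {
            // Forward the parent's preference to both inputs.
            Mode left = resolve(u.left(), preferredKeys);
            Mode right = resolve(u.right(), preferredKeys);
            // Matching inputs keep their mode; mixed inputs fall back to
            // retractions (a real implementation would revisit the inputs).
            return left.equals(right) ? left : Mode.retract();
        }
        if (node instanceof UpsertSink s) {
            // The sink prefers upserts on exactly its keys.
            Mode in = resolve(s.input(), s.keys());
            boolean ok = in.kind() == Kind.APPEND_ONLY
                    || (in.kind() == Kind.UPSERT && in.keys().equals(s.keys()));
            if (!ok) {
                throw new IllegalStateException(
                        "Cannot write " + in + " to upsert sink with keys " + s.keys());
            }
            return in;
        }
        throw new IllegalArgumentException("Unknown node: " + node);
    }

    public static void main(String[] args) {
        // Footnote [1]: g1 is keyed on a, g2 on b, and the sink requires key a.
        Node g1 = new GroupBy(Set.of("a"), new Source("t1"));
        Node g2 = new GroupBy(Set.of("b"), new Source("t2"));
        try {
            resolve(new UpsertSink(Set.of("a"), new Union(g1, g2)), null);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        // Without the union, upserts on a satisfy the sink.
        System.out.println(resolve(new UpsertSink(Set.of("a"), g1), null));
    }
}
```

Passing the preference as an argument and returning the decision keeps both
directions of the traversal in one recursive method, which is the property the
proposal is after; a real RelShuttle would presumably also rewrite each node with
the chosen mode.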
I think that would simplify the code by a noticeable margin. Instead of having
this logic distributed among four classes in two files and independent steps, it
would be in one simple class.
It would also open up possibilities for further improvements. For operators that
can process both upserts and retractions, the solution described above (which
decides between upsert and retraction in the same step as validating the keys)
would let us choose upserts if the keys match and fall back to retractions only
if they don't. Currently this is not possible (examples [2a], [2b]).
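The decision rule for such operators could be as small as the following
hypothetical helper (not an existing Flink API). It assumes that an input's
unique key is its grouping key and compares it with the input's join columns,
as in [2a] and [2b].

```java
import java.util.Set;

// Hypothetical sketch of the per-input choice for an operator that accepts
// both upserts and retractions, e.g. the joins in footnotes [2a] and [2b].
class JoinInputModes {

    enum Kind { UPSERT, RETRACT }

    // uniqueKeys: the keys the input is grouped (and thus unique) on.
    // joinKeys: the input's columns referenced by the join predicate.
    static Kind chooseInputMode(Set<String> uniqueKeys, Set<String> joinKeys) {
        // Prefer upserts when the key columns match; otherwise fall back.
        return uniqueKeys.equals(joinKeys) ? Kind.UPSERT : Kind.RETRACT;
    }

    public static void main(String[] args) {
        // [2a]: g1 is grouped on a, g2 on y, joined on a = y.
        System.out.println("2a g1: " + chooseInputMode(Set.of("a"), Set.of("a")));
        System.out.println("2a g2: " + chooseInputMode(Set.of("y"), Set.of("y")));
        // [2b]: g2 is grouped on x but joined on y.
        System.out.println("2b g2: " + chooseInputMode(Set.of("x"), Set.of("y")));
    }
}
```

Because the choice is made per input, one side of a join can consume upserts
while the other still consumes retractions, which is exactly the [2b] case.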
Thanks,
Piotrek
[1] Example of an impossible case (g1 is keyed on a, g2 on b, but the upsert sink requires key a):
DataStream<Tuple3<Integer, Long, String>> ds1 =
    JavaStreamTestData.getSmall3TupleDataSet(env);
Table t1 = tableEnv.fromDataStream(ds1, "a,b,c")
    .select("a.cast(LONG) as a,b,c");
DataStream<Tuple3<Integer, Long, String>> ds2 =
    JavaStreamTestData.getSmall3TupleDataSet(env);
Table t2 = tableEnv.fromDataStream(ds2, "a,b,c");
Table g1 = t1.groupBy("a").select("a, b.count");
Table g2 = t2.groupBy("b").select("a.count as a, b");
g1.unionAll(g2).writeToSink(new TestUpsertSink(new String[]{"a"}, false));
[2a]
val t1 = util.addTable[(Long, Long)]('a, 'b)
val t2 = util.addTable[(Long, Long)]('x, 'y)
val g1 = t1.groupBy("a").select("a, b.count")
val g2 = t2.groupBy("y").select("x.count, y")
val resultTable = g1.join(g2, "a=y")
`g1.join(g2, "a=y")` could accept upserts from both sides (each side is keyed on
its join column). Currently both are retractions.
[2b]
val t1 = util.addTable[(Long, Long)]('a, 'b)
val t2 = util.addTable[(Long, Long)]('x, 'y)
val g1 = t1.groupBy("a").select("a, b.count")
val g2 = t2.groupBy("x").select("x, y.count as y")
val resultTable = g1.join(g2, "a=y")
`g1.join(g2, "a=y")` could accept upserts from g1 (same key column) but only
retractions from g2 (different key columns). Currently both are retractions.