Hi, I had some follow-up thoughts on this matter over the weekend. I might be mistaken here, since I have never worked with Calcite before, so whatever I'm writing comes from my previous experience with optimisers and is mostly guesses/assumptions that Calcite works as I would expect it to work. So if I'm missing something, I would be glad to learn something :) With this disclaimer:
> I think it is better to make rules orthogonal. Each rule focuses on an
> independent optimization. The rest we need to do is to pass our rules to HEP
> planner or VOLCANO planner to get a final plan.

Can we even do that with rules that set up the traits, like primary key or retraction/upsert modes?

1. Volcano

Hooking up our three rules as we have them now for setting up retractions/upserts, and mixing them with optimiser rules that DO NOT understand those traits and DO break them, would mean that Volcano might interrupt searching the plan space in the middle, while holding a broken upsert plan (after firing an optimisation rule, but before firing the rule that fixes/sets up retraction/upsert). I think that's one of the assumptions of most (all?) optimisers (especially iterative ones): optimisation rules can not break the plan.

On the other hand, if we wish our optimisation rules NOT to break the traits, they must understand them and take them into account in each rewrite that they do. In that case, there is no point in mixing those retraction/upsert set-up rules with optimisation. We could just set up retraction/upsert once, after some stage of planning, and, as I wrote previously, just make sure that follow-up optimiser rules do not break the traits.

2. Other planners

The same holds true here. We would have to manually order rules, and we wouldn't be able to arbitrarily interleave rules that set up retractions with other optimisation rules, unless those rules DO understand and DO NOT break retraction/upsert/primary key traits. And again, if we will be forced anyway to have three sets of rules:

a. Rules that do not understand retraction/upsert/primary key traits
b. Rules that set up retraction/upsert/primary key traits
c. Rules that do understand retraction/upsert/primary key traits

that are carefully ordered, what's the point or value of mixing b. with c.? On the other hand, I see a value in leaving b. and c. separated - code readability and separation of concerns.
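To make concrete what such a single set-up/validation pass (set b. on its own) could look like, here is a minimal toy sketch. This is NOT the Flink/Calcite API - all names here (Node, Mode, ModeVisitor, resolve, compatible) are hypothetical, and the compatibility rules are deliberately simplified. It only illustrates the idea of one leaves-first traversal that resolves each node's update mode against its inputs and fails fast on an invalid combination:

```java
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the proposed single visitor --
// illustrative only, not the actual Flink/Calcite classes.
public class ModeVisitor {
    enum Kind { APPEND_ONLY, UPSERT, RETRACT }

    // An update mode: append-only, upsert on a key set, or retraction.
    record Mode(Kind kind, Set<String> keys) {
        static final Mode APPEND = new Mode(Kind.APPEND_ONLY, Set.of());
        static final Mode RETRACT = new Mode(Kind.RETRACT, Set.of());
        static Mode upsert(String... keys) { return new Mode(Kind.UPSERT, Set.of(keys)); }
    }

    // A plan node lists the modes it can emit, most preferred first.
    record Node(String name, List<Mode> supported, List<Node> inputs) {}

    // Single pass, leaves first: resolve the inputs, then pick this node's
    // least restrictive supported mode compatible with all input modes.
    static Mode resolve(Node node) {
        List<Mode> inputModes = node.inputs().stream().map(ModeVisitor::resolve).toList();
        return node.supported().stream()
            .filter(m -> inputModes.stream().allMatch(in -> compatible(m, in)))
            .findFirst()
            .orElseThrow(() -> new IllegalStateException(
                "No valid update mode for " + node.name() + " given inputs " + inputModes));
    }

    // Toy compatibility rules: retraction accepts any input, append-only
    // requires append-only inputs, upsert requires append-only input or an
    // upsert input with exactly matching keys.
    static boolean compatible(Mode out, Mode in) {
        return switch (out.kind()) {
            case RETRACT -> true;
            case APPEND_ONLY -> in.kind() == Kind.APPEND_ONLY;
            case UPSERT -> in.kind() == Kind.APPEND_ONLY
                || (in.kind() == Kind.UPSERT && in.keys().equals(out.keys()));
        };
    }
}
```

Because the whole decision lives in one pass, the same pass could also be re-run after any rewrite by a rule from set a. as a validation or clean-up step, instead of requiring every rule to preserve the traits itself.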
Now one has to manually jump between rules/classes to make the mental connections about what interconnects with what, and make assumptions about where those connections end. That's even visible in the issue that `UpdateAsRetractionTrait` is leaking outside. This is a purely intermediate thing that doesn't have to be exposed outside. However, as it is now, it's clogging the view for the reader. It's like a private state/field that is being exposed to other classes. With one shuttle/visitor to set up retraction/upsert, this intermediate state could be kept completely internal and hidden from outsiders.

As a side note, having a separate visitor/shuttle to validate/set up some traits would be helpful in converting optimisation rules that on their own DO break those traits into rules that DO NOT break them. We could use such a visitor/shuttle as a validation step or a clean-up step after a rewrite.

Piotrek

> On 16 Jun 2018, at 10:42, Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi Piotr :-)
>
> I think it is better to make rules orthogonal. Each rule focuses on an
> independent optimization. The rest we need to do is to pass our rules to HEP
> planner or VOLCANO planner to get a final plan.
>
> On Wed, Jun 13, 2018 at 5:48 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi,
>
> Maybe this boils down to how we envision plan modifications after setting
> up the initial upsert/retraction modes/traits. If we do some plan rewrite
> afterwards, do we want to rely on our current dynamic rules to "fix it"? Do
> we want to rerun the DataStreamRetractionRules shuttle again after rewriting
> the plan? Or do we want to guarantee that any plan rewriting rule that we run
> AFTER setting up retraction/upsert traits does not break them, but must take
> them into account (for example, if we add a new node, we would expect it to
> have retraction traits set correctly and consistently with respect to its
> parent/children)?
>
> I was thinking about the last approach - rules executed after adding traits
> should preserve the consistency of those traits. That's why I didn't mind
> setting up retraction rules in a shuttle.
>
> Piotrek
>
>> On 6 Jun 2018, at 04:33, Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>> Hi, thanks for bringing up this discussion.
>>
>> I agree to unify the UniqueKeyExtractor and DataStreamRetractionRules,
>> however I am not sure if it is a good idea to implement it with a RelShuttle.
>> Theoretically, retraction rules and other rules may depend on each other.
>> So, by using a RelShuttle instead of rules we might lose the flexibility to
>> perform further optimizations.
>>
>> As for the join problem, we can solve it with the following two changes:
>> 1. Implement the current UniqueKeyExtractor by adding a FlinkRelMdUniqueKeys
>> RelMetadataProvider to FlinkDefaultRelMetadataProvider, so that we can get
>> the unique keys of a RelNode during optimization.
>> 2. Treat the needsUpdatesAsRetraction method in DataStreamRel as an edge
>> attribute instead of a node attribute. We can implement this with minor
>> changes. The new needsUpdatesAsRetraction in DataStreamJoin will look like
>> `def needsUpdatesAsRetraction(input: RelNode): Boolean`. In
>> needsUpdatesAsRetraction of the join, we can compare the join key and the
>> unique keys of the input RelNode and return false if the unique keys contain
>> the join key. In this way, the two input edges of the join can work in
>> different modes.
>>
>> Best, Hequn.
>>
>> On Wed, Jun 6, 2018 at 12:00 AM, Rong Rong <walter...@gmail.com> wrote:
>> +1 on the refactoring.
>>
>> I spent some time a while back trying to get a better understanding of the
>> several rules mentioned here.
>> Correct me if I'm wrong, but I was under the impression that the reason why
>> the rules are split was because AccMode and UpdateMode are the ones that we
>> care about, and "NeedToRetract" was only the "intermediate" indicator. I
>> guess that's the part that confuses me the most.
>>
>> Another thing that confuses me is whether we can mix the modes of operators
>> while traversing the plan to pick the "least restrictive" mode, as
>> @piotr mentioned, if operators can support both upserts and retractions like
>> in [2b] (the 2nd [2a]).
>>
>> --
>> Rong
>>
>> On Tue, Jun 5, 2018 at 2:35 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>> Hi,
>>
>> I think the proposed refactoring is a good idea.
>> It should simplify the logic to determine which update mode to use.
>> We could also try to make some of the method and field names more intuitive
>> and extend the internal documentation a bit.
>>
>> @Hequn, it would be good to get your thoughts on this issue as well. Thank
>> you!
>>
>> While thinking about this issue I noticed a severe bug in how filters
>> handle upsert messages.
>> I've opened FLINK-9528 [1] for that.
>>
>> Best, Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-9528
>>
>> 2018-06-04 10:23 GMT+02:00 Timo Walther <twal...@apache.org>:
>>
>> > Hi Piotr,
>> >
>> > thanks for bringing up this discussion. I was not involved in the design
>> > discussions at that time, but I also find the logic about upserts and
>> > retractions in multiple stages quite confusing. So in general +1 for
>> > simplification. However, by using a RelShuttle instead of rules we might
>> > lose the flexibility to perform further optimizations by introducing new
>> > rules in the future. Users could not change the static logic in a
>> > RelShuttle; right now they can influence the behaviour using CalciteConfig
>> > and custom rules.
>> >
>> > Regards,
>> > Timo
>> >
>> > On 01.06.18 at 13:26, Piotr Nowojski wrote:
>> >
>> > Hi,
>> >>
>> >> Recently I was looking into upserts and upsert sources in Flink and,
>> >> while doing so, I noticed some potential room for
>> >> improvement/simplification.
>> >>
>> >> Currently there are 3 optimiser rules in DataStreamRetractionRules that
>> >> work in three stages, followed by the UniqueKeyExtractor plan node
>> >> visitor, which sets the preferred update mode and validates that the keys
>> >> are correct for upserts. First, DataStreamRetractionRules sets up the
>> >> UpdateAsRetractionTrait; next, in another rule, we use it to set up the
>> >> AccModeTrait. AccModeTrait has only two values: Acc (upserts) or
>> >> AccRetract (retractions). This has some severe limitations and requires
>> >> the additional stage of UniqueKeyExtractor (implemented as a visitor) to
>> >> actually verify that the keys are set correctly.
>> >>
>> >> I would propose to unify those into one visitor (probably a RelShuttle
>> >> implementation) that would traverse the plan from root -> leaves. On the
>> >> way down it would collect the preferences of the nodes regarding update
>> >> mode (including keys for upserts). On the way up, it would pick
>> >> upsert(keys)/retraction/append-only modes, or fail if that was impossible
>> >> [1].
>> >>
>> >> I think that would simplify the code by a noticeable margin. Instead of
>> >> having this logic distributed among 4 classes in two files/independent
>> >> steps, it would be in one simple class.
>> >>
>> >> It would also open up the possibility of further improvements. For
>> >> operators that could process both upserts and retractions (with the
>> >> before-mentioned solution that decides upsert vs retract in the same step
>> >> as validating the keys), we could choose upserts if the keys are matching
>> >> and fall back to retractions only if they don't. Right now that wouldn't
>> >> be possible (examples [2a], [2b]).
>> >>
>> >> Thanks, Piotrek
>> >>
>> >> [1] Example impossible case:
>> >>
>> >> DataStream<Tuple3<Integer, Long, String>> ds1 =
>> >>     JavaStreamTestData.getSmall3TupleDataSet(env);
>> >> Table t1 = tableEnv.fromDataStream(ds1, "a,b,c")
>> >>     .select("a.cast(LONG) as a,b,c");
>> >>
>> >> DataStream<Tuple3<Integer, Long, String>> ds2 =
>> >>     JavaStreamTestData.getSmall3TupleDataSet(env);
>> >> Table t2 = tableEnv.fromDataStream(ds2, "a,b,c");
>> >>
>> >> Table g1 = t1.groupBy("a").select("a, b.count");
>> >> Table g2 = t2.groupBy("b").select("a.count as a, b");
>> >>
>> >> g1.unionAll(g2).writeToSink(new TestUpsertSink(new String[]{"a"}, false));
>> >>
>> >> [2a]
>> >>
>> >> val t1 = util.addTable[(Long, Long)]('a, 'b)
>> >> val t2 = util.addTable[(Long, Long)]('x, 'y)
>> >>
>> >> val g1 = t1.groupBy("a").select("a, b.count")
>> >> val g2 = t2.groupBy("y").select("x.count, y")
>> >>
>> >> val resultTable = g1.join(g2, "a=y")
>> >>
>> >> `g1.join(g2, "a=y")` could accept upserts from both sides. Now both are
>> >> retractions.
>> >>
>> >> [2b]
>> >>
>> >> val t1 = util.addTable[(Long, Long)]('a, 'b)
>> >> val t2 = util.addTable[(Long, Long)]('x, 'y)
>> >>
>> >> val g1 = t1.groupBy("a").select("a, b.count")
>> >> val g2 = t2.groupBy("x").select("x, y.count as y")
>> >>
>> >> val resultTable = g1.join(g2, "a=y")
>> >>
>> >> `g1.join(g2, "a=y")` could accept upserts from g1 (same key column) but
>> >> only retractions from g2 (different key columns). Now both are
>> >> retractions.