Hi Piotr :-)

I think it is better to keep the rules orthogonal, so that each rule
focuses on an independent optimization. All that remains is then to pass
our rules to the HEP planner or the Volcano planner to get the final plan.
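
For the join case from my earlier mail (quoted below), a rule like this
only needs a local check per input edge. Here is a rough sketch in plain
Java, not Flink code: the class and method names are made up for
illustration, and I am assuming that "unique keys contain join key" means
the join key columns are exactly one of the input's unique keys.

```java
import java.util.Collections;
import java.util.Set;

/** Hypothetical sketch; none of these names are Flink or Calcite API. */
final class EdgeRetractionSketch {

    /**
     * Decides for ONE join input whether its updates must arrive as retractions.
     *
     * @param uniqueKeys unique keys of that input, each key being a set of column names
     * @param joinKey    columns of that input used in the equi-join condition
     */
    static boolean needsUpdatesAsRetraction(Set<Set<String>> uniqueKeys, Set<String> joinKey) {
        // Assumption: upserts suffice when the join key is itself a unique key of the input.
        return !uniqueKeys.contains(joinKey);
    }

    public static void main(String[] args) {
        // g1: grouped by "a" and joined on "a".
        Set<Set<String>> g1Keys = Collections.singleton(Collections.singleton("a"));
        System.out.println(needsUpdatesAsRetraction(g1Keys, Collections.singleton("a")));

        // g2: grouped by "x" but joined on "y".
        Set<Set<String>> g2Keys = Collections.singleton(Collections.singleton("x"));
        System.out.println(needsUpdatesAsRetraction(g2Keys, Collections.singleton("y")));
    }
}
```

With the second example from Piotr's first mail, this gives false for g1
(it can stay in upsert mode) and true for g2 (it needs retractions), so
the two inputs of the same join end up in different modes.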


On Wed, Jun 13, 2018 at 5:48 PM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> Maybe this boils down to how we envision plan modifications after
> setting up the initial upsert/retraction modes/traits. If we do some plan
> rewrite afterwards, do we want to rely on our current dynamic rules to
> “fix it”? Do we want to rerun the DataStreamRetractionRules shuttle after
> rewriting the plan? Or do we want to guarantee that any plan rewriting
> rule that we run AFTER setting up retraction/upsert traits does not break
> them, but takes them into account (for example, if we add a new node, we
> would expect it to have its retraction traits set correctly and
> consistently with respect to its parent/children)?
>
> I was thinking about the last approach - rules executed after adding
> traits should preserve consistency of those traits. That’s why I didn’t
> mind setting up retraction rules in a shuttle.
>
> Piotrek
>
>
> On 6 Jun 2018, at 04:33, Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi, thanks for bringing up this discussion.
>
> I agree with unifying the UniqueKeyExtractor and DataStreamRetractionRules;
> however, I am not sure it is a good idea to implement this with a
> RelShuttle. Theoretically, retraction rules and other rules may depend on
> each other. So, by using a RelShuttle instead of rules, we might lose the
> flexibility to perform further optimizations.
>
> As for the join problem, we can solve it with the following two changes:
> 1. Implement the current UniqueKeyExtractor by adding a
> FlinkRelMdUniqueKeys RelMetadataProvider to
> FlinkDefaultRelMetadataProvider, so that we can get the unique keys of a
> RelNode during optimization.
> 2. Treat the needsUpdatesAsRetraction method in DataStreamRel as an edge
> attribute instead of a node attribute. We can implement this with minor
> changes. The new needsUpdatesAsRetraction in DataStreamJoin will look like
> `def needsUpdatesAsRetraction(input: RelNode): Boolean`.
> In needsUpdatesAsRetraction of the join, we can compare the join key with
> the unique keys of the input RelNode and return false if the unique keys
> contain the join key. In this way, the two input edges of the join can
> work in different modes.
>
> Best, Hequn.
>
> On Wed, Jun 6, 2018 at 12:00 AM, Rong Rong <walter...@gmail.com> wrote:
>
>> +1 on the refactoring.
>>
>> I spent some time a while back trying to get a better understanding of
>> the several rules mentioned here.
>> Correct me if I am wrong, but I was under the impression that the reason
>> why the rules are split is that AccMode and UpdateMode are the ones
>> that we care about and "NeedToRetract" is only an "intermediate"
>> indicator. I guess that's the part that confuses me the most.
>>
>> Another thing that confuses me is whether we can mix the modes of
>> operators while traversing the plan and pick the "least restrictive"
>> mode, like @piotr mentioned, if operators can support both upserts and
>> retractions, like in [2b] (the second of the two [2] examples).
>>
>> --
>> Rong
>>
>>
>>
>> On Tue, Jun 5, 2018 at 2:35 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I think the proposed refactoring is a good idea.
>>> It should simplify the logic to determine which update mode to use.
>>> We could also try to make some of the method and field names more
>>> intuitive and extend the internal documentation a bit.
>>>
>>> @Hequn, it would be good to get your thoughts on this issue as well.
>>> Thank you!
>>>
>>> While thinking about this issue I noticed a severe bug in how filters
>>> handle upsert messages.
>>> I've opened FLINK-9528 [1] for that.
>>>
>>> Best, Fabian
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-9528
>>>
>>> 2018-06-04 10:23 GMT+02:00 Timo Walther <twal...@apache.org>:
>>>
>>> > Hi Piotr,
>>> >
>>> > thanks for bringing up this discussion. I was not involved in the
>>> > design discussions at that time, but I also find the logic about
>>> > upserts and retractions in multiple stages quite confusing. So in
>>> > general +1 for simplification. However, by using a RelShuttle instead
>>> > of rules we might lose the flexibility to perform further
>>> > optimizations by introducing new rules in the future. Users could not
>>> > change the static logic in a RelShuttle, whereas right now they can
>>> > influence the behaviour using CalciteConfig and custom rules.
>>> >
>>> > Regards,
>>> > Timo
>>> >
>>> > On 01.06.18 at 13:26, Piotr Nowojski wrote:
>>> >
>>> >> Hi,
>>> >>
>>> >> Recently I was looking into upserts and upsert sources in Flink and
>>> >> while doing so, I noticed some potential room for
>>> >> improvement/simplification.
>>> >>
>>> >> Currently there are 3 optimiser rules in DataStreamRetractionRules
>>> >> that work in three stages, followed by the UniqueKeyExtractor plan
>>> >> node visitor, to set the preferred update mode, with validation of
>>> >> correct keys for upserts. First, DataStreamRetractionRules sets up
>>> >> UpdateAsRetractionTrait; next, another rule uses it to set up
>>> >> AccModeTrait. AccModeTrait has only two values: Acc (upserts) or
>>> >> AccRetract (retractions). This has some severe limitations and
>>> >> requires an additional stage, UniqueKeyExtractor (implemented as a
>>> >> visitor), to actually verify that keys are set correctly.
>>> >>
>>> >> I would propose to unify those into one visitor (probably a
>>> >> RelShuttle implementation) that would traverse the plan from the root
>>> >> to the leaves. On the way down it would collect the preferences of
>>> >> the nodes regarding update mode (including keys for upserts). On the
>>> >> way up, it would pick upsert(keys)/retraction/append-only modes or
>>> >> fail if that was impossible [1].
>>> >>
>>> >> I think that would simplify the code by a noticeable margin. Instead
>>> >> of having this logic distributed among 4 classes in two
>>> >> files/independent steps, it would be in one simple class.
>>> >>
>>> >> It would also open up the possibility of further improvements. For
>>> >> operators that could process either upserts or retractions, the
>>> >> aforementioned solution (which decides upsert vs. retract in the same
>>> >> step as validating keys) would let us choose upserts if the keys
>>> >> match and fall back to retractions only if they don't. Currently this
>>> >> is not possible (examples [2a], [2b]).
>>> >>
>>> >> Thanks, Piotrek
>>> >>
>>> >> [1] Example impossible case:
>>> >>
>>> >> DataStream<Tuple3<Integer, Long, String>> ds1 =
>>> >>     JavaStreamTestData.getSmall3TupleDataSet(env);
>>> >> Table t1 = tableEnv.fromDataStream(ds1, "a,b,c")
>>> >>     .select("a.cast(LONG) as a,b,c");
>>> >>
>>> >> DataStream<Tuple3<Integer, Long, String>> ds2 =
>>> >>     JavaStreamTestData.getSmall3TupleDataSet(env);
>>> >> Table t2 = tableEnv.fromDataStream(ds2, "a,b,c");
>>> >>
>>> >> // g1 is grouped by a, g2 is grouped by b
>>> >> Table g1 = t1.groupBy("a").select("a, b.count");
>>> >> Table g2 = t2.groupBy("b").select("a.count as a, b");
>>> >>
>>> >> // the upsert sink is keyed on a
>>> >> g1.unionAll(g2).writeToSink(
>>> >>     new TestUpsertSink(new String[]{"a"}, false));
>>> >>
>>> >> [2a]
>>> >>
>>> >> val t1 = util.addTable[(Long, Long)]('a, 'b)
>>> >> val t2 = util.addTable[(Long, Long)]('x, 'y)
>>> >>
>>> >> val g1 = t1.groupBy("a").select("a, b.count")
>>> >> val g2 = t2.groupBy("y").select("x.count, y")
>>> >>
>>> >> val resultTable = g1.join(g2, "a=y")
>>> >>
>>> >> `g1.join(g2, "a=y")` could accept upserts from both sides. Currently
>>> >> both are retractions.
>>> >>
>>> >> [2b]
>>> >>
>>> >> val t1 = util.addTable[(Long, Long)]('a, 'b)
>>> >> val t2 = util.addTable[(Long, Long)]('x, 'y)
>>> >>
>>> >> val g1 = t1.groupBy("a").select("a, b.count")
>>> >> val g2 = t2.groupBy("x").select("x, y.count as y")
>>> >>
>>> >> val resultTable = g1.join(g2, "a=y")
>>> >>
>>> >> `g1.join(g2, "a=y")` could accept upserts from g1 (same key column)
>>> >> but only retractions from g2 (different key columns). Currently both
>>> >> are retractions.
>>> >>
>>> >>
>>> >
>>>
>>
>>
>
>
