Hi Jeyhun Karimov,

I see that you've already opened a VOTE thread, but since you mention
already having a prototype and results, I was wondering if you could
include the POC and how you tested these results in the FLIP?

Best regards,

Martijn

On Tue, Jan 30, 2024 at 4:47 AM Jeyhun Karimov <je.kari...@gmail.com> wrote:

> Hi devs,
>
> I just wanted to give an update on this FLIP.
> I updated the doc based on the comments from Jim.
> Also, I developed a prototype and did some testing.
>
> In my small prototype, I ran the following tests:
>
>    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks1
>    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks2
>    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks3
>    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks4
>    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks5
>    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksWithUDTF
>    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion1
>    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion2
>    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion3
>    - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion4
>
>
> These tests are end-to-end DAG optimization tests, covering query parsing,
> validation, optimization, and checking of the results.
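>
> For context, the multi-sink setups these tests cover are essentially
> StatementSets with several INSERT statements that are planned together.
> A minimal sketch of such a job (the table schemas and queries here are made
> up purely for illustration):
>
>     import org.apache.flink.table.api._
>
>     val env = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
>     env.executeSql("CREATE TABLE src (id BIGINT, v STRING) WITH ('connector' = 'datagen')")
>     env.executeSql("CREATE TABLE sink1 (id BIGINT) WITH ('connector' = 'blackhole')")
>     env.executeSql("CREATE TABLE sink2 (v STRING) WITH ('connector' = 'blackhole')")
>
>     // Both INSERTs are planned together in one StatementSet; the shared scan
>     // of `src` is the common sub-graph the optimizer tries to reuse.
>     val stmtSet = env.createStatementSet()
>     stmtSet.addInsertSql("INSERT INTO sink1 SELECT id FROM src")
>     stmtSet.addInsertSql("INSERT INTO sink2 SELECT v FROM src")
>     stmtSet.execute()
>
> The DagOptimizationTest cases above build similar multi-sink plans and then
> check the optimized result.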
>
> In these e2e optimization tests, my prototype was 15-20% faster than the
> existing Flink optimization structure (with the "cost" of simplifying the
> codebase).
>
>
> Any questions/comments are more than welcome.
>
>
> Regards,
>
> Jeyhun Karimov
>
> On Wed, Jan 17, 2024 at 9:11 PM Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
>
> > Hi Jim,
> >
> > Thanks for your comments. Please find my answers below:
> >
> >>    1. StreamOptimizeContext may still be needed to pass the fact that we
> >>    are optimizing a streaming query.  I don't think this class will go
> >>    away completely.  (I agree it may become more simple if the kind or
> >>    mini-batch configuration can be removed.)
> >
> >
> > What I meant is that it might go away if we get rid of the
> > *isUpdateBeforeRequired* and *getMiniBatchInterval* fields.
> > Of course, if we can only get rid of one of them, then the
> > *StreamOptimizeContext* class will not be removed but will get simpler.
> > Will update the doc accordingly.
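> >
> > To make it concrete, a simplified sketch of roughly what the context
> > exposes today (abbreviated from this discussion, not the verbatim class):
> >
> >     import org.apache.calcite.plan.Context
> >
> >     // Stub standing in for the planner's mini-batch interval type,
> >     // only for this sketch.
> >     case class MiniBatchInterval(intervalMs: Long)
> >
> >     // Simplified sketch; the real StreamOptimizeContext has more members.
> >     trait SimplifiedStreamOptimizeContext extends Context {
> >       // whether downstream operators/sinks require UPDATE_BEFORE messages
> >       def isUpdateBeforeRequired: Boolean
> >       // the mini-batch interval configured/inferred for the query
> >       def getMiniBatchInterval: MiniBatchInterval
> >     }
> >
> > If both of these move out into separate enrichment passes, the remaining
> > context becomes thin enough that it may not be needed at all.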
> >
> >>    2. How are the mini-batch and changelog inference rules tightly
> >>    coupled?  I looked a little bit and I haven't seen any connection
> >>    between them.  It seems like the changelog inference is what needs to
> >>    run multiple times.
> >
> >
> > Sorry for the misunderstanding. The mini-batch and changelog inference are
> > not coupled to each other, but both are coupled to the high-level
> > optimization logic. The idea is to separate the query optimization into
> > 1) optimize, 2) enrich with changelog inference, 3) enrich with mini-batch
> > interval inference, and 4) rewrite.
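> >
> > As a rough sketch of that flow (all names below are illustrative only, not
> > the actual classes or methods of the prototype):
> >
> >     import org.apache.calcite.rel.RelNode
> >
> >     // Illustrative sketch of the proposed 4-phase flow, not real Flink code.
> >     trait DecoupledOptimizer {
> >       // 1) regular rule/cost-based optimization, run once per sink block
> >       def optimizeBlock(root: RelNode): RelNode
> >       // 2) changelog mode inference as a separate enrichment pass
> >       def inferChangelogMode(roots: Seq[RelNode]): Seq[RelNode]
> >       // 3) mini-batch interval inference as another enrichment pass
> >       def inferMiniBatchInterval(roots: Seq[RelNode]): Seq[RelNode]
> >       // 4) final rewrite of the enriched plans
> >       def rewrite(root: RelNode): RelNode
> >
> >       // The expensive step 1) runs only once; 2)-4) are enrichment and
> >       // rewrite passes applied on top of the already-optimized plans.
> >       def optimize(sinkRoots: Seq[RelNode]): Seq[RelNode] = {
> >         val optimized = sinkRoots.map(optimizeBlock)
> >         val withChangelog = inferChangelogMode(optimized)
> >         val withMiniBatch = inferMiniBatchInterval(withChangelog)
> >         withMiniBatch.map(rewrite)
> >       }
> >     }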
> >
> >>    3. I think your point about code complexity is unnecessary.
> >>    StreamOptimizeContext extends org.apache.calcite.plan.Context, which is
> >>    used as an interface to pass information and objects through the
> >>    Calcite stack.
> >
> >
> > I partially agree. Please see my answer to question 1 above.
> >
> >>    4. Is there an alternative where the complexity of the changelog
> >>    optimization can be moved into the `FlinkChangelogModeInferenceProgram`?
> >>    (If there is coupling between the mini-batch and changelog rules, then
> >>    this would not make sense.)
> >
> >
> > Good point. Yes, this is definitely an alternative.
> >
> >>    5. There are some other smaller refactorings.  I tried some of them
> >>    here: https://github.com/apache/flink/pull/24108  Mostly, it is syntax
> >>    and using lazy vals to avoid recomputing various things.  (Feel free to
> >>    take whatever actually works; I haven't run the tests.)
> >
> >
> > I took a look at your PR. I will definitely reuse some of the refactorings
> > (and will probably rebase by the time I have this ready :))
> >
> >
> >> Separately, folks on the Calcite dev list are thinking about multi-query
> >> optimization:
> >> https://lists.apache.org/thread/mcdqwrtpx0os54t2nn9vtk17spkp5o5k
> >> https://issues.apache.org/jira/browse/CALCITE-6188
> >
> >
> > Seems interesting. But Calcite's MQO approach will probably require some
> > drastic changes in our codebase once we adopt it, whereas the approach in
> > this FLIP is more incremental.
> >
> > Hope my comments answer your questions.
> >
> > Regards,
> > Jeyhun Karimov
> >
> > On Wed, Jan 17, 2024 at 2:36 AM Jim Hughes <jhug...@confluent.io.invalid
> >
> > wrote:
> >
> >> Hi Jeyhun,
> >>
> >>
> >> Generally, I like the idea of speeding up the optimizer in the case of
> >> multiple queries!
> >>
> >>
> >> I am new to the optimizer, but I have a few comments / questions.
> >>
> >>
> >>
> >>    1. StreamOptimizeContext may still be needed to pass the fact that we
> >>    are optimizing a streaming query.  I don't think this class will go
> >>    away completely.  (I agree it may become more simple if the kind or
> >>    mini-batch configuration can be removed.)
> >>    2. How are the mini-batch and changelog inference rules tightly
> >>    coupled?  I looked a little bit and I haven't seen any connection
> >>    between them.  It seems like the changelog inference is what needs to
> >>    run multiple times.
> >>    3. I think your point about code complexity is unnecessary.
> >>    StreamOptimizeContext extends org.apache.calcite.plan.Context, which is
> >>    used as an interface to pass information and objects through the
> >>    Calcite stack.
> >>    4. Is there an alternative where the complexity of the changelog
> >>    optimization can be moved into the `FlinkChangelogModeInferenceProgram`?
> >>    (If there is coupling between the mini-batch and changelog rules, then
> >>    this would not make sense.)
> >>    5. There are some other smaller refactorings.  I tried some of them
> >>    here: https://github.com/apache/flink/pull/24108  Mostly, it is syntax
> >>    and using lazy vals to avoid recomputing various things.  (Feel free to
> >>    take whatever actually works; I haven't run the tests.)
> >>
> >> Separately, folks on the Calcite dev list are thinking about multi-query
> >> optimization:
> >> https://lists.apache.org/thread/mcdqwrtpx0os54t2nn9vtk17spkp5o5k
> >> https://issues.apache.org/jira/browse/CALCITE-6188
> >>
> >> Cheers,
> >>
> >>
> >> Jim
> >>
> >> On Tue, Jan 16, 2024 at 5:45 PM Jeyhun Karimov <je.kari...@gmail.com>
> >> wrote:
> >>
> >> > Hi devs,
> >> >
> >> > I’d like to start a discussion on FLIP-419: Optimize multi-sink query
> >> > plan generation [1].
> >> >
> >> >
> >> > Currently, the optimization process for multi-sink query plans is
> >> > suboptimal: 1) it requires going through the optimization process
> >> > several times, and 2) as a result, some low-level code complexity is
> >> > introduced in high-level optimization classes such
> >> > as StreamCommonSubGraphBasedOptimizer.
> >> >
> >> >
> >> > To address this issue, this FLIP proposes to decouple changelog and
> >> > mini-batch interval inference from the main optimization process.
> >> >
> >> > Please find more details in the FLIP wiki document [1]. Looking forward
> >> > to your feedback.
> >> >
> >> > [1]
> >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-419%3A+Optimize+multi-sink+query+plan+generation
> >> >
> >> >
> >> > Regards,
> >> > Jeyhun Karimov
> >> >
> >>
> >
>
