Hi Martijn,

Thanks for your comment. I created a quick prototype to make sure that the
proposed hypothesis works. So, it still is a quick implementation.
On my local setup all tests pass, but I might need to spend some time (once
VOTE thread passes probably) to clean it and submit a PR.

Each test was performed via mvn test

(e.g., mvn test ...
-Dtest=org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks1
)

with master branch of Flink (dd16a4c07b2f8c96740fb522cb54cfd1d5a5e835)
against my PoC over 5 runs.

The runtime is derived from the test case's runtime reported by mvn test (and
not the runtime of the whole mvn test command).


I also included the above information in the FLIP.

Please let me know if you have further questions.


Regards,

Jeyhun





On Fri, Mar 8, 2024 at 12:01 PM Martijn Visser <martijnvis...@apache.org>
wrote:

> Hi Jeyhun Karimov,
>
> I see that you've already opened up a VOTE thread, but since you're talking
> about having a prototype already and results, I wondered if you could
> include the POC and how you've tested these results in the FLIP?
>
> Best regards,
>
> Martijn
>
> On Tue, Jan 30, 2024 at 4:47 AM Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
>
> > Hi devs,
> >
> > I just wanted to give an update on this FLIP.
> > I updated the doc based on the comments from Jim.
> > Also, I developed a prototype and did some testing.
> >
> > I in my small prototype I ran the following tests:
> >
> >    -
> >
> >
> org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks1
> >    -
> >
> >
> org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks2
> >    -
> >
> >
> org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks3
> >    -
> >
> >
> org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks4
> >    -
> >
> >
> org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks5
> >    -
> >
> >
> org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksWithUDTF
> >    -
> >
> >
> org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion1
> >    -
> >
> >
> org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion2
> >    -
> >
> >
> org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion3
> >    -
> >
> >
> org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion4
> >
> >
> > These tests are e2e dag optimization, including query parsing,
> validation,
> > optimization, and checking the results.
> >
> > In these e2e optimization tests, my prototype was 15-20% faster than
> > existing Flink optimization structure (with the "cost" of simplifying the
> > codebase).
> >
> >
> > Any questions/comments are more than welcome.
> >
> >
> > Regards,
> >
> > Jeyhun Karimov
> >
> > On Wed, Jan 17, 2024 at 9:11 PM Jeyhun Karimov <je.kari...@gmail.com>
> > wrote:
> >
> > > Hi Jim,
> > >
> > > Thanks for your comments. Please find my answers below:
> > >
> > >    1. StreamOptimizeContext may still be needed to pass the fact that
> we
> > >>    are optimizing a streaming query.  I don't think this class will go
> > >> away
> > >>    completely.  (I agree it may become more simple if the kind or
> > >>    mini-batch configuration can be removed.)
> > >
> > >
> > > What I meant is that it might go away if we get rid of
> > > *isUpdateBeforeRequired* and *getMiniBatchInterval *fields.
> > > Of course if we can get rid of only one of them, then the
> > > *StreamOptimizeContext* class will not be removed but get simpler.
> > > Will update the doc accordingly.
> > >
> > >    2. How are the mini-batch and changelog inference rules tightly
> > coupled?
> > >>    I looked a little bit and I haven't seen any connection between
> them.
> > >> It
> > >>    seems like the changelog inference is what needs to run multiple
> > times.
> > >
> > >
> > > Sorry for the misunderstanding. The mini-batch and changelog inference
> > are
> > > not coupled among themselves but with the high-level optimization
> logic.
> > > The idea is to separate the query optimization into 1) optimize 2)
> enrich
> > > with changelog inference 3) enrich with mini-batch interval inference
> and
> > > 4) rewrite
> > >
> > >    3. I think your point about code complexity is unnecessary.
> > >> StreamOptimizeContext
> > >>    extends org.apache.calcite.plan.Context which is used an interface
> to
> > >> pass
> > >>    information and objects through the Calcite stack.
> > >
> > >
> > > I partially agree. Please see my answer above for the question 1.
> > >
> > >    4. Is an alternative where the complexity of the changelog
> > optimization
> > >>    can be moved into the `FlinkChangelogModeInferenceProgram`?  (If
> this
> > >> is
> > >>    coupling between the mini-batch and changelog rules, then this
> would
> > >> not
> > >>    make sense.)
> > >
> > >
> > > Good point. Yes, this is definitely an alternative.
> > >
> > >    5. There are some other smaller refactorings.  I tried some of them
> > >>    here: https://github.com/apache/flink/pull/24108 Mostly, it is
> > syntax
> > >>    and using lazy vals to avoid recomputing various things.  (Feel
> free
> > to
> > >>    take whatever actually works; I haven't run the tests.)
> > >
> > >
> > > I took a look at your PR. For sure, some of the refactorings I will
> reuse
> > > (probably rebase by the time I have this ready :))
> > >
> > >
> > > Separately, folks on the Calcite dev list are thinking about
> multi-query
> > >> optimization:
> > >> https://lists.apache.org/thread/mcdqwrtpx0os54t2nn9vtk17spkp5o5k
> > >> https://issues.apache.org/jira/browse/CALCITE-6188
> > >
> > >
> > > Seems interesting. But Calcite's MQO approach will probably require
> some
> > > drastic changes in our codebase once we adopt it.
> > > This approach is more incremental.
> > >
> > > Hope my comments answer your questions.
> > >
> > > Regards,
> > > Jeyhun Karimov
> > >
> > > On Wed, Jan 17, 2024 at 2:36 AM Jim Hughes
> <jhug...@confluent.io.invalid
> > >
> > > wrote:
> > >
> > >> Hi Jeyhun,
> > >>
> > >>
> > >> Generally, I like the idea of speeding up the optimizer in the case of
> > >> multiple queries!
> > >>
> > >>
> > >> I am new to the optimizer, but I have a few comments / questions.
> > >>
> > >>
> > >>
> > >>    1. StreamOptimizeContext may still be needed to pass the fact that
> we
> > >>    are optimizing a streaming query.  I don't think this class will go
> > >> away
> > >>    completely.  (I agree it may become more simple if the kind or
> > >>    mini-batch configuration can be removed.)
> > >>    2. How are the mini-batch and changelog inference rules tightly
> > >> coupled?
> > >>    I looked a little bit and I haven't seen any connection between
> them.
> > >> It
> > >>    seems like the changelog inference is what needs to run multiple
> > times.
> > >>    3. I think your point about code complexity is unnecessary.
> > >> StreamOptimizeContext
> > >>    extends org.apache.calcite.plan.Context which is used an interface
> to
> > >> pass
> > >>    information and objects through the Calcite stack.
> > >>    4. Is an alternative where the complexity of the changelog
> > optimization
> > >>    can be moved into the `FlinkChangelogModeInferenceProgram`?  (If
> this
> > >> is
> > >>    coupling between the mini-batch and changelog rules, then this
> would
> > >> not
> > >>    make sense.)
> > >>    5. There are some other smaller refactorings.  I tried some of them
> > >>    here: https://github.com/apache/flink/pull/24108 Mostly, it is
> > syntax
> > >>    and using lazy vals to avoid recomputing various things.  (Feel
> free
> > to
> > >>    take whatever actually works; I haven't run the tests.)
> > >>
> > >> Separately, folks on the Calcite dev list are thinking about
> multi-query
> > >> optimization:
> > >> https://lists.apache.org/thread/mcdqwrtpx0os54t2nn9vtk17spkp5o5k
> > >> https://issues.apache.org/jira/browse/CALCITE-6188
> > >>
> > >> Cheers,
> > >>
> > >>
> > >> Jim
> > >>
> > >> On Tue, Jan 16, 2024 at 5:45 PM Jeyhun Karimov <je.kari...@gmail.com>
> > >> wrote:
> > >>
> > >> > Hi devs,
> > >> >
> > >> > I’d like to start a discussion on FLIP-419: Optimize multi-sink
> query
> > >> plan
> > >> > generation [1].
> > >> >
> > >> >
> > >> > Currently, the optimization process of multi-sink query plans are
> > >> > suboptimal: 1) it requires to go through the optimization process
> > >> several
> > >> > times and 2) as a result of this some low-level code complexity is
> > >> > introduced on high level optimization classes such
> > >> > as StreamCommonSubGraphBasedOptimizer.
> > >> >
> > >> >
> > >> > To address this issue, this FLIP introduces  to decouple changelog
> and
> > >> > mini-batch interval inference from the main optimization process.
> > >> >
> > >> > Please find more details in the FLIP wiki document [1]. Looking
> > forward
> > >> to
> > >> > your feedback.
> > >> >
> > >> > [1]
> > >> >
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-419%3A+Optimize+multi-sink+query+plan+generation
> > >> >
> > >> >
> > >> > Regards,
> > >> > Jeyhun Karimov
> > >> >
> > >>
> > >
> >
>

Reply via email to