Hi devs,

I just wanted to give an update on this FLIP. I updated the doc based on the comments from Jim. Also, I developed a prototype and did some testing.
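To give a rough idea of its structure, the prototype follows the four-phase flow I describe in my reply to Jim further down: optimize once, then enrich the result with changelog inference, then with mini-batch interval inference, then rewrite. A purely illustrative Scala sketch of that shape (none of these names exist in Flink or in the prototype; the real code works on RelNode blocks inside the planner, not on a toy DAG type):

final case class PhysicalDag(description: String)

object DecoupledMultiSinkOptimizer {

  // Phase 1: run the regular rule-based/cost-based optimization exactly once
  // over the whole multi-sink DAG.
  private def optimize(dag: PhysicalDag): PhysicalDag =
    dag.copy(description = dag.description + " -> optimized")

  // Phase 2: enrich the optimized DAG with changelog mode information.
  private def inferChangelogModes(dag: PhysicalDag): PhysicalDag =
    dag.copy(description = dag.description + " -> changelog inferred")

  // Phase 3: enrich with mini-batch interval information.
  private def inferMiniBatchIntervals(dag: PhysicalDag): PhysicalDag =
    dag.copy(description = dag.description + " -> mini-batch inferred")

  // Phase 4: final rewrite of the enriched DAG.
  private def rewrite(dag: PhysicalDag): PhysicalDag =
    dag.copy(description = dag.description + " -> rewritten")

  // Each phase runs once over the whole DAG, instead of re-entering the
  // optimizer per sink block and threading changelog/mini-batch state
  // through a shared context.
  def run(dag: PhysicalDag): PhysicalDag =
    rewrite(inferMiniBatchIntervals(inferChangelogModes(optimize(dag))))
}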
In my small prototype I ran the following tests:

- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks1
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks2
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks3
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks4
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks5
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksWithUDTF
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion1
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion2
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion3
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion4

These are end-to-end DAG optimization tests, covering query parsing, validation, optimization, and checking the resulting plans. In these end-to-end optimization tests, my prototype was 15-20% faster than the existing Flink optimization structure, with the "cost" of a simpler codebase.
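For readers who have not looked at these tests: they all plan several INSERT statements together against shared sources. A minimal multi-sink job of that kind, written against the public Table API, looks roughly like the following (table names, schemas, and connectors here are made up for illustration):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object MultiSinkJob {
  def main(args: Array[String]): Unit = {
    val tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())

    // Source and sink tables are placeholders; any connectors work here.
    tEnv.executeSql(
      "CREATE TABLE events (user_id STRING, amount DOUBLE) WITH ('connector' = 'datagen')")
    tEnv.executeSql(
      "CREATE TABLE sink_totals (user_id STRING, total DOUBLE) WITH ('connector' = 'blackhole')")
    tEnv.executeSql(
      "CREATE TABLE sink_counts (user_id STRING, cnt BIGINT) WITH ('connector' = 'blackhole')")

    // Two INSERT statements sharing the same source: the planner optimizes
    // them together as a single multi-sink DAG.
    val stmtSet = tEnv.createStatementSet()
    stmtSet.addInsertSql(
      "INSERT INTO sink_totals SELECT user_id, SUM(amount) FROM events GROUP BY user_id")
    stmtSet.addInsertSql(
      "INSERT INTO sink_counts SELECT user_id, COUNT(*) FROM events GROUP BY user_id")

    // explain() goes through the multi-sink plan optimization this FLIP targets.
    println(stmtSet.explain())
  }
}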
Any questions/comments are more than welcome.

Regards,
Jeyhun Karimov

On Wed, Jan 17, 2024 at 9:11 PM Jeyhun Karimov <je.kari...@gmail.com> wrote:

> Hi Jim,
>
> Thanks for your comments. Please find my answers below:
>
>> 1. StreamOptimizeContext may still be needed to pass the fact that we
>> are optimizing a streaming query. I don't think this class will go away
>> completely. (I agree it may become simpler if the kind or mini-batch
>> configuration can be removed.)
>
> What I meant is that it might go away if we get rid of the
> *isUpdateBeforeRequired* and *getMiniBatchInterval* fields.
> Of course, if we can get rid of only one of them, then the
> *StreamOptimizeContext* class will not be removed but will get simpler.
> Will update the doc accordingly.
>
>> 2. How are the mini-batch and changelog inference rules tightly coupled?
>> I looked a little bit and I haven't seen any connection between them. It
>> seems like the changelog inference is what needs to run multiple times.
>
> Sorry for the misunderstanding. The mini-batch and changelog inference
> are not coupled with each other but with the high-level optimization
> logic. The idea is to separate the query optimization into 1) optimize,
> 2) enrich with changelog inference, 3) enrich with mini-batch interval
> inference, and 4) rewrite.
>
>> 3. I think your point about code complexity is unnecessary.
>> StreamOptimizeContext extends org.apache.calcite.plan.Context, which is
>> used as an interface to pass information and objects through the
>> Calcite stack.
>
> I partially agree. Please see my answer above for question 1.
>
>> 4. Is there an alternative where the complexity of the changelog
>> optimization can be moved into the `FlinkChangelogModeInferenceProgram`?
>> (If this is coupling between the mini-batch and changelog rules, then
>> this would not make sense.)
>
> Good point. Yes, this is definitely an alternative.
>
>> 5. There are some other smaller refactorings. I tried some of them
>> here: https://github.com/apache/flink/pull/24108 Mostly, it is syntax
>> and using lazy vals to avoid recomputing various things. (Feel free to
>> take whatever actually works; I haven't run the tests.)
>
> I took a look at your PR. For sure, I will reuse some of the refactorings
> (and will probably rebase by the time I have this ready :))
>
>> Separately, folks on the Calcite dev list are thinking about multi-query
>> optimization:
>> https://lists.apache.org/thread/mcdqwrtpx0os54t2nn9vtk17spkp5o5k
>> https://issues.apache.org/jira/browse/CALCITE-6188
>
> Seems interesting. But Calcite's MQO approach will probably require some
> drastic changes in our codebase once we adopt it. The approach in this
> FLIP is more incremental.
>
> Hope my comments answer your questions.
>
> Regards,
> Jeyhun Karimov
>
> On Wed, Jan 17, 2024 at 2:36 AM Jim Hughes <jhug...@confluent.io.invalid>
> wrote:
>
>> Hi Jeyhun,
>>
>> Generally, I like the idea of speeding up the optimizer in the case of
>> multiple queries!
>>
>> I am new to the optimizer, but I have a few comments / questions.
>>
>> 1. StreamOptimizeContext may still be needed to pass the fact that we
>> are optimizing a streaming query. I don't think this class will go away
>> completely. (I agree it may become simpler if the kind or mini-batch
>> configuration can be removed.)
>> 2. How are the mini-batch and changelog inference rules tightly coupled?
>> I looked a little bit and I haven't seen any connection between them. It
>> seems like the changelog inference is what needs to run multiple times.
>> 3. I think your point about code complexity is unnecessary.
>> StreamOptimizeContext extends org.apache.calcite.plan.Context, which is
>> used as an interface to pass information and objects through the
>> Calcite stack.
>> 4. Is there an alternative where the complexity of the changelog
>> optimization can be moved into the `FlinkChangelogModeInferenceProgram`?
>> (If this is coupling between the mini-batch and changelog rules, then
>> this would not make sense.)
>> 5. There are some other smaller refactorings. I tried some of them
>> here: https://github.com/apache/flink/pull/24108 Mostly, it is syntax
>> and using lazy vals to avoid recomputing various things. (Feel free to
>> take whatever actually works; I haven't run the tests.)
>>
>> Separately, folks on the Calcite dev list are thinking about multi-query
>> optimization:
>> https://lists.apache.org/thread/mcdqwrtpx0os54t2nn9vtk17spkp5o5k
>> https://issues.apache.org/jira/browse/CALCITE-6188
>>
>> Cheers,
>>
>> Jim
>>
>> On Tue, Jan 16, 2024 at 5:45 PM Jeyhun Karimov <je.kari...@gmail.com>
>> wrote:
>>
>>> Hi devs,
>>>
>>> I'd like to start a discussion on FLIP-419: Optimize multi-sink query
>>> plan generation [1].
>>>
>>> Currently, the optimization process for multi-sink query plans is
>>> suboptimal: 1) it goes through the optimization process several times,
>>> and 2) as a result, some low-level code complexity is introduced into
>>> high-level optimization classes such as
>>> StreamCommonSubGraphBasedOptimizer.
>>>
>>> To address this, this FLIP proposes to decouple changelog and
>>> mini-batch interval inference from the main optimization process.
>>>
>>> Please find more details in the FLIP wiki document [1]. Looking forward
>>> to your feedback.
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-419%3A+Optimize+multi-sink+query+plan+generation
>>>
>>> Regards,
>>> Jeyhun Karimov
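P.S. For anyone newer to the planner internals who is following Jim's point 3 above: org.apache.calcite.plan.Context is a small carrier object that planner programs can pull typed information out of via unwrap(). Something like the following self-contained sketch (StreamingQueryMarker is made up for illustration; in Flink the real carrier is StreamOptimizeContext, and if the *isUpdateBeforeRequired* and mini-batch interval fields go away, what is left is essentially this kind of thin wrapper):

import org.apache.calcite.plan.{Context, Contexts}

// Made-up marker type; stands in for the information StreamOptimizeContext carries.
final case class StreamingQueryMarker(isStreaming: Boolean)

object ContextSketch {
  def main(args: Array[String]): Unit = {
    // Pack the marker into a Calcite Context, roughly as the planner passes
    // information and objects through the Calcite stack.
    val ctx: Context = Contexts.of(StreamingQueryMarker(isStreaming = true))

    // A planner program can later unwrap exactly the piece it cares about.
    val marker = ctx.unwrap(classOf[StreamingQueryMarker])
    println(marker) // prints: StreamingQueryMarker(true)
  }
}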