Hi Jeyhun Karimov,

I see that you've already opened a VOTE thread, but since you mention that you already have a prototype and results, could you include the POC and a description of how you tested it in the FLIP?

Best regards,

Martijn

On Tue, Jan 30, 2024 at 4:47 AM Jeyhun Karimov <je.kari...@gmail.com> wrote:

> Hi devs,
>
> I just wanted to give an update on this FLIP.
> I updated the doc based on the comments from Jim.
> Also, I developed a prototype and did some testing.
>
> In my small prototype I ran the following tests:
>
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks1
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks2
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks3
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks4
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks5
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksWithUDTF
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion1
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion2
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion3
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion4
>
> These tests are e2e DAG optimization tests, including query parsing,
> validation, optimization, and checking the results.
>
> In these e2e optimization tests, my prototype was 15-20% faster than
> the existing Flink optimization structure (with the "cost" of simplifying
> the codebase).
>
> Any questions/comments are more than welcome.
>
> Regards,
> Jeyhun Karimov
>
> On Wed, Jan 17, 2024 at 9:11 PM Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
>
> > Hi Jim,
> >
> > Thanks for your comments. Please find my answers below:
> >
> >> 1. StreamOptimizeContext may still be needed to pass the fact that we
> >>    are optimizing a streaming query. I don't think this class will go
> >>    away completely. (I agree it may become more simple if the kind or
> >>    mini-batch configuration can be removed.)
> >
> > What I meant is that it might go away if we get rid of the
> > *isUpdateBeforeRequired* and *getMiniBatchInterval* fields.
> > Of course, if we can get rid of only one of them, then the
> > *StreamOptimizeContext* class will not be removed but will get simpler.
> > I will update the doc accordingly.
> >
> >> 2. How are the mini-batch and changelog inference rules tightly
> >>    coupled? I looked a little bit and I haven't seen any connection
> >>    between them. It seems like the changelog inference is what needs
> >>    to run multiple times.
> >
> > Sorry for the misunderstanding. The mini-batch and changelog inference
> > are not coupled with each other but with the high-level optimization
> > logic. The idea is to separate the query optimization into 1) optimize,
> > 2) enrich with changelog inference, 3) enrich with mini-batch interval
> > inference, and 4) rewrite.
> >
> >> 3. I think your point about code complexity is unnecessary.
> >>    StreamOptimizeContext extends org.apache.calcite.plan.Context,
> >>    which is used as an interface to pass information and objects
> >>    through the Calcite stack.
> >
> > I partially agree. Please see my answer to question 1 above.
> >
> >> 4. Is there an alternative where the complexity of the changelog
> >>    optimization can be moved into the
> >>    `FlinkChangelogModeInferenceProgram`? (If this is coupling between
> >>    the mini-batch and changelog rules, then this would not make sense.)
> >
> > Good point. Yes, this is definitely an alternative.
> >
> >> 5. There are some other smaller refactorings. I tried some of them
> >>    here: https://github.com/apache/flink/pull/24108 Mostly, it is
> >>    syntax and using lazy vals to avoid recomputing various things.
> >>    (Feel free to take whatever actually works; I haven't run the tests.)
> >
> > I took a look at your PR. I will certainly reuse some of the
> > refactorings (and probably rebase by the time I have this ready :))
> >
> >> Separately, folks on the Calcite dev list are thinking about multi-query
> >> optimization:
> >> https://lists.apache.org/thread/mcdqwrtpx0os54t2nn9vtk17spkp5o5k
> >> https://issues.apache.org/jira/browse/CALCITE-6188
> >
> > Seems interesting. But Calcite's MQO approach will probably require some
> > drastic changes in our codebase once we adopt it.
> > This approach is more incremental.
> >
> > Hope my comments answer your questions.
> >
> > Regards,
> > Jeyhun Karimov
> >
> > On Wed, Jan 17, 2024 at 2:36 AM Jim Hughes <jhug...@confluent.io.invalid>
> > wrote:
> >
> >> Hi Jeyhun,
> >>
> >> Generally, I like the idea of speeding up the optimizer in the case of
> >> multiple queries!
> >>
> >> I am new to the optimizer, but I have a few comments / questions.
> >>
> >> 1. StreamOptimizeContext may still be needed to pass the fact that we
> >>    are optimizing a streaming query. I don't think this class will go
> >>    away completely. (I agree it may become more simple if the kind or
> >>    mini-batch configuration can be removed.)
> >> 2. How are the mini-batch and changelog inference rules tightly
> >>    coupled? I looked a little bit and I haven't seen any connection
> >>    between them. It seems like the changelog inference is what needs
> >>    to run multiple times.
> >> 3. I think your point about code complexity is unnecessary.
> >>    StreamOptimizeContext extends org.apache.calcite.plan.Context,
> >>    which is used as an interface to pass information and objects
> >>    through the Calcite stack.
> >> 4. Is there an alternative where the complexity of the changelog
> >>    optimization can be moved into the
> >>    `FlinkChangelogModeInferenceProgram`? (If this is coupling between
> >>    the mini-batch and changelog rules, then this would not make sense.)
> >> 5. There are some other smaller refactorings. I tried some of them
> >>    here: https://github.com/apache/flink/pull/24108 Mostly, it is
> >>    syntax and using lazy vals to avoid recomputing various things.
> >>    (Feel free to take whatever actually works; I haven't run the tests.)
> >>
> >> Separately, folks on the Calcite dev list are thinking about multi-query
> >> optimization:
> >> https://lists.apache.org/thread/mcdqwrtpx0os54t2nn9vtk17spkp5o5k
> >> https://issues.apache.org/jira/browse/CALCITE-6188
> >>
> >> Cheers,
> >>
> >> Jim
> >>
> >> On Tue, Jan 16, 2024 at 5:45 PM Jeyhun Karimov <je.kari...@gmail.com>
> >> wrote:
> >>
> >> > Hi devs,
> >> >
> >> > I’d like to start a discussion on FLIP-419: Optimize multi-sink query
> >> > plan generation [1].
> >> >
> >> > Currently, the optimization process of multi-sink query plans is
> >> > suboptimal: 1) it requires going through the optimization process
> >> > several times, and 2) as a result, some low-level code complexity is
> >> > introduced into high-level optimization classes such as
> >> > StreamCommonSubGraphBasedOptimizer.
> >> >
> >> > To address this issue, this FLIP proposes to decouple changelog and
> >> > mini-batch interval inference from the main optimization process.
> >> >
> >> > Please find more details in the FLIP wiki document [1]. Looking
> >> > forward to your feedback.
> >> >
> >> > [1]
> >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-419%3A+Optimize+multi-sink+query+plan+generation
> >> >
> >> > Regards,
> >> > Jeyhun Karimov
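
To make the phase split discussed in this thread concrete (1. optimize, 2. enrich with changelog inference, 3. enrich with mini-batch interval inference, 4. rewrite), here is a minimal Python sketch. It is not Flink code and does not reflect the prototype: `Plan`, the four phase functions, and the placeholder values they assign are all hypothetical, chosen only to show each phase being applied once to every sink plan in a fixed order.

```python
from dataclasses import dataclass, replace
from typing import List, Optional


@dataclass(frozen=True)
class Plan:
    """Hypothetical stand-in for one sink's plan; not a Flink class."""
    sink: str
    optimized: bool = False
    changelog_mode: Optional[str] = None
    mini_batch_interval_ms: Optional[int] = None
    rewritten: bool = False


def optimize(plan: Plan) -> Plan:
    # Phase 1: core optimization, kept free of changelog/mini-batch concerns.
    return replace(plan, optimized=True)


def infer_changelog_mode(plan: Plan) -> Plan:
    # Phase 2: enrich the already optimized plan (placeholder value only).
    return replace(plan, changelog_mode="INSERT_ONLY")


def infer_mini_batch_interval(plan: Plan) -> Plan:
    # Phase 3: enrich with a mini-batch interval (placeholder value only).
    return replace(plan, mini_batch_interval_ms=0)


def rewrite(plan: Plan) -> Plan:
    # Phase 4: final rewrite step.
    return replace(plan, rewritten=True)


# The order mirrors the four steps listed in the thread.
PHASES = [optimize, infer_changelog_mode, infer_mini_batch_interval, rewrite]


def run_pipeline(plans: List[Plan]) -> List[Plan]:
    # Each phase runs exactly once over all sink plans before the next starts.
    for phase in PHASES:
        plans = [phase(p) for p in plans]
    return plans


if __name__ == "__main__":
    for p in run_pipeline([Plan("sink_a"), Plan("sink_b")]):
        print(p)
```

Keeping the inference steps as separate passes over an already optimized plan is the design point the sketch is meant to illustrate; how the real planner would carry this information is left open here.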