Hi Martijn,

Thanks for your comment. I created a quick prototype to verify that the proposed hypothesis holds, so it is still a quick implementation. On my local setup all tests pass, but I might need to spend some time (probably once the VOTE thread passes) to clean it up and submit a PR.

Each test was run via mvn test (e.g., mvn test ... -Dtest=org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks1), comparing the Flink master branch (commit dd16a4c07b2f8c96740fb522cb54cfd1d5a5e835) against my PoC over 5 runs. The runtime is the test case's own runtime as reported by mvn test, not the runtime of the whole mvn test command. I also included this information in the FLIP.

Please let me know if you have further questions.

Regards,
Jeyhun

On Fri, Mar 8, 2024 at 12:01 PM Martijn Visser <martijnvis...@apache.org> wrote:

> Hi Jeyhun Karimov,
>
> I see that you've already opened up a VOTE thread, but since you're talking
> about having a prototype already and results, I wondered if you could
> include the POC and how you've tested these results in the FLIP?
>
> Best regards,
>
> Martijn
>
> On Tue, Jan 30, 2024 at 4:47 AM Jeyhun Karimov <je.kari...@gmail.com> wrote:
>
> > Hi devs,
> >
> > I just wanted to give an update on this FLIP.
> > I updated the doc based on the comments from Jim.
> > Also, I developed a prototype and did some testing.
> >
> > In my small prototype, I ran the following tests:
> >
> > - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks1
> > - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks2
> > - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks3
> > - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks4
> > - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks5
> > - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksWithUDTF
> > - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion1
> > - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion2
> > - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion3
> > - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion4
> >
> > These tests perform end-to-end DAG optimization, including query parsing,
> > validation, optimization, and checking the results.
> >
> > In these end-to-end optimization tests, my prototype was 15-20% faster than
> > the existing Flink optimization structure (at the "cost" of simplifying the
> > codebase).
> >
> > Any questions/comments are more than welcome.
> >
> > Regards,
> >
> > Jeyhun Karimov
> >
> > On Wed, Jan 17, 2024 at 9:11 PM Jeyhun Karimov <je.kari...@gmail.com> wrote:
> >
> > > Hi Jim,
> > >
> > > Thanks for your comments. Please find my answers below:
> > >
> > > > 1. StreamOptimizeContext may still be needed to pass the fact that we
> > > > are optimizing a streaming query. I don't think this class will go away
> > > > completely. (I agree it may become simpler if the kind or
> > > > mini-batch configuration can be removed.)
> > >
> > > What I meant is that it might go away if we get rid of the
> > > *isUpdateBeforeRequired* and *getMiniBatchInterval* fields.
> > > Of course, if we can get rid of only one of them, then the
> > > *StreamOptimizeContext* class will not be removed but will become simpler.
> > > I will update the doc accordingly.
> > >
> > > > 2. How are the mini-batch and changelog inference rules tightly coupled?
> > > > I looked a little bit and I haven't seen any connection between them.
> > > > It seems like the changelog inference is what needs to run multiple times.
> > >
> > > Sorry for the misunderstanding. The mini-batch and changelog inference are
> > > not coupled with each other, but with the high-level optimization logic.
> > > The idea is to separate the query optimization into 1) optimize, 2) enrich
> > > with changelog inference, 3) enrich with mini-batch interval inference, and
> > > 4) rewrite.
> > >
> > > > 3. I think your point about code complexity is unnecessary.
> > > > StreamOptimizeContext extends org.apache.calcite.plan.Context, which is
> > > > used as an interface to pass information and objects through the Calcite
> > > > stack.
> > >
> > > I partially agree. Please see my answer to question 1 above.
> > >
> > > > 4. Is an alternative where the complexity of the changelog optimization
> > > > can be moved into the `FlinkChangelogModeInferenceProgram`? (If there is
> > > > coupling between the mini-batch and changelog rules, then this would not
> > > > make sense.)
> > >
> > > Good point. Yes, this is definitely an alternative.
> > >
> > > > 5. There are some other smaller refactorings. I tried some of them
> > > > here: https://github.com/apache/flink/pull/24108 Mostly, it is syntax
> > > > and using lazy vals to avoid recomputing various things. (Feel free to
> > > > take whatever actually works; I haven't run the tests.)
> > >
> > > I took a look at your PR.
> > > I will certainly reuse some of the refactorings (and probably rebase by
> > > the time I have this ready :)).
> > >
> > > > Separately, folks on the Calcite dev list are thinking about multi-query
> > > > optimization:
> > > > https://lists.apache.org/thread/mcdqwrtpx0os54t2nn9vtk17spkp5o5k
> > > > https://issues.apache.org/jira/browse/CALCITE-6188
> > >
> > > Seems interesting. But Calcite's MQO approach will probably require some
> > > drastic changes in our codebase once we adopt it.
> > > This approach is more incremental.
> > >
> > > Hope my comments answer your questions.
> > >
> > > Regards,
> > > Jeyhun Karimov
> > >
> > > On Wed, Jan 17, 2024 at 2:36 AM Jim Hughes <jhug...@confluent.io.invalid> wrote:
> > >
> > > > Hi Jeyhun,
> > > >
> > > > Generally, I like the idea of speeding up the optimizer in the case of
> > > > multiple queries!
> > > >
> > > > I am new to the optimizer, but I have a few comments / questions.
> > > >
> > > > 1. StreamOptimizeContext may still be needed to pass the fact that we
> > > >    are optimizing a streaming query. I don't think this class will go away
> > > >    completely. (I agree it may become simpler if the kind or
> > > >    mini-batch configuration can be removed.)
> > > > 2. How are the mini-batch and changelog inference rules tightly coupled?
> > > >    I looked a little bit and I haven't seen any connection between them.
> > > >    It seems like the changelog inference is what needs to run multiple times.
> > > > 3. I think your point about code complexity is unnecessary.
> > > >    StreamOptimizeContext extends org.apache.calcite.plan.Context, which is
> > > >    used as an interface to pass information and objects through the Calcite
> > > >    stack.
> > > > 4. Is an alternative where the complexity of the changelog optimization
> > > >    can be moved into the `FlinkChangelogModeInferenceProgram`?
> > > >    (If there is coupling between the mini-batch and changelog rules, then
> > > >    this would not make sense.)
> > > > 5. There are some other smaller refactorings. I tried some of them
> > > >    here: https://github.com/apache/flink/pull/24108 Mostly, it is syntax
> > > >    and using lazy vals to avoid recomputing various things. (Feel free to
> > > >    take whatever actually works; I haven't run the tests.)
> > > >
> > > > Separately, folks on the Calcite dev list are thinking about multi-query
> > > > optimization:
> > > > https://lists.apache.org/thread/mcdqwrtpx0os54t2nn9vtk17spkp5o5k
> > > > https://issues.apache.org/jira/browse/CALCITE-6188
> > > >
> > > > Cheers,
> > > >
> > > > Jim
> > > >
> > > > On Tue, Jan 16, 2024 at 5:45 PM Jeyhun Karimov <je.kari...@gmail.com> wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > I’d like to start a discussion on FLIP-419: Optimize multi-sink query plan
> > > > > generation [1].
> > > > >
> > > > > Currently, the optimization process for multi-sink query plans is
> > > > > suboptimal: 1) it requires going through the optimization process several
> > > > > times, and 2) as a result, some low-level code complexity is introduced
> > > > > into high-level optimization classes such as
> > > > > StreamCommonSubGraphBasedOptimizer.
> > > > >
> > > > > To address this issue, this FLIP proposes decoupling changelog and
> > > > > mini-batch interval inference from the main optimization process.
> > > > >
> > > > > Please find more details in the FLIP wiki document [1]. Looking forward to
> > > > > your feedback.
> > > > >
> > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-419%3A+Optimize+multi-sink+query+plan+generation
> > > > >
> > > > > Regards,
> > > > > Jeyhun Karimov
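The decoupling proposed in FLIP-419 (run the main optimization once, then enrich the plan with changelog inference and mini-batch interval inference, then rewrite) can be illustrated with a minimal Java sketch. It is not Flink planner code: `Dag`, `Phase`, and `DecoupledPipeline` are hypothetical names, and every pass is a placeholder that only records that it ran. The sketch therefore shows only the orchestration the thread describes, namely that each pass is applied once to the whole multi-sink DAG. It uses records, so it assumes Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a multi-sink plan (NOT a Flink class): the sink
// names plus the names of the passes applied so far to the shared DAG.
record Dag(List<String> sinks, List<String> appliedPhases) {
    // Returns a copy of this DAG with one more pass recorded.
    Dag withPhase(String phase) {
        List<String> next = new ArrayList<>(appliedPhases);
        next.add(phase);
        return new Dag(sinks, List.copyOf(next));
    }
}

// A named pass that transforms the whole DAG at once.
record Phase(String name, UnaryOperator<Dag> body) {}

final class DecoupledPipeline {
    // Placeholder pass: it only records that it ran. A real pass would,
    // for example, infer changelog modes or mini-batch intervals.
    static Phase marker(String name) {
        return new Phase(name, dag -> dag.withPhase(name));
    }

    // Order taken from the thread: 1) optimize, 2) changelog inference,
    // 3) mini-batch interval inference, 4) rewrite.
    static final List<Phase> PHASES = List.of(
            marker("optimize"),
            marker("inferChangelog"),
            marker("inferMiniBatchInterval"),
            marker("rewrite"));

    // Applies each pass exactly once to the whole DAG, regardless of how
    // many sinks it has; no inference pass re-runs the main optimization.
    static Dag run(Dag input) {
        Dag current = input;
        for (Phase phase : PHASES) {
            current = phase.body().apply(current);
        }
        return current;
    }

    private DecoupledPipeline() {}
}
```

For example, `DecoupledPipeline.run(new Dag(List.of("sink1", "sink2"), List.of())).appliedPhases()` yields `[optimize, inferChangelog, inferMiniBatchInterval, rewrite]`. Keeping each inference step as its own whole-DAG pass is one way to read the proposal; it is also what would make Jim's alternative (moving changelog complexity into `FlinkChangelogModeInferenceProgram`) a local change to a single pass.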
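The benchmark described in the thread (each DagOptimizationTest case run 5 times on master and on the PoC, using the per-test runtime reported by mvn test) reduces to simple arithmetic. The thread does not state how "15-20% faster" was computed, so this minimal Java sketch assumes it means the relative reduction in mean runtime, (baseline - candidate) / baseline. `RuntimeComparison` is a hypothetical helper, not part of Flink or the PoC.

```java
import java.util.Arrays;

// Hypothetical helper (not from the FLIP or the PoC): summarizes per-run
// runtimes of one test case and compares a baseline against a candidate.
final class RuntimeComparison {
    // Arithmetic mean of the runtimes reported for one test case across runs.
    static double mean(double[] runtimesSec) {
        if (runtimesSec.length == 0) {
            throw new IllegalArgumentException("no runs");
        }
        return Arrays.stream(runtimesSec).average().orElseThrow();
    }

    // Assumed definition of "X% faster": relative reduction in mean runtime,
    // (baseline - candidate) / baseline * 100.
    static double percentFaster(double[] baselineSec, double[] candidateSec) {
        double base = mean(baselineSec);
        double cand = mean(candidateSec);
        return (base - cand) / base * 100.0;
    }

    private RuntimeComparison() {}
}
```

Another common reading, baseline / candidate - 1, yields a larger percentage for the same measurements whenever the candidate is faster, so stating the formula alongside the numbers avoids ambiguity.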