Thanks for driving this discussion. Do we really need to expose `PlanAnalyzerFactory` as a public interface? I would prefer that we only expose `ExplainDetail#ANALYZED_PHYSICAL_PLAN` and the analyzed result, which is enough for users and consistent with the results of the `explain` method.
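The "analyzed result" referred to above is a list of advice entries. In the JSON example later in this thread, each entry has a kind, a scope and a content string. The plain-Java sketch below models such an entry as an immutable final class, which matches Jane's description of PlanAdvice as a final POJO. It also shows the GLOBAL-only filtering Jane discusses in her answer to question 3. `PlanAdviceSketch`, `AdviceFilterSketch` and the `ADVICE` kind are hypothetical names for illustration. This is not Flink's actual API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical model of one analyzed-result entry; NOT Flink's PlanAdvice class.
// Field names mirror the "kind", "scope" and "content" keys in the JSON example.
final class PlanAdviceSketch {
    // ADVICE (tuning) vs. WARNING (correctness); ADVICE is an assumed value.
    enum Kind { ADVICE, WARNING }

    // GLOBAL applies to the whole query; LOCAL is tied to specific plan nodes.
    enum Scope { GLOBAL, LOCAL }

    private final Kind kind;
    private final Scope scope;
    private final String content;

    PlanAdviceSketch(Kind kind, Scope scope, String content) {
        this.kind = Objects.requireNonNull(kind, "kind");
        this.scope = Objects.requireNonNull(scope, "scope");
        this.content = Objects.requireNonNull(content, "content");
    }

    Kind getKind() { return kind; }
    Scope getScope() { return scope; }
    String getContent() { return content; }

    @Override
    public String toString() {
        return kind + "/" + scope + ": " + content;
    }
}

final class AdviceFilterSketch {
    private AdviceFilterSketch() {}

    // GLOBAL advice can be shown without the plan; LOCAL advice only makes
    // sense next to the nodes it refers to, so it is left out here.
    static List<PlanAdviceSketch> globalOnly(List<PlanAdviceSketch> all) {
        List<PlanAdviceSketch> result = new ArrayList<>();
        for (PlanAdviceSketch advice : all) {
            if (advice.getScope() == PlanAdviceSketch.Scope.GLOBAL) {
                result.add(advice);
            }
        }
        return result;
    }
}
```

For example, given one GLOBAL warning and one LOCAL state-TTL hint, `globalOnly` keeps only the warning. The LOCAL hint still needs the plan to be meaningful.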
The plan analyzer classes live in the table planner module, which is not a public API (public interfaces should be defined in the flink-table-api-java module). Moreover, PlanAnalyzer depends on RelNode, which is an internal class of the planner and is not exposed to users.

Best,
Godfrey

On Tue, Jan 3, 2023 at 13:43, Shengkai Fang <fskm...@gmail.com> wrote:
>
> Sorry for missing the question about configuring the analyzer. Users
> may not need to configure it with SQL statements. In the SQL Gateway,
> users can configure the endpoints with the option
> `sql-gateway.endpoint.type` in flink-conf.
>
> Best,
> Shengkai
>
> On Tue, Jan 3, 2023 at 12:26, Shengkai Fang <fskm...@gmail.com> wrote:
>
> > Hi, Jane.
> >
> > Thanks for bringing this to the discussion. I have some questions about
> > the FLIP:
> >
> > 1. `PlanAnalyzer#analyze` uses the FlinkRelNode as its input. Could you
> > share some thoughts about the motivation? In my experience, users mainly
> > care about two things when they develop their jobs:
> >
> > a. Why does their SQL not work? For example, their streaming SQL contains
> > an OVER window, but the ORDER BY key is not ROWTIME. In this case, we may
> > not have a physical or logical node at all, because the planner has
> > already thrown an exception during optimization.
> >
> > b. Many users care about whether their state is compatible after upgrading
> > their Flink version. In this case, I think the old exec plan and the SQL
> > statement are the user's input.
> >
> > So, I think we should introduce methods like `PlanAnalyzer#analyze(String
> > sql)` and `PlanAnalyzer#analyze(String sql, ExecNodeGraph)` here.
> >
> > 2. I am just curious how other people would add rules to the Advisor. As
> > the number of rules grows, should all of them be added to the Flink
> > codebase?
> > 3. How do users configure another advisor?
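The two overloads Shengkai proposes can be sketched in plain Java. This is not Flink code. The interface and class names are hypothetical, a plan is modeled as a list of operator names instead of an `ExecNodeGraph`, and a `Function` stands in for the planner. The sketch shows why SQL as input helps in case (a): a planning failure can still be reported. It also shows how case (b) might compare a newly compiled plan against the old one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical SQL-level analyzer API shaped after the two proposed overloads.
// Plans are modeled as lists of operator names, not Flink's ExecNodeGraph.
interface SqlPlanAnalyzerSketch {
    // Case (a): analyze a statement on its own, even if it cannot be planned.
    List<String> analyze(String sql);

    // Case (b): compare against the plan compiled by the previous Flink version.
    List<String> analyze(String sql, List<String> oldExecPlan);
}

final class StandInSqlAnalyzer implements SqlPlanAnalyzerSketch {
    // Stand-in for the planner: compiles SQL into operator names, or throws.
    private final Function<String, List<String>> planner;

    StandInSqlAnalyzer(Function<String, List<String>> planner) {
        this.planner = Objects.requireNonNull(planner, "planner");
    }

    @Override
    public List<String> analyze(String sql) {
        try {
            planner.apply(sql);
            return Collections.emptyList();
        } catch (RuntimeException e) {
            // No physical or logical plan exists, but the failure is still reportable.
            return Collections.singletonList(planningFailure(e));
        }
    }

    @Override
    public List<String> analyze(String sql, List<String> oldExecPlan) {
        List<String> newPlan;
        try {
            newPlan = planner.apply(sql);
        } catch (RuntimeException e) {
            return Collections.singletonList(planningFailure(e));
        }
        List<String> advice = new ArrayList<>();
        if (!newPlan.equals(oldExecPlan)) {
            // A changed operator chain hints that the old state may not be restorable.
            advice.add("Plan changed from " + oldExecPlan + " to " + newPlan
                    + "; state may be incompatible after the upgrade.");
        }
        return advice;
    }

    private static String planningFailure(RuntimeException e) {
        return "Query cannot be planned: " + e.getMessage();
    }
}
```

Suppose the stand-in planner rejects OVER windows. Then `analyze(sql)` returns a single entry describing the failure. Passing an old plan that differs from the new one yields a single state-compatibility warning.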
> >
> > Best,
> > Shengkai
> >
> > On Wed, Dec 28, 2022 at 12:30, Jane Chan <qingyue....@gmail.com> wrote:
> >
> >> Hi @yuxia, thank you for reviewing the FLIP and raising questions.
> >>
> >> > 1: Is the PlanAnalyzerFactory also expected to be implemented by users,
> >> > just like DynamicTableSourceFactory or other factories? If so, I notice
> >> > that in the code of PlanAnalyzerManager#registerAnalyzers, the code is
> >> > as follows:
> >> > FactoryUtil.discoverFactory(classLoader, PlanAnalyzerFactory.class,
> >> > StreamPlanAnalyzerFactory.STREAM_IDENTIFIER)); IIUC, it'll always find
> >> > the factory with the name StreamPlanAnalyzerFactory.STREAM_IDENTIFIER.
> >> > Is it a typo or by design?
> >>
> >> This is a really good open question. The short answer is yes, it is by
> >> design. I'll explain the considerations in more detail.
> >>
> >> The standard procedure to create a custom table source/sink is to
> >> implement the factory and the source/sink class. There is a strict 1v1
> >> relationship between the factory and the source/sink:
> >>
> >> SQL                                       | DynamicTableSourceFactory        | Source
> >> ----------------------------------------- | -------------------------------- | --------------
> >> create table … with ('connector' = 'foo') | #factoryIdentifier.equals("foo") | FooTableSource
> >>
> >> Apart from that, the custom function module is another kind of
> >> implementation. The factory creates a collection of functions, so there
> >> is a 1vN relationship between the factory and the functions:
> >>
> >> SQL               | ModuleFactory                    | Function
> >> ----------------- | -------------------------------- | -------------------------
> >> load module 'bar' | #factoryIdentifier.equals("bar") | A collection of functions
> >>
> >> Back to the plan analyzers: if we choose the first style, we also need to
> >> expose a new SQL syntax to users, like "CREATE ANALYZER foo WITH ..." to
> >> specify the factory identifier.
> >> But I think this is too heavy, because an analyzer is an auxiliary tool
> >> that helps users write better queries, so it should be exposed at the
> >> API level rather than at the user syntax level.
> >>
> >> As a result, I propose to follow the second style. Then we don't need to
> >> introduce new syntax to create analyzers. StreamPlanAnalyzerFactory would
> >> be the default factory that creates analyzers in streaming mode, and
> >> custom analyzers would register themselves in StreamPlanAnalyzerFactory:
> >>
> >> @Override
> >> public List<PlanAnalyzer> createAnalyzers() {
> >>     return Arrays.asList(
> >>             FooAnalyzer.INSTANCE,
> >>             BarAnalyzer.INSTANCE,
> >>             ...);
> >> }
> >>
> >> > 2: Is there any special reason to make PlanAdvice a final class? Would
> >> > it be better to make it an interface and provide a default
> >> > implementation? My concern is that some users may want to have their own
> >> > implementation of PlanAdvice. But this may be overthinking. If you think
> >> > it won't bring any problems, I'm also fine with that.
> >>
> >> The reason for making PlanAdvice final is that I think users would prefer
> >> to implement a custom PlanAnalyzer rather than a custom PlanAdvice.
> >> PlanAdvice is a POJO class that represents the analyzed result provided
> >> by PlanAnalyzer.
> >>
> >> > 3: Is there a way to only show the advice? To me, the advice seems
> >> > more useful, and the nodes may contain too many details.
> >>
> >> The result contains two parts: the optimized physical plan itself and the
> >> analysis of the plan.
> >>
> >> For PlanAdvice with GLOBAL scope, it is possible to do so. For LOCAL
> >> scope, however, the advice content is specific to certain nodes (e.g.,
> >> some rel nodes are sensitive to the state TTL configuration), so the plan
> >> cannot be omitted. Besides, the plan is necessary from the visualization
> >> perspective.
> >> During the PoC phase, I made some attempts to adapt the Flink Visualizer
> >> to illustrate the analyzed plan, and it looks like the following picture.
> >> I think this is an intuitive way to help users understand their queries
> >> and what they can do according to the advice.
> >>
> >> [Picture: the analyzed plan rendered in the adapted Flink Visualizer]
> >>
> >> > 4: I'm curious about what the global advice will look like. Is it
> >> > possible to provide an example?
> >>
> >> Here is an example that illustrates the non-deterministic update issue.
> >>
> >> create temporary table cdc_with_meta (
> >>   a int,
> >>   b bigint,
> >>   c string,
> >>   d boolean,
> >>   metadata_1 int metadata,
> >>   metadata_2 string metadata,
> >>   metadata_3 bigint metadata,
> >>   primary key (a) not enforced
> >> ) with (
> >>   'connector' = 'values',
> >>   'changelog-mode' = 'I,UA,UB,D',
> >>   'readable-metadata' = 'metadata_1:INT, metadata_2:STRING, metadata_3:BIGINT'
> >> );
> >>
> >> create temporary table sink_without_pk (
> >>   a int,
> >>   b bigint,
> >>   c string
> >> ) with (
> >>   'connector' = 'values',
> >>   'sink-insert-only' = 'false'
> >> );
> >>
> >> insert into sink_without_pk
> >> select a, metadata_3, c
> >> from cdc_with_meta;
> >>
> >> With compilation as SCHEMA, the result is as follows.
> >>
> >> {
> >>   "nodes" : [ {
> >>     "id" : 1,
> >>     "type" : "StreamPhysicalTableSourceScan",
> >>     "digest" : "TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta, project=[a, c], metadata=[metadata_3]]], fields=[a, c, metadata_3], upsertKeys=[[a]])",
> >>     "changelog_mode" : "I,UB,UA,D"
> >>   }, {
> >>     "id" : 2,
> >>     "type" : "StreamPhysicalCalc",
> >>     "digest" : "Calc(select=[a, metadata_3, c], upsertKeys=[[a]])",
> >>     "changelog_mode" : "I,UB,UA,D",
> >>     "predecessors" : [ {
> >>       "id" : 1,
> >>       "distribution" : "ANY",
> >>       "changelog_mode" : "I,UB,UA,D"
> >>     } ]
> >>   }, {
> >>     "id" : 3,
> >>     "type" : "StreamPhysicalSink",
> >>     "digest" : "Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c])",
> >>     "changelog_mode" : "NONE",
> >>     "predecessors" : [ {
> >>       "id" : 2,
> >>       "distribution" : "ANY",
> >>       "changelog_mode" : "I,UB,UA,D"
> >>     } ]
> >>   } ],
> >>   "advice" : [ {
> >>     "kind" : "WARNING",
> >>     "scope" : "GLOBAL",
> >>     "content" : "The metadata column(s): 'metadata_3' in cdc source may cause wrong result or error on downstream operators, please consider removing these columns or use a non-cdc source that only has insert messages.\nsource node:\nTableSourceScan(table=[[default_catalog, default_database, cdc_with_meta, project=[a, c], metadata=[metadata_3]]], fields=[a, c, metadata_3], changelogMode=[I,UB,UA,D], upsertKeys=[[a]])\n"
> >>   } ]
> >> }
> >>
> >> Best regards,
> >> Jane Chan
> >>
> >> On Tue, Dec 27, 2022 at 8:06 PM yuxia <luoyu...@alumni.sjtu.edu.cn>
> >> wrote:
> >>
> >> > Thanks for driving this FLIP. It should be a good improvement for users.
> >> > But I have a few questions:
> >> > 1: Is the PlanAnalyzerFactory also expected to be implemented by users,
> >> > just like DynamicTableSourceFactory or other factories?
> >> > If so, I notice that in the code of PlanAnalyzerManager#registerAnalyzers,
> >> > the code is as follows:
> >> > FactoryUtil.discoverFactory(classLoader, PlanAnalyzerFactory.class,
> >> > StreamPlanAnalyzerFactory.STREAM_IDENTIFIER));
> >> >
> >> > IIUC, it'll always find the factory with the name
> >> > StreamPlanAnalyzerFactory.STREAM_IDENTIFIER. Is it a typo or by design?
> >> >
> >> > 2: Is there any special reason to make PlanAdvice a final class? Would
> >> > it be better to make it an interface and provide a default
> >> > implementation? My concern is that some users may want to have their own
> >> > implementation of PlanAdvice. But this may be overthinking. If you think
> >> > it won't bring any problems, I'm also fine with that.
> >> >
> >> > 3: Is there a way to only show the advice? To me, the advice seems
> >> > more useful, and the nodes may contain too many details.
> >> >
> >> > 4: I'm curious about what the global advice will look like. Is it
> >> > possible to provide an example?
> >> >
> >> > Best regards,
> >> > Yuxia
> >> >
> >> > ----- Original Message -----
> >> > From: "Jane Chan" <qingyue....@gmail.com>
> >> > To: "dev" <dev@flink.apache.org>
> >> > Sent: Monday, December 26, 2022 9:39:18 PM
> >> > Subject: [DISCUSS] FLIP-280: Introduce a new explain mode to provide SQL
> >> > advice
> >> >
> >> > Hi, devs,
> >> >
> >> > I would like to start a discussion on FLIP-280: Introduce a new explain
> >> > mode to provide SQL advice[1].
> >> >
> >> > Currently, the Flink SQL EXPLAIN statement provides three details to
> >> > display the plan. However, there is a considerable gap between the
> >> > current explained result and the actual, applicable actions users can
> >> > take to improve their queries.
> >> >
> >> > To provide more understandable, actionable advice closer to the user's
> >> > perspective, we propose a new explain mode that analyzes the physical
> >> > plan and attaches available tuning advice and data correctness warnings.
> >> >
> >> > EXPLAIN ANALYZED_PHYSICAL_PLAN <query>
> >> >
> >> > I've included more details in [1], and I look forward to your feedback.
> >> >
> >> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-280%3A+Support+EXPLAIN+SQL+statements+with+advice
> >> > [2] POC: https://github.com/LadyForest/flink/tree/FLIP-280
> >> >
> >> > Best regards,
> >> > Jane Chan
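Here is a plain-Java sketch of the registration style Jane proposes: one default factory returns a collection of analyzers, as in the ModuleFactory 1vN case. It rests on heavy assumptions. Nodes are modeled by the `type`, `digest` and `changelog_mode` fields of the JSON example above, whereas the real analyzers work on internal RelNodes. The single rule is a toy check loosely modeled on the metadata-column warning in that example, not Flink's real analyzer. All class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

// Hypothetical plan node mirroring the "type", "digest" and "changelog_mode"
// fields of the JSON example; Flink's analyzers work on internal RelNodes instead.
final class SketchNode {
    final String type;
    final String digest;
    final String changelogMode;

    SketchNode(String type, String digest, String changelogMode) {
        this.type = Objects.requireNonNull(type);
        this.digest = Objects.requireNonNull(digest);
        this.changelogMode = Objects.requireNonNull(changelogMode);
    }
}

interface SketchAnalyzer {
    Optional<String> analyze(List<SketchNode> plan);
}

// Toy rule loosely modeled on the metadata-column warning in the example:
// flag a source scan that reads metadata while emitting non-insert changes.
enum CdcMetadataSketchAnalyzer implements SketchAnalyzer {
    INSTANCE;

    @Override
    public Optional<String> analyze(List<SketchNode> plan) {
        for (SketchNode node : plan) {
            if (node.type.equals("StreamPhysicalTableSourceScan")
                    && node.digest.contains("metadata=[")
                    && !node.changelogMode.equals("I")) {
                return Optional.of("WARNING/GLOBAL: metadata columns in a CDC source may "
                        + "cause wrong results downstream: " + node.digest);
            }
        }
        return Optional.empty();
    }
}

// 1vN style: one default factory hands out every analyzer, so no
// "CREATE ANALYZER" syntax is needed; new analyzers are appended to the list.
final class SketchStreamAnalyzerFactory {
    private SketchStreamAnalyzerFactory() {}

    static List<SketchAnalyzer> createAnalyzers() {
        return Arrays.<SketchAnalyzer>asList(CdcMetadataSketchAnalyzer.INSTANCE);
    }
}

final class SketchAnalyzerManager {
    private SketchAnalyzerManager() {}

    // Runs every analyzer from the default factory and collects its advice.
    static List<String> analyze(List<SketchNode> plan) {
        List<String> advice = new ArrayList<>();
        for (SketchAnalyzer analyzer : SketchStreamAnalyzerFactory.createAnalyzers()) {
            analyzer.analyze(plan).ifPresent(advice::add);
        }
        return advice;
    }
}
```

In this sketch, adding an analyzer means appending its `INSTANCE` to `createAnalyzers()`, much like the `createAnalyzers()` snippet in Jane's reply. The trade-off Shengkai raises in his question 2 still applies: every analyzer has to be compiled into the module that owns the default factory.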