Thank you for the update, the FLIP now looks good to me.

Chen Zhanghao <zhanghao.c...@outlook.com> wrote on Tue, Sep 19, 2023 at 22:50:
>
> Thanks to everyone for the valuable inputs, we learnt a lot during the
> discussion. We've updated the FLIP in three main aspects based on the
> discussion here:
>
> - Add a new subsection on keeping downstream operators' parallelism
>   unchanged by wrapping the source transformation in a phantom
>   transformation.
> - Add a new subsection on how to deal with changelog messages: simply put,
>   build a hash partitioner based on the primary key when a source
>   generates update/delete data.
> - Update the non-goals section to remove the possibly misleading statement
>   that setting parallelism for individual operators lacks public interest,
>   and state that we leave it for future work due to its extra complexity.
>
> Looking forward to your suggestions.
>
> Best,
> Zhanghao Chen
> ________________________________
> From: Feng Jin <jinfeng1...@gmail.com>
> Sent: September 17, 2023 0:56
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism for Table/SQL Sources
>
> Hi, Zhanghao
>
> Thank you for proposing this FLIP, it is a very meaningful feature.
>
> I agree that currently we may only consider the parallelism setting of the
> source itself. If we consider the parallelism setting of other operators,
> it may make the entire design more complex.
>
> Regarding the situation where the parallelism of the source is different
> from that of downstream tasks, I did not find a more detailed description
> in the FLIP.
>
> By default, if the parallelism between two operators is different, the
> rebalance partitioner will be used.
> But in the SQL scenario, I believe that we should keep the behavior of
> parallelism setting consistent with that of the sink.
>
> 1.
> When the source only generates insert-only data, if there is a mismatch
> in parallelism between the source and downstream operators, rebalance is
> used by default.
>
> 2. When the source generates update and delete data, we should require the
> source to configure a primary key and then build a hash partitioner based
> on that primary key.
>
> WDYT?
>
>
> Best,
> Feng
>
>
> On Sat, Sep 16, 2023 at 5:58 PM Jane Chan <qingyue....@gmail.com> wrote:
>
> > Hi Zhanghao,
> >
> > Thanks for the explanation.
> >
> > For Q1, I think the key lies in determining the boundary where the chain
> > should be broken. However, this boundary is ultimately determined by the
> > specific requirements of each user query.
> >
> > The most straightforward approach is breaking the chain after the source
> > operator, even though it involves a tradeoff. This is because there may be
> > instances of `StreamExecWatermarkAssigner`, `StreamExecMiniBatchAssigner`,
> > or `StreamExecChangelogNormalize` occurring before the `StreamExecCalc`
> > node, and it would be complex and challenging to enumerate all possible
> > match patterns.
> >
> > A more complex workaround would be to provide an entry point for users to
> > configure the specific operator that should serve as the breakpoint.
> > However, this would further increase the complexity of this FLIP.
> >
> > If the parallelism of each operator can be configured (in the future),
> > then this problem would not exist (it might be beyond the scope of
> > discussion for this FLIP).
> >
> > I personally lean towards keeping the FLIP concise and focused by choosing
> > the most straightforward approach. I would also like to hear others'
> > opinions.
> >
> > Best,
> > Jane
> >
> > On Sat, Sep 16, 2023 at 10:21 AM Yun Tang <myas...@live.com> wrote:
> >
> > > Hi Zhanghao,
> > >
> > > Certainly, I think we should keep this FLIP focused on setting the source
> > > parallelism via DDL's properties.
> > > I just want to clarify that setting parallelism for individual
> > > operators is also profitable from my experience, which is slighted
> > > in your FLIP.
> > >
> > > @ConradJam BTW, compared with SQL hints, I think using `scan.parallelism`
> > > is better, as it aligns with the current `sink.parallelism`. And once we
> > > introduce such an option, we can also use the SQL hint of dynamic table
> > > options [1] to configure the source parallelism.
> > >
> > > [1]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#dynamic-table-options
> > >
> > >
> > > Best
> > > Yun Tang
> > > ________________________________
> > > From: ConradJam <jam.gz...@gmail.com>
> > > Sent: Friday, September 15, 2023 22:52
> > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism for Table/SQL Sources
> > >
> > > +1 Thanks for the FLIP and the discussion. I would like to ask whether to
> > > use SQL Hint syntax to set this parallelism?
> > >
> > > Martijn Visser <martijnvis...@apache.org> wrote on Fri, Sep 15, 2023 at 20:52:
> > >
> > > > Hi everyone,
> > > >
> > > > Thanks for the FLIP and the discussion. I find it exciting. Thanks for
> > > > pushing for this.
> > > >
> > > > Best regards,
> > > >
> > > > Martijn
> > > >
> > > > On Fri, Sep 15, 2023 at 2:25 PM Chen Zhanghao <zhanghao.c...@outlook.com>
> > > > wrote:
> > > >
> > > > > Hi Jane,
> > > > >
> > > > > Thanks for the valuable suggestions.
> > > > >
> > > > > For Q1, it's indeed an issue. Some possible ideas include introducing a
> > > > > fake transformation after the source that takes the global default
> > > > > parallelism, or simply making exec nodes take the global default
> > > > > parallelism, but both ways prevent potential chaining opportunities and
> > > > > I'm not sure if that's good to go. We'll need to give it deeper thought
> > > > > and polish our proposal.
> > > > > We're also more than glad to hear your inputs on it.
> > > > >
> > > > > For Q2, scan.parallelism will take higher precedence, as the more
> > > > > specific config should take higher precedence.
> > > > >
> > > > > Best,
> > > > > Zhanghao Chen
> > > > > ________________________________
> > > > > From: Jane Chan <qingyue....@gmail.com>
> > > > > Sent: September 15, 2023 11:56
> > > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > > Cc: dewe...@outlook.com <dewe...@outlook.com>
> > > > > Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism for Table/SQL Sources
> > > > >
> > > > > Hi, Zhanghao, Dewei,
> > > > >
> > > > > Thanks for initiating this discussion. This feature is valuable in
> > > > > providing more flexibility for performance tuning for SQL pipelines.
> > > > >
> > > > > Here are my two cents,
> > > > >
> > > > > 1. In the FLIP, you mentioned concerns about the parallelism of the calc
> > > > > node and concluded to "leave the behavior unchanged for now." This means
> > > > > that the calc node will use the parallelism of the source operator,
> > > > > regardless of whether the source parallelism is configured or not. If I
> > > > > understand correctly, currently, except for the sink exec node (which has
> > > > > the ability to configure its own parallelism), the rest of the exec nodes
> > > > > accept their input parallelism. From the design, I didn't see the details
> > > > > about coping with input and default parallelism for the rest of the exec
> > > > > nodes. Can you elaborate more about the details?
> > > > >
> > > > > 2. Does the configuration `table.exec.resource.default-parallelism` take
> > > > > precedence over `scan.parallelism`?
> > > > >
> > > > > Best,
> > > > > Jane
> > > > >
> > > > > On Fri, Sep 15, 2023 at 10:43 AM Yun Tang <myas...@live.com> wrote:
> > > > >
> > > > > > Thanks for creating this FLIP,
> > > > > >
> > > > > > Many users have demands to configure the source parallelism just as
> > > > > > they configure the sink parallelism via DDL. Looking forward to this
> > > > > > feature.
> > > > > >
> > > > > > BTW, I think setting parallelism for each operator should also be
> > > > > > valuable. And this shall work with compiled plan [1] instead of SQL's
> > > > > > DDL.
> > > > > >
> > > > > >
> > > > > > [1]
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-292%3A+Enhance+COMPILED+PLAN+to+support+operator-level+state+TTL+configuration
> > > > > >
> > > > > > Best
> > > > > > Yun Tang
> > > > > > ________________________________
> > > > > > From: Benchao Li <libenc...@apache.org>
> > > > > > Sent: Thursday, September 14, 2023 19:53
> > > > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > > > Cc: dewe...@outlook.com <dewe...@outlook.com>
> > > > > > Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism for Table/SQL Sources
> > > > > >
> > > > > > Thanks Zhanghao, Dewei for preparing the FLIP,
> > > > > >
> > > > > > I think this is a long awaited feature, and I appreciate your effort,
> > > > > > especially the "Other concerns" part you listed.
> > > > > >
> > > > > > Regarding the parallelism of transformations following the source
> > > > > > transformation, it's indeed a problem that we initially wanted to solve
> > > > > > when we introduced this feature internally. I'd like to hear more
> > > > > > opinions on this. Personally I'm ok to leave it out of this FLIP for
> > > > > > the time being.
> > > > > >
> > > > > > Chen Zhanghao <zhanghao.c...@outlook.com> wrote on Thu, Sep 14, 2023 at 14:46:
> > > > > > >
> > > > > > > Hi Devs,
> > > > > > >
> > > > > > > Dewei (cced) and I would like to start a discussion on FLIP-367:
> > > > > > > Support Setting Parallelism for Table/SQL Sources [1].
> > > > > > >
> > > > > > > Currently, Flink Table/SQL jobs do not expose fine-grained control of
> > > > > > > operator parallelism to users. FLIP-146 [2] brings us support for
> > > > > > > setting parallelism for sinks, but except for that, one can only set a
> > > > > > > default global parallelism and all other operators share the same
> > > > > > > parallelism. However, in many cases, setting parallelism for sources
> > > > > > > individually is preferable:
> > > > > > >
> > > > > > > - Many connectors have an upper bound parallelism to efficiently
> > > > > > >   ingest data. For example, the parallelism of a Kafka source is bound
> > > > > > >   by the number of partitions; any extra tasks would be idle.
> > > > > > > - Other operators may involve intensive computation and need a larger
> > > > > > >   parallelism.
> > > > > > >
> > > > > > > We propose to improve the current situation by extending the current
> > > > > > > table source API to support setting parallelism for Table/SQL sources
> > > > > > > via connector options.
> > > > > > >
> > > > > > > Looking forward to your feedback.
> > > > > > >
> > > > > > > [1] FLIP-367: Support Setting Parallelism for Table/SQL Sources -
> > > > > > > Apache Flink - Apache Software Foundation
> > > > > > > <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150>
> > > > > > >
> > > > > > > [2] FLIP-146: Improve new TableSource and TableSink interfaces -
> > > > > > > Apache Flink - Apache Software Foundation
> > > > > > > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-146%3A+Improve+new+TableSource+and+TableSink+interfaces>
> > > > > > >
> > > > > > > Best,
> > > > > > > Zhanghao Chen
> > > > > >
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Best,
> > > > > > Benchao Li
> > >
> > >
> > > --
> > > Best
> > >
> > > ConradJam
--

Best,
Benchao Li
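
For readers skimming the thread, the two rules agreed on above (the more specific `scan.parallelism` wins over `table.exec.resource.default-parallelism`, and the choice between rebalance and a primary-key hash partitioner when source and downstream parallelism differ) can be sketched as a small toy model. This is not Flink code and not the FLIP's implementation: `SourceSpec`, `effective_source_parallelism` and `choose_partitioner` are hypothetical names made up for illustration, and the "forward" result for equal parallelism is an assumption that goes beyond what the thread states.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class SourceSpec:
    """Hypothetical description of a table source (illustration only)."""
    scan_parallelism: Optional[int]      # value of 'scan.parallelism', if set
    insert_only: bool                    # False if it emits update/delete rows
    primary_key: Tuple[str, ...] = ()    # primary key columns, if declared


def effective_source_parallelism(source: SourceSpec, default_parallelism: int) -> int:
    # The more specific option takes precedence over the global default.
    if source.scan_parallelism is not None:
        return source.scan_parallelism
    return default_parallelism


def choose_partitioner(source: SourceSpec, default_parallelism: int) -> str:
    # Downstream operators are modelled as keeping the global default parallelism.
    source_parallelism = effective_source_parallelism(source, default_parallelism)
    if source_parallelism == default_parallelism:
        # Assumption: no repartitioning is needed when parallelism matches.
        return "forward"
    if source.insert_only:
        # Insert-only data carries no per-key ordering requirement.
        return "rebalance"
    if not source.primary_key:
        # Changelog sources are required to declare a primary key.
        raise ValueError("changelog source needs a primary key to be repartitioned")
    # Rows of the same key go to the same subtask, preserving per-key order.
    return "hash(" + ", ".join(source.primary_key) + ")"


if __name__ == "__main__":
    kafka_like = SourceSpec(scan_parallelism=4, insert_only=True)
    cdc_like = SourceSpec(scan_parallelism=4, insert_only=False, primary_key=("id",))
    print(choose_partitioner(kafka_like, 16))
    print(choose_partitioner(cdc_like, 16))
```

The point of hashing on the primary key is that update and delete rows for one key must reach the same downstream subtask in order, which a round-robin rebalance cannot guarantee.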