Hi Zhanghao,

Thanks for the update; +1 for the proposal!
Best,
Jane

On Fri, Sep 22, 2023 at 2:13 PM Chen Zhanghao <zhanghao.c...@outlook.com> wrote:

Hi Jane,

Thanks for the suggestions; I totally agree with them. I've updated the FLIP with the following two changes:

1. Rename WrapperTransformation to SourceTransformationWrapper, which wraps a SourceTransformation only. Note that we do not plan to support the legacy LegacySourceTransformation.
2. The partitioner after the source will be chosen based on the changelog mode of the source plus the existence of a primary key in the source schema. If the source produces update/delete messages but no primary key exists, an exception will be thrown.

Best,
Zhanghao Chen
________________________________
From: Jane Chan <qingyue....@gmail.com>
Sent: September 20, 2023 15:13
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism for Table/SQL Sources

Hi Zhanghao,

Thanks for the update. The FLIP now looks good to me in general, and I have two minor comments.

1. Compared with other subclasses like `CacheTransformation` or `PartitionTransformation`, the name `WrapperTransformation` seems too general. What about `SourceTransformationWrapper`, which is more specific and descriptive? WDYT?

2.

> When the source generates update and delete data (determined by checking the existence of a primary key in the source schema), the source will use hash partitioner to send data.

It might not be sufficient to determine whether the source is a CDC source solely by checking the existence of the primary key. It's better to check the changelog mode of the source. On the other hand, adding the hash partitioner requires the CDC source table to declare the primary key in the DDL.
Therefore, it is preferable to explain this restriction in the FLIP and docs, and to throw a meaningful exception when users want to configure a different parallelism for a CDC source but forget to declare the primary key constraint.

Best,
Jane

On Wed, Sep 20, 2023 at 9:20 AM Benchao Li <libenc...@apache.org> wrote:

Thank you for the update; the FLIP now looks good to me.

Chen Zhanghao <zhanghao.c...@outlook.com> wrote on Tue, Sep 19, 2023 at 22:50:

Thanks to everyone for the valuable input; we learned a lot during the discussion. We've updated the FLIP in three main aspects based on the discussion here:

- Add a new subsection on keeping downstream operators' parallelism unchanged by wrapping the source transformation in a phantom transformation.
- Add a new subsection on how to deal with changelog messages. Simply put, build a hash partitioner based on the primary key when a source generates update/delete data.
- Update the non-goals section to remove the possibly misleading statement that setting parallelism for individual operators lacks public interest, and state instead that we leave it for future work due to its extra complexity.

Looking forward to your suggestions.

Best,
Zhanghao Chen
________________________________
From: Feng Jin <jinfeng1...@gmail.com>
Sent: September 17, 2023 0:56
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism for Table/SQL Sources

Hi, Zhanghao

Thank you for proposing this FLIP; it is a very meaningful feature.

I agree that for now we may only consider the parallelism setting of the source itself. If we consider the parallelism setting of other operators, it may make the entire design more complex.
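The partitioner rule converged on above (rebalance when the source is insert-only, a primary-key hash partitioner when it emits update/delete messages, and an exception when such a source declares no primary key) can be illustrated with a small model. This is a minimal Python sketch under stated assumptions, not Flink code: `Partitioner`, `choose_partitioner` and `route` are hypothetical names, returning FORWARD for equal parallelism is an assumption, and `hash(key) % num_channels` stands in for Flink's real key-group assignment. The usage lines show why the hash is needed: round-robin sends the `-U`/`+U` pair for key 1 to different subtasks, where they could be applied out of order.

```python
from enum import Enum


class Partitioner(Enum):
    FORWARD = "forward"
    REBALANCE = "rebalance"
    HASH = "hash"


def choose_partitioner(source_parallelism, downstream_parallelism,
                       insert_only, primary_key):
    """Hypothetical model of the rule discussed for FLIP-367 (not Flink code).

    insert_only: True if the source's changelog mode contains only INSERT rows.
    primary_key: tuple of primary-key column names; empty if none is declared.
    """
    if source_parallelism == downstream_parallelism:
        # Assumption: with equal parallelism, records stay on their subtask.
        return Partitioner.FORWARD
    if insert_only:
        # Insert-only rows have no per-key ordering constraint.
        return Partitioner.REBALANCE
    if not primary_key:
        # Decided from the changelog mode first, then the key (Jane's point 2).
        raise ValueError(
            "source emits update/delete messages with a parallelism different "
            "from its downstream operators, but declares no primary key")
    return Partitioner.HASH


def route(rows, partitioner, key_fields, num_channels):
    """Return the downstream channel index chosen for each row (a dict)."""
    channels = []
    for i, row in enumerate(rows):
        if partitioner is Partitioner.HASH:
            # Same key -> same channel, so a key's changes keep their order.
            key = tuple(row[field] for field in key_fields)
            channels.append(hash(key) % num_channels)
        elif partitioner is Partitioner.REBALANCE:
            # Plain round-robin (a simplified rebalance).
            channels.append(i % num_channels)
        else:
            raise ValueError("FORWARD does not redistribute rows")
    return channels


# Three changes to the same key, e.g. from a CDC source keyed on "id".
changes = [{"op": "+I", "id": 1}, {"op": "-U", "id": 1}, {"op": "+U", "id": 1}]
print(route(changes, Partitioner.REBALANCE, ("id",), 4))  # [0, 1, 2]
print(route(changes, Partitioner.HASH, ("id",), 4))       # one channel, three times
```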
Regarding the situation where the parallelism of the source differs from that of downstream tasks, I did not find a more detailed description in the FLIP.

By default, if the parallelism of two operators differs, the rebalance partitioner will be used. But in the SQL scenario, I believe we should keep the behavior of the parallelism setting consistent with that of the sink.

1. When the source only generates insert-only data, if there is a parallelism mismatch between the source and downstream operators, rebalance is used by default.

2. When the source generates update and delete data, we should require the source to declare a primary key and then build a hash partitioner based on that primary key.

WDYT?

Best,
Feng

On Sat, Sep 16, 2023 at 5:58 PM Jane Chan <qingyue....@gmail.com> wrote:

Hi Zhanghao,

Thanks for the explanation.

For Q1, I think the key lies in determining the boundary where the chain should be broken. However, this boundary is ultimately determined by the specific requirements of each user query.

The most straightforward approach is breaking the chain after the source operator, even though it involves a tradeoff. This is because there may be instances of `StreamExecWatermarkAssigner`, `StreamExecMiniBatchAssigner`, or `StreamExecChangelogNormalize` occurring before the `StreamExecCalc` node, and it would be complex and challenging to enumerate all possible match patterns.

A more complex workaround would be to provide an entry point for users to configure the specific operator that should serve as the breakpoint. However, this would further increase the complexity of this FLIP.
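The chain-breaking trade-off can be made concrete with a toy model in which every node without its own parallelism inherits its input's, as the thread describes for exec nodes other than the sink. This is a hypothetical Python sketch, not the planner's code: the phantom wrapper is modeled as a separate pass-through node that takes the default parallelism, and operators are assumed to chain exactly when their parallelism matches (real chaining has more conditions). It shows that cutting right after the source fixes the boundary in one place, so a watermark assigner or calc downstream gets the default parallelism without enumerating match patterns.

```python
def resolve_parallelism(nodes, default_parallelism):
    """nodes: (name, configured_parallelism or None) in pipeline order.

    Hypothetical model: a node without its own setting inherits its input's
    parallelism; the first node falls back to the default.
    """
    resolved = []
    for name, configured in nodes:
        if configured is not None:
            parallelism = configured
        elif resolved:
            parallelism = resolved[-1][1]
        else:
            parallelism = default_parallelism
        resolved.append((name, parallelism))
    return resolved


def chains(resolved):
    """Group consecutive nodes; simplified: a chain breaks on a parallelism change."""
    groups = [[resolved[0][0]]]
    for (_, prev_p), (name, p) in zip(resolved, resolved[1:]):
        if p == prev_p:
            groups[-1].append(name)
        else:
            groups.append([name])
    return groups


DEFAULT = 8
plain = [("source", 2), ("watermark", None), ("calc", None), ("sink", None)]
wrapped = [("source", 2), ("wrapper", DEFAULT), ("watermark", None),
           ("calc", None), ("sink", None)]

print(resolve_parallelism(plain, DEFAULT))
# [('source', 2), ('watermark', 2), ('calc', 2), ('sink', 2)]
print(chains(resolve_parallelism(wrapped, DEFAULT)))
# [['source'], ['wrapper', 'watermark', 'calc', 'sink']]
```

Without the wrapper, the whole pipeline silently runs at the source's parallelism of 2; with it, only the source does, at the cost of one lost chaining opportunity.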
However, if the parallelism of each operator can be configured (in the future), then this problem would not exist (though that may be beyond the scope of discussion for this FLIP).

I personally lean towards keeping the FLIP concise and focused by choosing the most straightforward approach. I would also like to hear others' opinions.

Best,
Jane

On Sat, Sep 16, 2023 at 10:21 AM Yun Tang <myas...@live.com> wrote:

Hi Zhanghao,

Certainly, I think we should keep this FLIP focused on setting the source parallelism via DDL properties. I just want to clarify that, from my experience, setting parallelism for individual operators is also beneficial, which your FLIP downplays.

@ConradJam BTW, compared with an SQL hint, I think using `scan.parallelism` is better, as it aligns with the current `sink.parallelism`. And once we introduce such an option, we can also use the SQL hint of dynamic table options [1] to configure the source parallelism.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#dynamic-table-options

Best
Yun Tang
________________________________
From: ConradJam <jam.gz...@gmail.com>
Sent: Friday, September 15, 2023 22:52
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism for Table/SQL Sources

+1. Thanks for the FLIP and the discussion. I would like to ask whether we should use SQL hint syntax to set this parallelism.
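The precedence discussed in this thread (a dynamic table options hint overriding the DDL `WITH` options, and the more specific `scan.parallelism` winning over `table.exec.resource.default-parallelism`) can be summarized as a lookup chain. This is a minimal Python sketch, assuming option values arrive as strings as in SQL DDL, and assuming that an unset default parallelism (-1) falls back to the execution environment's parallelism; `effective_source_parallelism` is a hypothetical helper, not a Flink API.

```python
def effective_source_parallelism(ddl_options, hint_options, table_config,
                                 env_parallelism):
    """Hypothetical helper modeling the precedence discussed for FLIP-367."""
    # Dynamic table options (the OPTIONS hint) override the DDL WITH options.
    options = {**ddl_options, **hint_options}
    if "scan.parallelism" in options:
        # The most specific setting wins over the job-wide default.
        return int(options["scan.parallelism"])
    default = int(table_config.get("table.exec.resource.default-parallelism", "-1"))
    # Assumption: -1 means "not set", so fall back to the environment's value.
    return default if default > 0 else env_parallelism


table_config = {"table.exec.resource.default-parallelism": "8"}
print(effective_source_parallelism({}, {}, table_config, 4))  # 8
print(effective_source_parallelism({"scan.parallelism": "2"}, {}, table_config, 4))  # 2
print(effective_source_parallelism({"scan.parallelism": "2"},
                                   {"scan.parallelism": "3"}, table_config, 4))  # 3
```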
Martijn Visser <martijnvis...@apache.org> wrote on Fri, Sep 15, 2023 at 20:52:

Hi everyone,

Thanks for the FLIP and the discussion. I find it exciting. Thanks for pushing for this.

Best regards,

Martijn

On Fri, Sep 15, 2023 at 2:25 PM Chen Zhanghao <zhanghao.c...@outlook.com> wrote:

Hi Jane,

Thanks for the valuable suggestions.

For Q1, it's indeed an issue. Some possible ideas include introducing a fake transformation after the source that takes the global default parallelism, or simply making exec nodes take the global default parallelism, but both approaches prevent potential chaining opportunities, and I'm not sure whether that's acceptable. We'll need to give it deeper thought and polish our proposal. We're also more than glad to hear your input on it.

For Q2, scan.parallelism will take precedence, as the more specific config should have higher priority.

Best,
Zhanghao Chen
________________________________
From: Jane Chan <qingyue....@gmail.com>
Sent: September 15, 2023 11:56
To: dev@flink.apache.org <dev@flink.apache.org>
Cc: dewe...@outlook.com <dewe...@outlook.com>
Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism for Table/SQL Sources

Hi, Zhanghao, Dewei,

Thanks for initiating this discussion. This feature is valuable in providing more flexibility for performance tuning of SQL pipelines.
Here are my two cents:

1. In the FLIP, you mentioned concerns about the parallelism of the calc node and concluded to "leave the behavior unchanged for now." This means that the calc node will use the parallelism of the source operator, regardless of whether the source parallelism is configured or not. If I understand correctly, currently, except for the sink exec node (which has the ability to configure its own parallelism), the rest of the exec nodes accept their input parallelism. From the design, I didn't see the details about coping with input and default parallelism for the rest of the exec nodes. Can you elaborate more on the details?

2. Does the configuration `table.exec.resource.default-parallelism` take precedence over `scan.parallelism`?

Best,
Jane

On Fri, Sep 15, 2023 at 10:43 AM Yun Tang <myas...@live.com> wrote:

Thanks for creating this FLIP.

Many users need to configure the source parallelism just as they configure the sink parallelism via DDL. Looking forward to this feature.

BTW, I think setting parallelism for each operator would also be valuable. And this should work with the compiled plan [1] instead of SQL's DDL.
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-292%3A+Enhance+COMPILED+PLAN+to+support+operator-level+state+TTL+configuration

Best
Yun Tang
________________________________
From: Benchao Li <libenc...@apache.org>
Sent: Thursday, September 14, 2023 19:53
To: dev@flink.apache.org <dev@flink.apache.org>
Cc: dewe...@outlook.com <dewe...@outlook.com>
Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism for Table/SQL Sources

Thanks Zhanghao, Dewei for preparing the FLIP.

I think this is a long-awaited feature, and I appreciate your effort, especially the "Other concerns" part you listed.

Regarding the parallelism of transformations following the source transformation, it's indeed a problem that we initially wanted to solve when we introduced this feature internally. I'd like to hear more opinions on this. Personally, I'm OK with leaving it out of this FLIP for the time being.

Chen Zhanghao <zhanghao.c...@outlook.com> wrote on Thu, Sep 14, 2023 at 14:46:

Hi Devs,

Dewei (cc'ed) and I would like to start a discussion on FLIP-367: Support Setting Parallelism for Table/SQL Sources [1].

Currently, Flink Table/SQL jobs do not expose fine-grained control of operator parallelism to users.
FLIP-146 [2] brought support for setting parallelism for sinks, but apart from that, one can only set a default global parallelism, and all other operators share the same parallelism. However, in many cases, setting parallelism for sources individually is preferable:

- Many connectors have an upper-bound parallelism for efficiently ingesting data. For example, the parallelism of a Kafka source is bounded by the number of partitions; any extra tasks would be idle.
- Other operators may involve intensive computation and need a larger parallelism.

We propose to improve the current situation by extending the current table source API to support setting parallelism for Table/SQL sources via connector options.

Looking forward to your feedback.
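The Kafka example above can be checked with a small model of how partitions spread over source subtasks. This Python sketch assumes a plain round-robin assignment, which is a simplification (the connector's real assignment strategy may differ); `assign_partitions` and `idle_subtasks` are hypothetical helpers. With 4 partitions and a parallelism of 6, two subtasks receive no partition and sit idle, while the rest of the job may still need the higher parallelism.

```python
def assign_partitions(num_partitions, parallelism):
    """Spread partitions over source subtasks round-robin (a simplification).

    Returns {subtask_index: [partition_ids]}; hypothetical helper.
    """
    assignment = {subtask: [] for subtask in range(parallelism)}
    for partition in range(num_partitions):
        assignment[partition % parallelism].append(partition)
    return assignment


def idle_subtasks(num_partitions, parallelism):
    """Subtasks that own no partition and therefore read nothing."""
    assignment = assign_partitions(num_partitions, parallelism)
    return [subtask for subtask, parts in assignment.items() if not parts]


print(assign_partitions(4, 6))  # {0: [0], 1: [1], 2: [2], 3: [3], 4: [], 5: []}
print(idle_subtasks(4, 6))      # [4, 5]
```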
[1] FLIP-367: Support Setting Parallelism for Table/SQL Sources - Apache Flink - Apache Software Foundation
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150

[2] FLIP-146: Improve new TableSource and TableSink interfaces - Apache Flink - Apache Software Foundation
https://cwiki.apache.org/confluence/display/FLINK/FLIP-146%3A+Improve+new+TableSource+and+TableSink+interfaces

Best,
Zhanghao Chen

--
Best,
Benchao Li

--
Best

ConradJam

--
Best,
Benchao Li