Hi Zhanghao,

Thanks for the FLIP and the discussion! I hope this reply isn't too late : )
First, I fully agree with the motivation of this FLIP and its value for
users, but there are a few things we should consider (please correct me
if I've misunderstood):

1. It seems that the current solution covers only part of the requirement.
The need to set the source's parallelism may differ between jobs. For
example, consider the following two job topologies (each {} represents a
vertex):
a. {source -> calc -> sink}

b. {source -> calc} -> {aggregate} -> {sink}

For job a, if the calc operator is the bottleneck but the source
parallelism cannot be scaled up (e.g., it is limited by the number of Kafka
partitions), then calc cannot be scaled up for higher throughput either,
because the operators in the source vertex are chained together. In that
case the current solution is reasonable (break the chain, add a shuffle).

But for job b, if the bottleneck is the aggregate operator (not calc), it is
likely better to scale up the aggregate operator/vertex without breaking
the {source -> calc} chain, since breaking it would incur additional shuffle
cost.
So if we decide to add this new feature, I would recommend that both cases
be taken care of.
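
To make the two cases concrete, here is a rough Python sketch of the decision I have in mind. It is not Flink code: `plan_scaling` and the list-of-lists job model are made up purely for illustration, and it assumes the bottleneck is a non-source operator.

```python
def plan_scaling(vertices, bottleneck, source_scalable):
    """Decide how to relieve a bottleneck operator (illustrative only).

    vertices: list of vertices, each a list of chained operator names.
    bottleneck: name of the non-source operator that limits throughput.
    source_scalable: False if the source parallelism is capped
    (e.g. by the number of Kafka partitions).
    """
    for vertex in vertices:
        if bottleneck not in vertex:
            continue
        if "source" in vertex and not source_scalable:
            # The bottleneck is chained behind a capped source: the only
            # way to scale it is to break the chain and add a shuffle.
            return "break chain after source"
        # Otherwise scale the vertex holding the bottleneck and keep
        # every existing chain intact (no extra shuffle).
        return "scale vertex " + "->".join(vertex)
    raise ValueError("unknown operator: " + bottleneck)


job_a = [["source", "calc", "sink"]]
job_b = [["source", "calc"], ["aggregate"], ["sink"]]

print(plan_scaling(job_a, "calc", source_scalable=False))
print(plan_scaling(job_b, "aggregate", source_scalable=False))
```

For job a this picks "break chain after source"; for job b it scales the aggregate vertex and leaves {source -> calc} chained.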


2. The assumption that a CDC source must have a primary key (pk) may not be
reasonable. For example, MySQL CDC supports tables without a pk (
https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#tables-without-primary-keys),
so we cannot simply raise an error here.
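
As a rough illustration of where the gap is, the partitioner choice described in the thread could be sketched like this in Python. The function name and return strings are hypothetical, not Flink APIs, and the last branch is deliberately left open because it is exactly the case I am asking about.

```python
def choose_partitioner(changelog_mode, primary_key):
    """Pick the partitioner placed after the source (illustrative only).

    changelog_mode: set of row kinds the source emits, e.g. {"INSERT"}.
    primary_key: list of pk column names, empty if none is declared.
    """
    if changelog_mode <= {"INSERT"}:
        # Insert-only sources carry no per-key ordering requirement.
        return "rebalance"
    if primary_key:
        # Updates/deletes for the same key must reach the same subtask.
        return "hash(" + ", ".join(primary_key) + ")"
    # Update/delete stream without a pk (e.g. a pk-less MySQL CDC table):
    # the current proposal throws here; what to do instead is undecided.
    return "undecided"


print(choose_partitioner({"INSERT"}, []))
print(choose_partitioner({"INSERT", "UPDATE_AFTER", "DELETE"}, ["id"]))
print(choose_partitioner({"INSERT", "UPDATE_AFTER", "DELETE"}, []))
```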


3. For the new SourceTransformationWrapper, I have some concerns about
future evolution: if we need to add support for other operators, do we
keep adding new xxWrappers?

I've also revisited the previous discussion on FLIP-146 [1]. There were no
clear conclusions or good ideas about similar support for sources at that
time. I also noticed the new capability to change per-vertex parallelism
via the REST API in 1.18 (part of FLIP-291 [2][3]). There is actually an
issue with changing the parallelism of SQL jobs, which may require a hash
shuffle to preserve the order of the update stream; this needs to be
followed up in FLIP-291, and a Jira will be created later. So perhaps we
need to think about it more (the next version has not been released yet, so
we still have time).
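
To illustrate the ordering concern, here is a small Python simulation (a toy model, not Flink code; all function names are made up). It routes the changelog of a single key to downstream subtasks, once round-robin (rebalance-style) and once by key hash, and reports which subtasks see that key.

```python
import zlib


def route_round_robin(events, parallelism):
    # Rebalance-style routing: records are dealt out to subtasks in turn.
    return [(i % parallelism, event) for i, event in enumerate(events)]


def route_by_key_hash(events, parallelism):
    # Hash routing: the subtask depends only on the key of the record.
    return [(zlib.crc32(event[0].encode()) % parallelism, event)
            for event in events]


def subtasks_per_key(routed):
    # Collect, for every key, the set of subtasks that received it.
    seen = {}
    for subtask, (key, _change) in routed:
        seen.setdefault(key, set()).add(subtask)
    return seen


# Changelog of one key: insert, update-before, update-after, delete.
events = [("k1", "+I"), ("k1", "-U"), ("k1", "+U"), ("k1", "-D")]

print(subtasks_per_key(route_round_robin(events, 2)))
print(subtasks_per_key(route_by_key_hash(events, 2)))
```

With round-robin routing the changes of k1 are spread over both subtasks, so their relative order downstream is no longer guaranteed; with hash routing they all stay on one subtask.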

[1] https://lists.apache.org/thread/gtpswl42jzv0c9o3clwqskpllnw8rh87
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management
[3] https://issues.apache.org/jira/browse/FLINK-31471


Best,
Lincoln Lee


On Fri, Sep 22, 2023 at 16:00, Chen Zhanghao <zhanghao.c...@outlook.com> wrote:

> Thanks to everyone who participated in the discussion here. If no further
> questions/concerns are raised, we'll start voting next Monday afternoon
> (GMT+8).
>
> Best,
> Zhanghao Chen
> ________________________________
> From: Jane Chan <qingyue....@gmail.com>
> Sent: September 22, 2023 15:35
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism for Table/SQL
> Sources
>
> Hi Zhanghao,
>
> Thanks for the update; +1 for the proposal!
>
> Best,
> Jane
>
> On Fri, Sep 22, 2023 at 2:13 PM Chen Zhanghao <zhanghao.c...@outlook.com>
> wrote:
>
> > Hi Jane,
> >
> > Thanks for the suggestions; I totally agree with them. I've updated the
> > FLIP with the following two changes:
> >
> > 1. Rename WrapperTransformation to SourceTransformationWrapper, which
> > wraps a SourceTransformation only. Note that we do not plan to support
> > the legacy LegacySourceTransformation.
> > 2. The partitioner after the source will be chosen based on the changelog
> > mode of the source and the existence of a primary key in the source
> > schema. If the source produces update/delete messages but no primary key
> > exists, an exception will be thrown.
> >
> > Best,
> > Zhanghao Chen
> > ________________________________
> > From: Jane Chan <qingyue....@gmail.com>
> > Sent: September 20, 2023 15:13
> > To: dev@flink.apache.org <dev@flink.apache.org>
> > Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism for Table/SQL
> > Sources
> >
> > Hi Zhanghao,
> >
> > Thanks for the update. The FLIP now looks good to me in general, and I
> > have two minor comments.
> >
> > 1. Compared with other subclasses like `CacheTransformation` or
> > `PartitionTransformation`, the name `WrapperTransformation` seems too
> > general. What about `SourceTransformationWrapper`, which is more specific
> > and descriptive? WDYT?
> >
> > 2.
> >
> > > When the source generates update and delete data (determined by
> > > checking the existence of a primary key in the source schema), the
> > > source will use hash partitioner to send data.
> >
> >
> > It might not be sufficient to determine whether the source is a CDC
> > source solely based on checking the existence of the primary key. It's
> > better to check the changelog mode of the source. On the other hand,
> > adding the hash partitioner requires the CDC source table to declare the
> > primary key in the DDL. Therefore, it is preferable to explain this
> > restriction in the FLIP and doc and throw a meaningful exception when
> > users want to configure a different parallelism for a CDC source but
> > forget to declare the primary key constraint.
> >
> > Best,
> > Jane
> >
> > On Wed, Sep 20, 2023 at 9:20 AM Benchao Li <libenc...@apache.org> wrote:
> >
> > > Thank you for the update, the FLIP now looks good to me.
> > >
> > > On Tue, Sep 19, 2023 at 22:50, Chen Zhanghao <zhanghao.c...@outlook.com> wrote:
> > > >
> > > > Thanks to everyone for the valuable input; we learnt a lot during the
> > > > discussion. We've updated the FLIP in three main aspects based on the
> > > > discussion here:
> > > >
> > > > - Add a new subsection on keeping downstream operators' parallelism
> > > > unchanged by wrapping the source transformation in a phantom
> > > > transformation.
> > > > - Add a new subsection on how to deal with changelog messages: simply
> > > > put, build a hash partitioner based on the primary key when a source
> > > > generates update/delete data.
> > > > - Update the non-goals section to remove the possibly misleading
> > > > statement that setting parallelism for individual operators lacks
> > > > public interest, and state that we leave it for future work due to its
> > > > extra complexity.
> > > >
> > > > Looking forward to your suggestions.
> > > >
> > > > Best,
> > > > Zhanghao Chen
> > > > ________________________________
> > > > From: Feng Jin <jinfeng1...@gmail.com>
> > > > Sent: September 17, 2023 0:56
> > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism for
> > > > Table/SQL Sources
> > > >
> > > > Hi, Zhanghao
> > > >
> > > > Thank you for proposing this FLIP, it is a very meaningful feature.
> > > >
> > > > I agree that for now we may consider only the parallelism setting of
> > > > the source itself. Considering the parallelism setting of other
> > > > operators may make the entire design more complex.
> > > >
> > > > Regarding the situation where the parallelism of the source differs
> > > > from that of downstream tasks, I did not find a more detailed
> > > > description in the FLIP.
> > > >
> > > > By default, if the parallelism of two operators is different, the
> > > > rebalance partitioner will be used.
> > > > But in the SQL scenario, I believe that we should keep the behavior of
> > > > the parallelism setting consistent with that of the sink.
> > > >
> > > > 1. When the source generates insert-only data, if there is a mismatch
> > > > in parallelism between the source and downstream operators, rebalance
> > > > is used by default.
> > > >
> > > > 2. When the source generates update and delete data, we should require
> > > > the source to configure a primary key and then build a hash
> > > > partitioner based on that primary key.
> > > >
> > > > WDYT ?
> > > >
> > > >
> > > > Best,
> > > > Feng
> > > >
> > > >
> > > > On Sat, Sep 16, 2023 at 5:58 PM Jane Chan <qingyue....@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Zhanghao,
> > > > >
> > > > > Thanks for the explanation.
> > > > >
> > > > > For Q1, I think the key lies in determining the boundary where the
> > > > > chain should be broken. However, this boundary is ultimately
> > > > > determined by the specific requirements of each user query.
> > > > >
> > > > > The most straightforward approach is breaking the chain after the
> > > > > source operator, even though it involves a tradeoff. This is because
> > > > > there may be instances of `StreamExecWatermarkAssigner`,
> > > > > `StreamExecMiniBatchAssigner`, or `StreamExecChangelogNormalize`
> > > > > occurring before the `StreamExecCalc` node, and it would be complex
> > > > > and challenging to enumerate all possible match patterns.
> > > > >
> > > > > A more complex workaround would be to provide an entry point for
> > > > > users to configure the specific operator that should serve as the
> > > > > breakpoint. But this would further increase the complexity of this
> > > > > FLIP.
> > > > >
> > > > > However, if the parallelism of each operator can be configured (in
> > > > > the future), then this problem would not exist (it might be beyond
> > > > > the scope of discussion for this FLIP).
> > > > >
> > > > > I personally lean towards keeping the FLIP concise and focused by
> > > > > choosing the most straightforward approach. I would also like to
> > > > > hear others' opinions.
> > > > >
> > > > > Best,
> > > > > Jane
> > > > >
> > > > > On Sat, Sep 16, 2023 at 10:21 AM Yun Tang <myas...@live.com> wrote:
> > > > >
> > > > > > Hi Zhanghao,
> > > > > >
> > > > > > Certainly, I think we should keep this FLIP focused on setting the
> > > > > > source parallelism via the DDL's properties. I just want to
> > > > > > clarify that, in my experience, setting parallelism for individual
> > > > > > operators is also valuable, which is downplayed in your FLIP.
> > > > > >
> > > > > > @ConradJam BTW, compared with a SQL hint, I think using
> > > > > > `scan.parallelism` is better, as it aligns with the current
> > > > > > `sink.parallelism`. And once we introduce such an option, we can
> > > > > > also use the SQL hint for dynamic table options [1] to configure
> > > > > > the source parallelism.
> > > > > >
> > > > > > [1]
> > > > > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#dynamic-table-options
> > > > > >
> > > > > >
> > > > > > Best
> > > > > > Yun Tang
> > > > > > ________________________________
> > > > > > From: ConradJam <jam.gz...@gmail.com>
> > > > > > Sent: Friday, September 15, 2023 22:52
> > > > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > > > Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism for
> > > > > > Table/SQL Sources
> > > > > >
> > > > > > +1 Thanks for the FLIP and the discussion. I would like to ask
> > > > > > whether we could use SQL hint syntax to set this parallelism?
> > > > > >
> > > > > > On Fri, Sep 15, 2023 at 20:52, Martijn Visser <martijnvis...@apache.org> wrote:
> > > > > >
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > Thanks for the FLIP and the discussion. I find it exciting.
> > > > > > > Thanks for pushing for this.
> > > > > > >
> > > > > > > Best regards,
> > > > > > >
> > > > > > > Martijn
> > > > > > >
> > > > > > > On Fri, Sep 15, 2023 at 2:25 PM Chen Zhanghao
> > > > > > > <zhanghao.c...@outlook.com> wrote:
> > > > > > >
> > > > > > > > Hi Jane,
> > > > > > > >
> > > > > > > > Thanks for the valuable suggestions.
> > > > > > > >
> > > > > > > > For Q1, it's indeed an issue. Some possible ideas include
> > > > > > > > introducing a fake transformation after the source that takes
> > > > > > > > the global default parallelism, or simply making exec nodes
> > > > > > > > take the global default parallelism, but both ways prevent
> > > > > > > > potential chaining opportunities and I'm not sure if that's
> > > > > > > > good to go. We'll need to give it deeper thought and polish
> > > > > > > > our proposal. We're also more than glad to hear your input on
> > > > > > > > it.
> > > > > > > >
> > > > > > > > For Q2, scan.parallelism will take precedence, as the more
> > > > > > > > specific config should take higher precedence.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Zhanghao Chen
> > > > > > > > ________________________________
> > > > > > > > From: Jane Chan <qingyue....@gmail.com>
> > > > > > > > Sent: September 15, 2023 11:56
> > > > > > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > > > > > Cc: dewe...@outlook.com <dewe...@outlook.com>
> > > > > > > > Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism
> > > > > > > > for Table/SQL Sources
> > > > > > > >
> > > > > > > > Hi, Zhanghao, Dewei,
> > > > > > > >
> > > > > > > > Thanks for initiating this discussion. This feature is
> > > > > > > > valuable in providing more flexibility for performance tuning
> > > > > > > > for SQL pipelines.
> > > > > > > >
> > > > > > > > Here are my two cents,
> > > > > > > >
> > > > > > > > 1. In the FLIP, you mentioned concerns about the parallelism
> > > > > > > > of the calc node and concluded to "leave the behavior
> > > > > > > > unchanged for now." This means that the calc node will use the
> > > > > > > > parallelism of the source operator, regardless of whether the
> > > > > > > > source parallelism is configured or not. If I understand
> > > > > > > > correctly, currently, except for the sink exec node (which has
> > > > > > > > the ability to configure its own parallelism), the rest of the
> > > > > > > > exec nodes accept their input parallelism. From the design, I
> > > > > > > > didn't see the details about coping with input and default
> > > > > > > > parallelism for the rest of the exec nodes. Can you elaborate
> > > > > > > > more about the details?
> > > > > > > >
> > > > > > > > 2. Does the configuration
> > > > > > > > `table.exec.resource.default-parallelism` take precedence over
> > > > > > > > `scan.parallelism`?
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Jane
> > > > > > > >
> > > > > > > > On Fri, Sep 15, 2023 at 10:43 AM Yun Tang <myas...@live.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks for creating this FLIP,
> > > > > > > > >
> > > > > > > > > Many users want to configure the source parallelism just as
> > > > > > > > > they configure the sink parallelism via DDL. Looking forward
> > > > > > > > > to this feature.
> > > > > > > > >
> > > > > > > > > BTW, I think setting parallelism for each operator would
> > > > > > > > > also be valuable. And this should work with the compiled
> > > > > > > > > plan [1] instead of SQL's DDL.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > [1]
> > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-292%3A+Enhance+COMPILED+PLAN+to+support+operator-level+state+TTL+configuration
> > > > > > > > >
> > > > > > > > > Best
> > > > > > > > > Yun Tang
> > > > > > > > > ________________________________
> > > > > > > > > From: Benchao Li <libenc...@apache.org>
> > > > > > > > > Sent: Thursday, September 14, 2023 19:53
> > > > > > > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > > > > > > Cc: dewe...@outlook.com <dewe...@outlook.com>
> > > > > > > > > Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism
> > > > > > > > > for Table/SQL Sources
> > > > > > > > >
> > > > > > > > > Thanks Zhanghao, Dewei for preparing the FLIP,
> > > > > > > > >
> > > > > > > > > I think this is a long-awaited feature, and I appreciate
> > > > > > > > > your effort, especially the "Other concerns" part you
> > > > > > > > > listed.
> > > > > > > > >
> > > > > > > > > Regarding the parallelism of transformations following the
> > > > > > > > > source transformation, it's indeed a problem that we
> > > > > > > > > initially wanted to solve when we introduced this feature
> > > > > > > > > internally. I'd like to hear more opinions on this.
> > > > > > > > > Personally I'm ok to leave it out of this FLIP for the time
> > > > > > > > > being.
> > > > > > > > >
> > > > > > > > > On Thu, Sep 14, 2023 at 14:46, Chen Zhanghao <zhanghao.c...@outlook.com> wrote:
> > > > > > > > > >
> > > > > > > > > > Hi Devs,
> > > > > > > > > >
> > > > > > > > > > Dewei (cced) and I would like to start a discussion on
> > > > > > > > > > FLIP-367: Support Setting Parallelism for Table/SQL
> > > > > > > > > > Sources [1].
> > > > > > > > > >
> > > > > > > > > > Currently, Flink Table/SQL jobs do not expose fine-grained
> > > > > > > > > > control of operator parallelism to users. FLIP-146 [2]
> > > > > > > > > > brings us support for setting parallelism for sinks, but
> > > > > > > > > > except for that, one can only set a default global
> > > > > > > > > > parallelism and all other operators share the same
> > > > > > > > > > parallelism. However, in many cases, setting parallelism
> > > > > > > > > > for sources individually is preferable:
> > > > > > > > > >
> > > > > > > > > > - Many connectors have an upper bound parallelism to
> > > > > > > > > > efficiently ingest data. For example, the parallelism of a
> > > > > > > > > > Kafka source is bound by the number of partitions, any
> > > > > > > > > > extra tasks would be idle.
> > > > > > > > > > - Other operators may involve intensive computation and
> > > > > > > > > > need a larger parallelism.
> > > > > > > > > >
> > > > > > > > > > We propose to improve the current situation by extending
> > > > > > > > > > the current table source API to support setting
> > > > > > > > > > parallelism for Table/SQL sources via connector options.
> > > > > > > > > >
> > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > >
> > > > > > > > > > [1] FLIP-367: Support Setting Parallelism for Table/SQL
> > > > > > > > > > Sources - Apache Flink - Apache Software Foundation
> > > > > > > > > > <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150>
> > > > > > > > > >
> > > > > > > > > > [2] FLIP-146: Improve new TableSource and TableSink
> > > > > > > > > > interfaces - Apache Flink - Apache Software Foundation
> > > > > > > > > > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-146%3A+Improve+new+TableSource+and+TableSink+interfaces>
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Zhanghao Chen
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Benchao Li
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Best
> > > > > >
> > > > > > ConradJam
> > > > > >
> > > > >
> > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>
