Hi Aljoscha,

I want to separate `Customized parallelism` and `Parallelism inference`.

### Customized parallelism

First, I want to explain the current DataStream parallelism setting:
`env.fromSource(...).setParallelism(...)`.
This is how users explicitly specify parallelism, and it is the only way to
set parallelism.

The underlying Source (Eg.: SourceFunction) is completely independent of
specific parallelism. The peripheral DataStream is responsible for setting
parallelism.
The table layer also needs to provide peer-to-peer capability.

### Parallelism inference

Some sources have the ability to infer parallelism, like Kafka, parallelism
can be inferred from the partition number.

I think you are right, we should provide this to the underlying Source.
This capability must be related to the underlying Source (Eg.:
SourceFunction), so this capability must introduce a new interface for the
underlying Source.

The Table layer just tell underlying Source that user want to open
parallelism inference:

new MyRealSource(path, and, whatnot, parallelismInfer = true)

What do you think?

Best,
Jingsong

On Tue, Sep 29, 2020 at 8:48 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> I'll only respond regarding the parallelism for now because I need to
> think some more about DataStream.
>
> What I'm saying is that exposing a parallelism only for Table Connectors
> is not the right thing. If we want to allow sources to tell the
> system/framework what would be a good parallelism it would be at the
> underlying level.
>
> I'll explain with the SourceFunction. A Table API Source connector is
> basically a factory that will give you a SourceFunction that corresponds
> to whatever the user configured via properties and other means. If the
> Table Connector somehow happens to know what would be a good parallelism
> for the source it could "tell" the source when creating it, i.e.
>
>    new MyRealSource(path, and, whatnot, parallelismHint)
>
> Then the source could either work with that information it got, by
> shutting down (at runtime) some of its parallel instances. Or we could
> extend the Source (SourceFunction) API to expose a "parallelism hint" to
> the system.
>
> The basic thing is that Table Connectors are not the real connectors,
> they just delegate to underlying real connectors. So those underlying
> connectors are where we need to change things. Otherwise we would just
> have special-case solutions for the Table API.
>
> Best,
> Aljoscha
>
> On 25.09.20 14:30, admin wrote:
> > Hi everyone,
> > Thanks for the proposal.
> >
> > In our company,we meet the same situation as @liu shouwei.
> > We developed some features base on flink.Such as parallelism of sql
> source/sink  connector, and kafka delay consumer which is adding a flatmap
> and a keyby transformation after the source Datastream.
> > What make us embarrassing is that when we migrate this features to Flink
> 1.11,we found that the DataSteam is missing,So we modify the blink’s code
> to support parallelism.But kafka delay comsumer is unsolved until now.
> >
> >  From user’s perspective,it necessary to manipulate DataStream or have
> the interoperability between Table API and DataStream.
> >
> > Best
> >
> >
> >
> >> 2020年9月25日 下午4:18,Rui Li <lirui.fu...@gmail.com> 写道:
> >>
> >> Hi Jingsong,
> >>
> >> Thanks for driving this effort. I have two minor comments.
> >>
> >>
> >>    1. IMHO, parallelism is a concept that applies to all
> ScanTableSource.
> >>    So instead of defining a new interface, is it more natural to
> incorporate
> >>    parallel inference to existing interfaces, e.g. ScanTableSource
> >>    or ScanRuntimeProvider?
> >>    2. `scan.infer-parallelism.enabled` doesn't seem very useful to me.
> From
> >>    a user's perspective, parallelism is either set by
> `scan.parallelism`, or
> >>    automatically decided by Flink. If a user doesn't want the connector
> to
> >>    infer parallelism, he/she can simply set `scan.parallelism`, no?
> >>
> >>
> >> On Fri, Sep 25, 2020 at 3:33 PM Jingsong Li <jingsongl...@gmail.com>
> wrote:
> >>
> >>> Hi Aljoscha,
> >>>
> >>> Thank you for your feedback,
> >>>
> >>> ## Connector parallelism
> >>>
> >>> Requirements:
> >>> Set parallelism by user specified or inferred by connector.
> >>>
> >>> How to configure parallelism in DataStream:
> >>> In the DataStream world, the only way to configure parallelism is
> >>> `SingleOutputStreamOperator.setParallelism`. Actually, users need to
> have
> >>> access to DataStream when using a connector, not just the
> `SourceFunction`
> >>> / `Source` interface.
> >>> Is parallelism related to connectors? I think yes, there are many
> >>> connectors that can support obtaining parallelism related information
> from
> >>> them, and users do exactly that. This means parallelism inference (From
> >>> connectors).
> >>> The key is that `DataStream` is an open programming API, and users can
> >>> freely program to set parallelism.
> >>>
> >>> How to configure parallelism in Table/SQL:
> >>> But Table/SQL is not an open programming API, every feature needs a
> >>> corresponding mechanism, because the user is no longer able to
> program. Our
> >>> current connector interface: SourceFunctionProvider,
> SinkFunctionProvider,
> >>> through these interfaces, there is no ability to generate connector
> related
> >>> parallelism.
> >>> Back to our original intention: to avoid users directly manipulating
> >>> `DataStream`. Since we want to avoid it, we need to provide
> corresponding
> >>> features.
> >>>
> >>> And parallelism is the runtime information of connectors, It fits the
> name
> >>> of `ScanRuntimeProvider`.
> >>>
> >>>> If we wanted to add a "get parallelism" it would be in those
> underlying
> >>> connectors but I'm also skeptical about adding such a method there
> because
> >>> it is a static assignment and would preclude clever optimizations
> about the
> >>> parallelism of a connector at runtime.
> >>>
> >>> I think that when a job is submitted, it is in compile time. It should
> only
> >>> provide static parallelism.
> >>>
> >>> ## DataStream in table connector
> >>>
> >>> As I said before, if we want to completely cancel DataStream in the
> table
> >>> connector, we need to provide corresponding functions in
> >>> `xxRuntimeProvider`.
> >>> Otherwise, we and users may not be able to migrate the old connectors.
> >>> Including Hive/FileSystem connectors and the user cases I mentioned
> above.
> >>> CC: @liu shouwei
> >>>
> >>> We really need to consider these cases.
> >>> If there is no alternative in a short period of time, for a long
> >>> time, users need to continue to use the old table connector API, which
> has
> >>> been deprecated.
> >>>
> >>> Why not use StreamTableEnvironment fromDataStream/toDataStream?
> >>> - These tables are just temporary tables. Can not be integrated/stored
> into
> >>> Catalog.
> >>> - Creating table DDL can not work...
> >>> - We need to lose the kinds of useful features of Table/SQL on the
> >>> connector. For example, projection pushdown, filter pushdown,
> partitions
> >>> and etc...
> >>>
> >>> But I believe you are right in the long run. The source and sink APIs
> >>> should be powerful enough to cover all reasonable cases.
> >>> Maybe we can just introduce them in a minimal way. For example, we only
> >>> introduce `DataStreamSinkProvider` in planner as an internal API.
> >>>
> >>> Your points are very meaningful, hope to get your reply.
> >>>
> >>> Best,
> >>> Jingsong
> >>>
> >>> On Fri, Sep 25, 2020 at 10:55 AM wenlong.lwl <wenlong88....@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi,Aljoscha, I would like to share a use case to second setting
> >>> parallelism
> >>>> of table sink(or limiting parallelism range of table sink): When
> writing
> >>>> data to databases, there is limitation for number of jdbc connections
> and
> >>>> query TPS. we would get errors of too many connections or high load
> for
> >>>> db and poor performance because of too many small requests if the
> >>> optimizer
> >>>> didn't know such information, and set a large parallelism for sink
> when
> >>>> matching the parallelism of its input.
> >>>>
> >>>> On Thu, 24 Sep 2020 at 22:41, Aljoscha Krettek <aljos...@apache.org>
> >>>> wrote:
> >>>>
> >>>>> Thanks for the proposal! I think the use cases that we are trying to
> >>>>> solve are indeed valid. However, I think we might have to take a step
> >>>>> back to look at what we're trying to solve and how we can solve it.
> >>>>>
> >>>>> The FLIP seems to have two broader topics: 1) add "get parallelism"
> to
> >>>>> sinks/sources 2) let users write DataStream topologies for
> >>>>> sinks/sources. I'll treat them separately below.
> >>>>>
> >>>>> I think we should not add "get parallelism" to the Table Sink API
> >>>>> because I think it's the wrong level of abstraction. The Table API
> >>>>> connectors are (or should be) more or less thin wrappers around
> >>>>> "physical" connectors. By "physical" I mean the underlying (mostly
> >>>>> DataStream API) connectors. For example, with the Kafka Connector the
> >>>>> Table API connector just does the configuration parsing and
> determines
> >>> a
> >>>>> good (de)serialization format and then creates the underlying
> >>>>> FlinkKafkaConsumer/FlinkKafkaProducer.
> >>>>>
> >>>>> If we wanted to add a "get parallelism" it would be in those
> underlying
> >>>>> connectors but I'm also skeptical about adding such a method there
> >>>>> because it is a static assignment and would preclude clever
> >>>>> optimizations about the parallelism of a connector at runtime. But
> >>> maybe
> >>>>> that's thinking too much about future work so I'm open to discussion
> >>>> there.
> >>>>>
> >>>>> Regarding the second point of letting Table connector developers use
> >>>>> DataStream: I think we should not do it. One of the purposes of
> FLIP-95
> >>>>> [1] was to decouple the Table API from the DataStream API for the
> basic
> >>>>> interfaces. Coupling the two too closely at that basic level will
> make
> >>>>> our live harder in the future when we want to evolve those APIs or
> when
> >>>>> we want the system to be better at choosing how to execute sources
> and
> >>>>> sinks. An example of this is actually the past of the Table API.
> Before
> >>>>> FLIP-95 we had connectors that dealt directly with DataSet and
> >>>>> DataStream, meaning that if users wanted their Table Sink to work in
> >>>>> both BATCH and STREAMING mode they had to provide two
> implementations.
> >>>>> The trend is towards unifying the sources/sinks to common interfaces
> >>>>> that can be used for both BATCH and STREAMING execution but, again, I
> >>>>> think exposing DataStream here would be a step back in the wrong
> >>>> direction.
> >>>>>
> >>>>> I think the solution to the existing user requirement of using
> >>>>> DataStream sources and sinks with the Table API should be better
> >>>>> interoperability between the two APIs, which is being tackled right
> now
> >>>>> in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases that
> >>>>> we're trying to solve here, maybe we should think about FLIP-136 some
> >>>> more.
> >>>>>
> >>>>> What do you think?
> >>>>>
> >>>>> Best,
> >>>>> Aljoscha
> >>>>>
> >>>>> [1]
> >>>>>
> >>>>>
> >>>>
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> >>>>> [2]
> >>>>>
> >>>>>
> >>>>
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> Best, Jingsong Lee
> >>>
> >>
> >>
> >> --
> >> Best regards!
> >> Rui Li
> >
>
>

-- 
Best, Jingsong Lee

Reply via email to