Hi Jingsong,

Thanks for driving this effort. I have two minor comments.


   1. IMHO, parallelism is a concept that applies to every ScanTableSource.
   So instead of defining a new interface, wouldn't it be more natural to
   incorporate parallelism inference into the existing interfaces, e.g.
   ScanTableSource or ScanRuntimeProvider?
   2. `scan.infer-parallelism.enabled` doesn't seem very useful to me. From
   a user's perspective, parallelism is either set by `scan.parallelism`, or
   automatically decided by Flink. If a user doesn't want the connector to
   infer parallelism, he/she can simply set `scan.parallelism`, no?
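
To make the two comments concrete, here is a minimal sketch of the precedence
I have in mind. It is only an illustration under stated assumptions:
`SketchScanRuntimeProvider`, `inferParallelism()` and `ParallelismSketch` are
hypothetical names, not existing Flink interfaces, and only the option key
`scan.parallelism` comes from the discussion.

```java
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical stand-in for a runtime provider; this is NOT the real Flink interface.
interface SketchScanRuntimeProvider {
    // A connector that can infer its parallelism overrides this method.
    // The default means "no opinion, let the planner decide".
    default OptionalInt inferParallelism() {
        return OptionalInt.empty();
    }
}

final class ParallelismSketch {
    // Option key taken from the discussion.
    static final String SCAN_PARALLELISM = "scan.parallelism";

    // Precedence: explicit user setting, then connector inference, then planner default.
    static int resolve(Map<String, String> options,
                       SketchScanRuntimeProvider provider,
                       int plannerDefault) {
        String explicit = options.get(SCAN_PARALLELISM);
        if (explicit != null) {
            // The user set scan.parallelism, so inference is skipped entirely.
            return Integer.parseInt(explicit.trim());
        }
        return provider.inferParallelism().orElse(plannerDefault);
    }
}
```

With this shape, a separate on/off switch for inference is not needed in the
sketch: setting `scan.parallelism` already bypasses whatever the connector
would infer.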


On Fri, Sep 25, 2020 at 3:33 PM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi Aljoscha,
>
> Thank you for your feedback,
>
> ## Connector parallelism
>
> Requirements:
> Parallelism is either specified by the user or inferred by the connector.
>
> How to configure parallelism in DataStream:
> In the DataStream world, the only way to configure parallelism is
> `SingleOutputStreamOperator.setParallelism`. Actually, users need to have
> access to DataStream when using a connector, not just the `SourceFunction`
> / `Source` interface.
> Is parallelism related to connectors? I think yes: many connectors can
> provide parallelism-related information, and users do make use of it. This
> is what parallelism inference (from connectors) means.
> The key is that `DataStream` is an open programming API, and users can
> freely program to set parallelism.
>
> How to configure parallelism in Table/SQL:
> But Table/SQL is not an open programming API. Every feature needs a
> corresponding mechanism, because the user is no longer able to program. Our
> current connector interfaces, SourceFunctionProvider and
> SinkFunctionProvider, offer no way to supply connector-related parallelism.
> Back to our original intention: to avoid users directly manipulating
> `DataStream`. Since we want to avoid it, we need to provide corresponding
> features.
>
> And parallelism is runtime information of connectors. It fits the name
> of `ScanRuntimeProvider`.
>
> > If we wanted to add a "get parallelism" it would be in those underlying
> > connectors but I'm also skeptical about adding such a method there because
> > it is a static assignment and would preclude clever optimizations about the
> > parallelism of a connector at runtime.
>
> I think that when a job is submitted, we are at compile time, so the
> connector should only provide a static parallelism.
>
> ## DataStream in table connector
>
> As I said before, if we want to completely remove DataStream from the table
> connector, we need to provide corresponding functions in
> `xxRuntimeProvider`.
> Otherwise, we and our users may not be able to migrate the old connectors,
> including the Hive/FileSystem connectors and the use cases I mentioned above.
> CC: @liu shouwei
>
> We really need to consider these cases.
> If no alternative becomes available soon, users will have to keep using the
> old table connector API, which has been deprecated, for a long time.
>
> Why not use StreamTableEnvironment fromDataStream/toDataStream?
> - These tables are just temporary tables. They cannot be integrated/stored
> into a Catalog.
> - Table creation DDL cannot work...
> - We would lose many useful Table/SQL features on the connector, for
> example projection pushdown, filter pushdown, partitions, etc.
>
> But I believe you are right in the long run. The source and sink APIs
> should be powerful enough to cover all reasonable cases.
> Maybe we can just introduce them in a minimal way. For example, we only
> introduce `DataStreamSinkProvider` in planner as an internal API.
>
> Your points are very meaningful; I hope to get your reply.
>
> Best,
> Jingsong
>
> On Fri, Sep 25, 2020 at 10:55 AM wenlong.lwl <wenlong88....@gmail.com>
> wrote:
>
> > Hi Aljoscha, I would like to share a use case in support of setting the
> > parallelism of a table sink (or limiting the parallelism range of a table
> > sink): when writing data to databases, there are limits on the number of
> > JDBC connections and on query TPS. If the optimizer does not know about
> > these limits and sets a large parallelism for the sink to match the
> > parallelism of its input, we get "too many connections" errors or high
> > load on the DB, and poor performance because of too many small requests.
> >
> > On Thu, 24 Sep 2020 at 22:41, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > Thanks for the proposal! I think the use cases that we are trying to
> > > solve are indeed valid. However, I think we might have to take a step
> > > back to look at what we're trying to solve and how we can solve it.
> > >
> > > The FLIP seems to have two broader topics: 1) add "get parallelism" to
> > > sinks/sources 2) let users write DataStream topologies for
> > > sinks/sources. I'll treat them separately below.
> > >
> > > I think we should not add "get parallelism" to the Table Sink API
> > > because I think it's the wrong level of abstraction. The Table API
> > > connectors are (or should be) more or less thin wrappers around
> > > "physical" connectors. By "physical" I mean the underlying (mostly
> > > DataStream API) connectors. For example, with the Kafka Connector the
> > > Table API connector just does the configuration parsing and determines a
> > > good (de)serialization format and then creates the underlying
> > > FlinkKafkaConsumer/FlinkKafkaProducer.
> > >
> > > If we wanted to add a "get parallelism" it would be in those underlying
> > > connectors but I'm also skeptical about adding such a method there
> > > because it is a static assignment and would preclude clever
> > > optimizations about the parallelism of a connector at runtime. But maybe
> > > that's thinking too much about future work so I'm open to discussion
> > > there.
> > >
> > > Regarding the second point of letting Table connector developers use
> > > DataStream: I think we should not do it. One of the purposes of FLIP-95
> > > [1] was to decouple the Table API from the DataStream API for the basic
> > > interfaces. Coupling the two too closely at that basic level will make
> > > our lives harder in the future when we want to evolve those APIs or when
> > > we want the system to be better at choosing how to execute sources and
> > > sinks. An example of this is actually the past of the Table API. Before
> > > FLIP-95 we had connectors that dealt directly with DataSet and
> > > DataStream, meaning that if users wanted their Table Sink to work in
> > > both BATCH and STREAMING mode they had to provide two implementations.
> > > The trend is towards unifying the sources/sinks to common interfaces
> > > that can be used for both BATCH and STREAMING execution but, again, I
> > > think exposing DataStream here would be a step back in the wrong
> > > direction.
> > >
> > > I think the solution to the existing user requirement of using
> > > DataStream sources and sinks with the Table API should be better
> > > interoperability between the two APIs, which is being tackled right now
> > > in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases that
> > > we're trying to solve here, maybe we should think about FLIP-136 some
> > > more.
> > >
> > > What do you think?
> > >
> > > Best,
> > > Aljoscha
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> > > [2]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> > >
> > >
> >
>
>
> --
> Best, Jingsong Lee
>


-- 
Best regards!
Rui Li
