Hi everyone,
Thanks for the proposal.
At our company, we have run into the same situation as @liu shouwei.
We developed some features on top of Flink, such as configurable parallelism
for SQL source/sink connectors, and a Kafka delayed consumer, which adds a
flatMap and a keyBy transformation after the source DataStream.
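A minimal sketch of that pattern (all the class names below are placeholders
for our internal code):

    // attach a target emit time to each record, key the stream so timers
    // are available, then hold every record until its delay has elapsed
    DataStream<Record> source = env.addSource(new FlinkKafkaConsumer<>(topic, schema, props));
    source
        .flatMap(new AssignDelayDeadline())
        .keyBy(Record::getKey)
        .process(new EmitAfterDelayFunction());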
What puts us in an awkward position is that when we migrated these features to
Flink 1.11, we found that the DataStream was no longer accessible, so we
modified the Blink planner's code to support parallelism. But the Kafka
delayed consumer remains unsolved until now.
From a user's perspective, it is necessary to either manipulate the DataStream
directly or have interoperability between the Table API and DataStream.
Best
> On Sep 25, 2020, at 4:18 PM, Rui Li wrote:
>
> Hi Jingsong,
>
> Thanks for driving this effort. I have two minor comments.
>
>
> 1. IMHO, parallelism is a concept that applies to every ScanTableSource.
> So instead of defining a new interface, wouldn't it be more natural to
> incorporate parallelism inference into existing interfaces, e.g. ScanTableSource
> or ScanRuntimeProvider?
> 2. `scan.infer-parallelism.enabled` doesn't seem very useful to me. From
> a user's perspective, parallelism is either set by `scan.parallelism`, or
> automatically decided by Flink. If a user doesn't want the connector to
> infer parallelism, he/she can simply set `scan.parallelism`, no?
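> For example, something like (option names as discussed in this thread; the
> connector and schema are just for illustration):
>
>    CREATE TABLE my_source (
>      id BIGINT,
>      name STRING
>    ) WITH (
>      'connector' = 'filesystem',
>      'path' = '/tmp/data',
>      'format' = 'csv',
>      -- fix the source parallelism; omit this to let it be inferred
>      'scan.parallelism' = '4'
>    );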
>
>
> On Fri, Sep 25, 2020 at 3:33 PM Jingsong Li wrote:
>
>> Hi Aljoscha,
>>
>> Thank you for your feedback,
>>
>> ## Connector parallelism
>>
>> Requirement:
>> Parallelism can be set explicitly by the user or inferred by the connector.
>>
>> How to configure parallelism in DataStream:
>> In the DataStream world, the only way to configure parallelism is
>> `SingleOutputStreamOperator.setParallelism`. Actually, users need to have
>> access to DataStream when using a connector, not just the `SourceFunction`
>> / `Source` interface.
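>> For example (`MySourceFunction` here is just a placeholder):
>>
>>    // DataStreamSource extends SingleOutputStreamOperator, so the user can
>>    // set the parallelism right where the connector is instantiated
>>    DataStreamSource<String> source = env.addSource(new MySourceFunction());
>>    source.setParallelism(4);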
>> Is parallelism related to connectors? I think yes: many connectors expose
>> information from which a suitable parallelism can be derived, and users do
>> exactly that. This is what parallelism inference (from connectors) means.
>> The key is that `DataStream` is an open programming API, and users can
>> freely program to set parallelism.
>>
>> How to configure parallelism in Table/SQL:
>> But Table/SQL is not an open programming API: every feature needs a
>> corresponding mechanism, because the user can no longer program freely. Our
>> current connector interfaces, SourceFunctionProvider and SinkFunctionProvider,
>> offer no way to supply connector-related parallelism.
>> Back to our original intention: to avoid users directly manipulating
>> `DataStream`. Since we want to avoid it, we need to provide corresponding
>> features.
>>
>> And parallelism is runtime information of a connector, so it fits the name
>> of `ScanRuntimeProvider`.
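>> A sketch of what this could look like (just for discussion, not a final
>> interface):
>>
>>    public interface ParallelismProvider {
>>        // Optional.empty() means: let the planner decide the parallelism
>>        default Optional<Integer> getParallelism() {
>>            return Optional.empty();
>>        }
>>    }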
>>
>>> If we wanted to add a "get parallelism" it would be in those underlying
>> connectors but I'm also skeptical about adding such a method there because
>> it is a static assignment and would preclude clever optimizations about the
>> parallelism of a connector at runtime.
>>
>> I think that when a job is submitted, we are at compile time. Compile time
>> should only provide static parallelism.
>>
>> ## DataStream in table connector
>>
>> As I said before, if we want to completely remove DataStream from the table
>> connectors, we need to provide corresponding capabilities in the
>> `xxRuntimeProvider`s.
>> Otherwise, we and our users may not be able to migrate the old connectors,
>> including the Hive/FileSystem connectors and the user cases I mentioned above.
>> CC: @liu shouwei
>>
>> We really need to consider these cases.
>> If there is no alternative in the short term, users will have to keep using
>> the old table connector API, which has already been deprecated, for a long
>> time.
>>
>> Why not use StreamTableEnvironment fromDataStream/toDataStream?
>> - These tables are only temporary tables; they cannot be integrated/stored
>> into a Catalog.
>> - CREATE TABLE DDL cannot work...
>> - We would lose the useful Table/SQL features of the connector, for example
>> projection pushdown, filter pushdown, partitions, etc.
>>
>> But I believe you are right in the long run. The source and sink APIs
>> should be powerful enough to cover all reasonable cases.
>> Maybe we can just introduce them in a minimal way. For example, we could only
>> introduce `DataStreamSinkProvider` in the planner as an internal API.
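>> Roughly along these lines (only a sketch of the shape):
>>
>>    @Internal
>>    public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider {
>>        // the planner passes in the input DataStream; the connector builds
>>        // and returns the sink transformation itself
>>        DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);
>>    }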
>>
>> Your points are very meaningful; I hope to hear back from you.
>>
>> Best,
>> Jingsong
>>
>> On Fri, Sep 25, 2020 at 10:55 AM wenlong.lwl
>> wrote:
>>
>>> Hi Aljoscha, I would like to share a use case that seconds setting the
>>> parallelism of a table sink (or limiting the parallelism range of a table
>>> sink): when writing data to databases, there are limits on the number of
>>> JDBC connections and on query TPS. We would get too-many-connections errors,
>>> high load on the DB, and poor performance because of too many smal