Hi Aljoscha,

I want to treat `Customized parallelism` and `Parallelism inference` as two separate topics.
### Customized parallelism

First, let me explain the current DataStream parallelism setting: `env.fromSource(...).setParallelism(...)`. This is how users explicitly specify parallelism, and it is the only way to set it. The underlying source (e.g. `SourceFunction`) is completely independent of the specific parallelism; the surrounding DataStream API is responsible for setting it. The Table layer needs to provide an equivalent capability.

### Parallelism inference

Some sources can infer their parallelism. For Kafka, for example, the parallelism can be inferred from the number of partitions. I think you are right that we should provide this at the level of the underlying source. Since this capability is tied to the underlying source (e.g. `SourceFunction`), it requires a new interface on the underlying source. The Table layer then just tells the underlying source that the user wants to enable parallelism inference:

new MyRealSource(path, and, whatnot, parallelismInfer = true)

What do you think?

Best,
Jingsong

On Tue, Sep 29, 2020 at 8:48 PM Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
>
> I'll only respond regarding the parallelism for now because I need to
> think some more about DataStream.
>
> What I'm saying is that exposing a parallelism only for Table Connectors
> is not the right thing. If we want to allow sources to tell the
> system/framework what would be a good parallelism it would be at the
> underlying level.
>
> I'll explain with the SourceFunction. A Table API Source connector is
> basically a factory that will give you a SourceFunction that corresponds
> to whatever the user configured via properties and other means. If the
> Table Connector somehow happens to know what would be a good parallelism
> for the source it could "tell" the source when creating it, i.e.
>
> new MyRealSource(path, and, whatnot, parallelismHint)
>
> Then the source could either work with that information it got, by
> shutting down (at runtime) some of its parallel instances.
> Or we could extend the Source (SourceFunction) API to expose a
> "parallelism hint" to the system.
>
> The basic thing is that Table Connectors are not the real connectors,
> they just delegate to underlying real connectors. So those underlying
> connectors are where we need to change things. Otherwise we would just
> have special-case solutions for the Table API.
>
> Best,
> Aljoscha
>
> On 25.09.20 14:30, admin wrote:
> > Hi everyone,
> > Thanks for the proposal.
> >
> > In our company, we met the same situation as @liu shouwei.
> > We developed some features based on Flink, such as parallelism for SQL
> > source/sink connectors, and a Kafka delayed consumer, which adds a
> > flatmap and a keyBy transformation after the source DataStream.
> > What made this awkward is that when we migrated these features to
> > Flink 1.11, we found that the DataStream is missing, so we modified
> > Blink's code to support parallelism. But the Kafka delayed consumer is
> > still unsolved.
> >
> > From a user's perspective, it is necessary to manipulate DataStream or
> > to have interoperability between the Table API and DataStream.
> >
> > Best
> >
> >
> >> On Sep 25, 2020 at 4:18 PM, Rui Li <lirui.fu...@gmail.com> wrote:
> >>
> >> Hi Jingsong,
> >>
> >> Thanks for driving this effort. I have two minor comments.
> >>
> >> 1. IMHO, parallelism is a concept that applies to all ScanTableSource.
> >>    So instead of defining a new interface, is it more natural to
> >>    incorporate parallelism inference into existing interfaces, e.g.
> >>    ScanTableSource or ScanRuntimeProvider?
> >> 2. `scan.infer-parallelism.enabled` doesn't seem very useful to me.
> >>    From a user's perspective, parallelism is either set by
> >>    `scan.parallelism`, or automatically decided by Flink. If a user
> >>    doesn't want the connector to infer parallelism, he/she can simply
> >>    set `scan.parallelism`, no?
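[Editor's note: Rui Li's second point describes a precedence rule. A minimal, self-contained Java sketch of that rule follows; the class, method, and option-handling code are hypothetical illustrations, not actual Flink API.]

```java
import java.util.Map;
import java.util.Optional;

// Sketch of the precedence Rui Li describes: an explicit `scan.parallelism`
// always wins; otherwise the connector may contribute an inferred value;
// otherwise (empty result) the framework decides. Names are illustrative only.
public class ParallelismResolution {

    /** Resolve the source parallelism from options and an optional inferred value. */
    static Optional<Integer> resolve(Map<String, String> options, Optional<Integer> inferred) {
        String explicit = options.get("scan.parallelism");
        if (explicit != null) {
            return Optional.of(Integer.parseInt(explicit)); // user setting wins
        }
        return inferred; // e.g. Kafka partition count; empty => framework default
    }

    public static void main(String[] args) {
        // Explicit setting overrides inference.
        System.out.println(resolve(Map.of("scan.parallelism", "4"), Optional.of(16)));
        // No explicit setting: fall back to the inferred value.
        System.out.println(resolve(Map.of(), Optional.of(16)));
        // Neither: leave the decision to the framework.
        System.out.println(resolve(Map.of(), Optional.empty()));
    }
}
```

Under this reading, a separate `scan.infer-parallelism.enabled` flag would only matter if a user wanted to suppress inference without pinning a number, which is Rui Li's question.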
> >> On Fri, Sep 25, 2020 at 3:33 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> >>
> >>> Hi Aljoscha,
> >>>
> >>> Thank you for your feedback.
> >>>
> >>> ## Connector parallelism
> >>>
> >>> Requirements:
> >>> Set the parallelism explicitly (user-specified) or infer it from the
> >>> connector.
> >>>
> >>> How to configure parallelism in DataStream:
> >>> In the DataStream world, the only way to configure parallelism is
> >>> `SingleOutputStreamOperator.setParallelism`. Actually, users need to
> >>> have access to the DataStream when using a connector, not just the
> >>> `SourceFunction` / `Source` interface.
> >>> Is parallelism related to connectors? I think yes: many connectors
> >>> can provide parallelism-related information, and users do exactly
> >>> that. This means parallelism inference (from connectors).
> >>> The key is that `DataStream` is an open programming API, and users
> >>> can freely program to set parallelism.
> >>>
> >>> How to configure parallelism in Table/SQL:
> >>> Table/SQL is not an open programming API; every feature needs a
> >>> corresponding mechanism, because the user is no longer able to
> >>> program. Our current connector interfaces are SourceFunctionProvider
> >>> and SinkFunctionProvider; through these interfaces, there is no way
> >>> to obtain connector-related parallelism.
> >>> Back to our original intention: to avoid users directly manipulating
> >>> `DataStream`. Since we want to avoid that, we need to provide
> >>> corresponding features.
> >>>
> >>> And parallelism is runtime information of connectors; it fits the
> >>> name of `ScanRuntimeProvider`.
> >>>
> >>>> If we wanted to add a "get parallelism" it would be in those
> >>>> underlying connectors but I'm also skeptical about adding such a
> >>>> method there because it is a static assignment and would preclude
> >>>> clever optimizations about the parallelism of a connector at runtime.
> >>> I think that when a job is submitted, it is compile time. It should
> >>> only provide static parallelism.
> >>>
> >>> ## DataStream in table connector
> >>>
> >>> As I said before, if we want to completely remove DataStream from the
> >>> table connector, we need to provide corresponding functions in
> >>> `xxRuntimeProvider`.
> >>> Otherwise, we and users may not be able to migrate the old
> >>> connectors, including the Hive/FileSystem connectors and the user
> >>> cases I mentioned above.
> >>> CC: @liu shouwei
> >>>
> >>> We really need to consider these cases.
> >>> If there is no alternative in the short term, then for a long time
> >>> users will need to keep using the old table connector API, which has
> >>> been deprecated.
> >>>
> >>> Why not use StreamTableEnvironment fromDataStream/toDataStream?
> >>> - These tables are just temporary tables; they cannot be
> >>>   integrated/stored into a Catalog.
> >>> - CREATE TABLE DDL cannot work.
> >>> - We would lose the useful Table/SQL features on the connector, for
> >>>   example projection pushdown, filter pushdown, partitions, etc.
> >>>
> >>> But I believe you are right in the long run. The source and sink APIs
> >>> should be powerful enough to cover all reasonable cases.
> >>> Maybe we can just introduce them in a minimal way. For example, we
> >>> only introduce `DataStreamSinkProvider` in the planner as an internal
> >>> API.
> >>>
> >>> Your points are very meaningful; I hope to get your reply.
> >>>
> >>> Best,
> >>> Jingsong
> >>>
> >>> On Fri, Sep 25, 2020 at 10:55 AM wenlong.lwl <wenlong88....@gmail.com> wrote:
> >>>
> >>>> Hi Aljoscha, I would like to share a use case to second setting the
> >>>> parallelism of a table sink (or limiting the parallelism range of a
> >>>> table sink): when writing data to databases, there is a limit on the
> >>>> number of JDBC connections and on query TPS.
> >>>> We would get "too many connections" errors, high load on the
> >>>> database, and poor performance because of too many small requests if
> >>>> the optimizer didn't know such information and set a large
> >>>> parallelism for the sink to match the parallelism of its input.
> >>>>
> >>>> On Thu, 24 Sep 2020 at 22:41, Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>
> >>>>> Thanks for the proposal! I think the use cases that we are trying
> >>>>> to solve are indeed valid. However, I think we might have to take a
> >>>>> step back to look at what we're trying to solve and how we can
> >>>>> solve it.
> >>>>>
> >>>>> The FLIP seems to have two broader topics: 1) add "get parallelism"
> >>>>> to sinks/sources 2) let users write DataStream topologies for
> >>>>> sinks/sources. I'll treat them separately below.
> >>>>>
> >>>>> I think we should not add "get parallelism" to the Table Sink API
> >>>>> because I think it's the wrong level of abstraction. The Table API
> >>>>> connectors are (or should be) more or less thin wrappers around
> >>>>> "physical" connectors. By "physical" I mean the underlying (mostly
> >>>>> DataStream API) connectors. For example, with the Kafka Connector
> >>>>> the Table API connector just does the configuration parsing and
> >>>>> determines a good (de)serialization format and then creates the
> >>>>> underlying FlinkKafkaConsumer/FlinkKafkaProducer.
> >>>>>
> >>>>> If we wanted to add a "get parallelism" it would be in those
> >>>>> underlying connectors but I'm also skeptical about adding such a
> >>>>> method there because it is a static assignment and would preclude
> >>>>> clever optimizations about the parallelism of a connector at
> >>>>> runtime. But maybe that's thinking too much about future work so
> >>>>> I'm open to discussion there.
> >>>>>
> >>>>> Regarding the second point of letting Table connector developers
> >>>>> use DataStream: I think we should not do it.
> >>>>> One of the purposes of FLIP-95 [1] was to decouple the Table API
> >>>>> from the DataStream API for the basic interfaces. Coupling the two
> >>>>> too closely at that basic level will make our lives harder in the
> >>>>> future when we want to evolve those APIs or when we want the system
> >>>>> to be better at choosing how to execute sources and sinks. An
> >>>>> example of this is actually the past of the Table API. Before
> >>>>> FLIP-95 we had connectors that dealt directly with DataSet and
> >>>>> DataStream, meaning that if users wanted their Table Sink to work
> >>>>> in both BATCH and STREAMING mode they had to provide two
> >>>>> implementations. The trend is towards unifying the sources/sinks to
> >>>>> common interfaces that can be used for both BATCH and STREAMING
> >>>>> execution but, again, I think exposing DataStream here would be a
> >>>>> step back in the wrong direction.
> >>>>>
> >>>>> I think the solution to the existing user requirement of using
> >>>>> DataStream sources and sinks with the Table API should be better
> >>>>> interoperability between the two APIs, which is being tackled right
> >>>>> now in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases
> >>>>> that we're trying to solve here, maybe we should think about
> >>>>> FLIP-136 some more.
> >>>>>
> >>>>> What do you think?
> >>>>>
> >>>>> Best,
> >>>>> Aljoscha
> >>>>>
> >>>>> [1]
> >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> >>>>> [2]
> >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >>>
> >>> --
> >>> Best, Jingsong Lee
> >>
> >> --
> >> Best regards!
> >> Rui Li

--
Best, Jingsong Lee
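[Editor's note: to make the `new MyRealSource(path, and, whatnot, parallelismInfer = true)` idea from the top of the thread concrete, here is a minimal, self-contained Java sketch of what a parallelism-inference interface on the underlying source might look like. All names are hypothetical illustrations, not actual Flink API.]

```java
import java.util.function.IntSupplier;

// Hypothetical interface an underlying source could implement to report a
// parallelism it inferred from the external system (e.g. Kafka partitions).
interface ParallelismInferringSource {
    /** Returns the inferred parallelism, or -1 if inference is disabled. */
    int inferredParallelism();
}

// Illustrative source: infers parallelism from a partition count when enabled.
class MyRealSource implements ParallelismInferringSource {
    private final boolean parallelismInfer;
    private final IntSupplier partitionCounter;

    MyRealSource(boolean parallelismInfer, IntSupplier partitionCounter) {
        this.parallelismInfer = parallelismInfer;
        this.partitionCounter = partitionCounter;
    }

    @Override
    public int inferredParallelism() {
        // e.g. one parallel instance per Kafka partition
        return parallelismInfer ? partitionCounter.getAsInt() : -1;
    }
}

public class InferenceDemo {
    public static void main(String[] args) {
        // Pretend the external system reports 8 partitions.
        MyRealSource enabled = new MyRealSource(true, () -> 8);
        MyRealSource disabled = new MyRealSource(false, () -> 8);
        System.out.println(enabled.inferredParallelism());  // 8
        System.out.println(disabled.inferredParallelism()); // -1
    }
}
```

The Table layer would then only need to pass the user's flag through when constructing the source, which matches Jingsong's point that inference belongs to the underlying source while the Table connector remains a thin factory.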