Hi Jingsong,

Thanks for driving this effort. I have two minor comments.
1. IMHO, parallelism is a concept that applies to all ScanTableSources. So instead of defining a new interface, would it be more natural to incorporate parallelism inference into an existing interface, e.g. ScanTableSource or ScanRuntimeProvider?

2. `scan.infer-parallelism.enabled` doesn't seem very useful to me. From a user's perspective, parallelism is either set by `scan.parallelism` or automatically decided by Flink. If a user doesn't want the connector to infer parallelism, he/she can simply set `scan.parallelism`, no?

On Fri, Sep 25, 2020 at 3:33 PM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi Aljoscha,
>
> Thank you for your feedback.
>
> ## Connector parallelism
>
> Requirement: set parallelism as specified by the user, or have it
> inferred by the connector.
>
> How parallelism is configured in DataStream:
> In the DataStream world, the only way to configure parallelism is
> `SingleOutputStreamOperator.setParallelism`. In practice, users need
> access to the DataStream when using a connector, not just the
> `SourceFunction` / `Source` interface.
> Is parallelism related to connectors? I think yes: many connectors can
> provide parallelism-related information, and users do rely on exactly
> that. This is what parallelism inference (from connectors) means.
> The key point is that `DataStream` is an open programming API, so users
> can freely program to set parallelism.
>
> How parallelism is configured in Table/SQL:
> Table/SQL is not an open programming API; every feature needs a
> corresponding mechanism, because the user is no longer able to program.
> Through our current connector interfaces, SourceFunctionProvider and
> SinkFunctionProvider, there is no way to set connector-specific
> parallelism.
> Back to our original intention: to avoid users directly manipulating
> `DataStream`. Since we want to avoid that, we need to provide the
> corresponding features.
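[Editor's note: to make the discussed design concrete, here is a minimal, self-contained Java sketch of what a parallelism mixin on a runtime provider could look like. The interface and class names (`ParallelismProvider`, `BucketedSourceProvider`) are assumptions for illustration, not the actual Flink API.]

```java
import java.util.Optional;

// Hypothetical mixin that a ScanRuntimeProvider-like interface could
// implement to report connector-defined parallelism.
interface ParallelismProvider {
    // Empty means "let the planner / Flink decide".
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

// Example: a source provider that knows its split count up front
// (e.g. one split per file bucket) and reports it as parallelism.
class BucketedSourceProvider implements ParallelismProvider {
    private final int numBuckets;

    BucketedSourceProvider(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    @Override
    public Optional<Integer> getParallelism() {
        return Optional.of(numBuckets); // one subtask per bucket
    }
}

public class ParallelismSketch {
    public static void main(String[] args) {
        ParallelismProvider inferred = new BucketedSourceProvider(8);
        // The planner would fall back to a configured default when empty.
        int parallelism = inferred.getParallelism().orElse(4);
        System.out.println("parallelism = " + parallelism); // prints "parallelism = 8"
    }
}
```

With a default method returning `Optional.empty()`, existing providers would keep working unchanged, which is one argument for extending an existing interface rather than adding a new one.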
>
> And parallelism is runtime information of connectors, so it fits the
> name `ScanRuntimeProvider`.
>
> > If we wanted to add a "get parallelism" it would be in those underlying
> > connectors but I'm also skeptical about adding such a method there
> > because it is a static assignment and would preclude clever
> > optimizations about the parallelism of a connector at runtime.
>
> I think that when a job is submitted, it is compile time. At that point
> we can only provide static parallelism.
>
> ## DataStream in table connectors
>
> As I said before, if we want to completely remove DataStream from the
> table connectors, we need to provide corresponding functions in
> `xxRuntimeProvider`.
> Otherwise, we and our users may not be able to migrate the old
> connectors, including the Hive/FileSystem connectors and the user cases
> I mentioned above.
> CC: @liu shouwei
>
> We really need to consider these cases. If there is no alternative in
> the short term, users will need to keep using the old table connector
> API, which has already been deprecated, for a long time.
>
> Why not use StreamTableEnvironment fromDataStream/toDataStream?
> - These tables are just temporary tables; they cannot be
>   integrated/stored into a Catalog.
> - Creating tables via DDL cannot work.
> - We would lose all kinds of useful Table/SQL features on the connector,
>   e.g. projection pushdown, filter pushdown, partitions, etc.
>
> But I believe you are right in the long run: the source and sink APIs
> should be powerful enough to cover all reasonable cases.
> Maybe we can introduce them in a minimal way. For example, we could only
> introduce `DataStreamSinkProvider` in the planner as an internal API.
>
> Your points are very meaningful; I hope to get your reply.
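[Editor's note: a toy model of the "minimal internal API" idea mentioned above. None of these are real Flink types; `ToyStream` and `ToySink` are stand-ins for `DataStream`/`DataStreamSink`, used only to show the shape of such a provider.]

```java
import java.util.function.Function;

// Toy model of an internal DataStreamSinkProvider: the planner hands the
// connector the input stream and gets back a sink topology, without ever
// exposing DataStream in the public connector interfaces.
public class SinkProviderSketch {
    // Stand-in for DataStream<T>.
    static class ToyStream {
        final String name;
        ToyStream(String name) { this.name = name; }
    }

    // Stand-in for DataStreamSink<T>.
    static class ToySink {
        final String description;
        ToySink(String description) { this.description = description; }
    }

    // The hypothetical internal provider: input stream in, sink topology out.
    interface DataStreamSinkProvider extends Function<ToyStream, ToySink> {}

    public static void main(String[] args) {
        // A connector implements the provider; the planner calls apply(...).
        DataStreamSinkProvider provider = in -> new ToySink("sink(" + in.name + ")");
        System.out.println(provider.apply(new ToyStream("orders")).description);
        // prints "sink(orders)"
    }
}
```

Keeping such a provider internal to the planner would preserve the decoupling goal of FLIP-95 while still giving migrating connectors an escape hatch.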
>
> Best,
> Jingsong
>
> On Fri, Sep 25, 2020 at 10:55 AM wenlong.lwl <wenlong88....@gmail.com>
> wrote:
>
> > Hi, Aljoscha, I would like to share a use case that seconds setting the
> > parallelism of a table sink (or limiting the parallelism range of a
> > table sink): when writing data to databases, there are limits on the
> > number of JDBC connections and on query TPS. If the optimizer didn't
> > know such information and set a large parallelism for the sink to match
> > the parallelism of its input, we would get "too many connections"
> > errors, high load on the db, and poor performance because of too many
> > small requests.
> >
> > On Thu, 24 Sep 2020 at 22:41, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > Thanks for the proposal! I think the use cases that we are trying to
> > > solve are indeed valid. However, I think we might have to take a step
> > > back to look at what we're trying to solve and how we can solve it.
> > >
> > > The FLIP seems to have two broader topics: 1) add "get parallelism" to
> > > sinks/sources, 2) let users write DataStream topologies for
> > > sinks/sources. I'll treat them separately below.
> > >
> > > I think we should not add "get parallelism" to the Table Sink API
> > > because I think it's the wrong level of abstraction. The Table API
> > > connectors are (or should be) more or less thin wrappers around
> > > "physical" connectors. By "physical" I mean the underlying (mostly
> > > DataStream API) connectors. For example, with the Kafka connector the
> > > Table API connector just does the configuration parsing, determines a
> > > good (de)serialization format, and then creates the underlying
> > > FlinkKafkaConsumer/FlinkKafkaProducer.
> > >
> > > If we wanted to add a "get parallelism" it would be in those underlying
> > > connectors, but I'm also skeptical about adding such a method there
> > > because it is a static assignment and would preclude clever
> > > optimizations about the parallelism of a connector at runtime.
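[Editor's note: wenlong's JDBC use case boils down to clamping the sink parallelism to what the database tolerates. A minimal illustrative sketch, with hypothetical method names and numbers:]

```java
// Sketch of the sink-parallelism requirement: never exceed the database's
// connection budget, regardless of the input's parallelism.
public class SinkParallelismClamp {
    static int decideSinkParallelism(int inputParallelism, int maxDbConnections) {
        // Match the input where possible, but respect the connection limit.
        return Math.min(inputParallelism, maxDbConnections);
    }

    public static void main(String[] args) {
        // The input runs at 128, but the database only tolerates 20 connections.
        System.out.println(decideSinkParallelism(128, 20)); // prints 20
    }
}
```

This is why a plain "match the input's parallelism" default is not enough for JDBC-like sinks: an upper bound (or an explicit setting) from the connector is needed.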
> > > But maybe that's thinking too much about future work, so I'm open to
> > > discussion there.
> > >
> > > Regarding the second point of letting Table connector developers use
> > > DataStream: I think we should not do it. One of the purposes of FLIP-95
> > > [1] was to decouple the Table API from the DataStream API for the basic
> > > interfaces. Coupling the two too closely at that basic level will make
> > > our lives harder in the future when we want to evolve those APIs or
> > > when we want the system to be better at choosing how to execute sources
> > > and sinks. An example of this is actually the past of the Table API.
> > > Before FLIP-95 we had connectors that dealt directly with DataSet and
> > > DataStream, meaning that if users wanted their Table Sink to work in
> > > both BATCH and STREAMING mode they had to provide two implementations.
> > > The trend is towards unifying the sources/sinks into common interfaces
> > > that can be used for both BATCH and STREAMING execution, but, again, I
> > > think exposing DataStream here would be a step back in the wrong
> > > direction.
> > >
> > > I think the solution to the existing user requirement of using
> > > DataStream sources and sinks with the Table API should be better
> > > interoperability between the two APIs, which is being tackled right now
> > > in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases that
> > > we're trying to solve here, maybe we should think about FLIP-136 some
> > > more.
> > >
> > > What do you think?
> > >
> > > Best,
> > > Aljoscha
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> > > [2]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>
> --
> Best, Jingsong Lee

--
Best regards!
Rui Li