Hi Krzysztof, This is a very interesting idea.
I think SQL is not a suitable tool for this use case, because SQL is a structured query language where the table schema is fixed and never changes during job running. However, I think it can be a configuration tool project on top of Flink SQL. The configuration tool can dynamically generate all the queries according to the config and submit them in one job. For example, if the configuration says "synchronize from mysql address 'xxxx' to kafka broker 'yyyy'", then the generated Flink SQL would like: CREATE TABLE db ( `database_name` STRING, `table_name` STRING, `data` BYTES // encodes all the columns value, can be a better structure for performance ) WITH ( connector = ... // a new connector scan all tables from the mysql address url = 'jdbc:mysql://localhost:3306/flink-test' ); // the configuration tool will generate multiple INSERT INTO according to how many tables in the DB INSERT INTO kafka_table1 SELECT parse_data(table_name, data) // the parse_data UDF will infer schema from database FROM db WHERE table = 'table1' // or schema registry and deserialize the data into columns with different types. INSERT INTO kafka_table2 SELECT parse_data(table_name, data) FROM db WHERE table = 'table2' ... The configuration tool can use `StatementSet` to package all the INSERT INTO queries together and submit them in one job. With the `StatementSet`, the job will share the common source task, so the tables in MySQL are only read once. Best, Jark On Tue, 3 Nov 2020 at 14:00, Krzysztof Zarzycki <k.zarzy...@gmail.com> wrote: > Hi community, I would like to confront one idea with you. > > I was thinking that Flink SQL could be a Flink's answer for Kafka Connect > (more powerful, with advantages like being decoupled from Kafka). Flink SQL > would be the configuration language for Flink "connectors", sounds great!. > But one thing does not allow me to implement this idea: There is no > possibility to run SQL-based processing over multiple similar inputs and > produce multiple similar outputs (counted in tens or hundreds). > As a problem example that I need to solve, consider that I have a hundred > of Kafka topics, with similar data in each. And I would like to sink them > to a SQL database. With Kafka connect, I can use a single connector with > JDBC sink, that properly configured will dump each topic to a separate > table properly keeping the schema (based on what is in the schema > registry). > With Flink SQL I would need to run a query per topic/table, I believe. > Similarly with sourcing data. There is this cool project > flink-cdc-connectors [1] that leverages Debezium in Flink to apply CDC on > SQL database, but when used with SQL, it can only pull in one table per > query. > These cases can be solved using the datastream API. With it I can code > pulling in/pushing out multiple table streams. But then "the configuration" > is a much bigger effort, because it requires using java code. And that is a > few hours vs few days case, an enormous difference. > > So in the end some questions: > * Do you know how SQL could be extended to support handling such cases > elegantly, with a single job in the end? > * Or do you believe SQL should not be used for that case and we should > come up with a different tool and configuration language? I.e. sth like > Kafka Connect > * Do you know of any other project that implements this idea? > > I definitely believe that this is a great use case for Flink to be an > easy-to-use ingress from/egress to Kafka/HDFS/whatever system, therefore > there is a need for a solution for my case. > > Thanks for answering! > Krzysztof > > [1] https://github.com/ververica/flink-cdc-connectors >