Hi Krzysztof,

This is a very interesting idea.

I think SQL is not a suitable tool for this use case on its own, because SQL
is a structured query language where the table schema is fixed and never
changes while the job is running.

However, I think this could be built as a configuration-tool project on top
of Flink SQL.
The configuration tool can dynamically generate all the queries according to
the config and submit them in one job.

For example, if the configuration says "synchronize from MySQL address
'xxxx' to Kafka broker 'yyyy'", then the generated Flink SQL would look
like:

CREATE TABLE db (
  `database_name` STRING,
  `table_name` STRING,
  `data` BYTES  -- encodes all column values; a richer structure could perform better
) WITH (
  'connector' = '...',  -- a new connector that scans all tables of the MySQL instance
  'url' = 'jdbc:mysql://localhost:3306/flink-test'
);

-- the configuration tool will generate one INSERT INTO per table in the DB
INSERT INTO kafka_table1
SELECT parse_data(table_name, data)  -- the parse_data UDF infers the schema from the
FROM db WHERE table_name = 'table1'  -- database or a schema registry and deserializes
                                     -- the data into typed columns (see the sketch below)

INSERT INTO kafka_table2
SELECT parse_data(table_name, data)
FROM db WHERE table_name = 'table2'

...
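
To sketch the parse_data idea: a minimal, hypothetical version of such a UDF
could look roughly like the Java below. The class name and the hard-coded
output schema are placeholders; a real implementation would derive the row
type per table from the database catalog or a schema registry.

import java.nio.charset.StandardCharsets;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

// Hypothetical UDF: decodes the raw payload of one table into typed columns.
// The output schema is hard-coded for simplicity; a real version would look
// it up per table (using tableName) from the database catalog or a schema
// registry, e.g. via custom type inference.
public class ParseData extends ScalarFunction {

    @DataTypeHint("ROW<id INT, name STRING>")
    public Row eval(String tableName, byte[] data) {
        // placeholder decoding: assume the payload is "<id>,<name>" in UTF-8
        String[] fields = new String(data, StandardCharsets.UTF_8).split(",", 2);
        return Row.of(Integer.valueOf(fields[0]), fields[1]);
    }
}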

The configuration tool can use `StatementSet` to package all the INSERT INTO
queries together and submit them in one job.
With the `StatementSet`, the queries share the common source task, so the
tables in MySQL are read only once.
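
For illustration, here is a minimal sketch of how such a tool could wire this
up (the class, method, and variable names are made up; the DDL and INSERT
statements are assumed to have been generated by the tool, and ParseData is
the UDF sketched above):

import java.util.List;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class SyncJobSubmitter {

    // generatedDdl: CREATE TABLE statements, generatedInserts: INSERT INTO statements
    public static void submit(List<String> generatedDdl, List<String> generatedInserts) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

        // register the generated source and sink tables, plus the parse_data UDF
        generatedDdl.forEach(tEnv::executeSql);
        tEnv.createTemporarySystemFunction("parse_data", ParseData.class);

        // package all INSERT INTO statements so they are submitted as a single
        // job that shares the common source task
        StatementSet statementSet = tEnv.createStatementSet();
        generatedInserts.forEach(statementSet::addInsertSql);
        statementSet.execute();
    }
}

Because all inserts go through one StatementSet.execute() call, Flink plans
them as a single job and the scan of the source table can be shared.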

Best,
Jark

On Tue, 3 Nov 2020 at 14:00, Krzysztof Zarzycki <k.zarzy...@gmail.com>
wrote:

> Hi community, I would like to run one idea by you.
>
> I was thinking that Flink SQL could be Flink's answer to Kafka Connect
> (more powerful, with advantages like being decoupled from Kafka). Flink SQL
> would be the configuration language for Flink "connectors", which sounds
> great! But one thing does not allow me to implement this idea: there is no
> way to run SQL-based processing over multiple similar inputs and produce
> multiple similar outputs (counted in tens or hundreds).
> As an example of the problem I need to solve, consider that I have a
> hundred Kafka topics with similar data in each, and I would like to sink
> them to a SQL database. With Kafka Connect, I can use a single JDBC sink
> connector that, properly configured, will dump each topic to a separate
> table while keeping the schema (based on what is in the schema
> registry).
> With Flink SQL I would need to run a query per topic/table, I believe.
> Similarly with sourcing data. There is this cool project
> flink-cdc-connectors [1] that leverages Debezium in Flink to apply CDC to a
> SQL database, but when used with SQL, it can only pull in one table per
> query.
> These cases can be solved using the DataStream API; with it I can code
> pulling in/pushing out multiple table streams. But then "the configuration"
> is a much bigger effort, because it requires writing Java code. That is a
> few-hours versus few-days case, an enormous difference.
>
> So in the end some questions:
> * Do you know how SQL could be extended to support handling such cases
> elegantly, with a single job in the end?
> * Or do you believe SQL should not be used for that case, and we should
> come up with a different tool and configuration language, i.e. something
> like Kafka Connect?
> * Do you know of any other project that implements this idea?
>
> I definitely believe that this is a great use case for Flink to be an
> easy-to-use ingress from/egress to Kafka/HDFS/whatever system, therefore
> there is a need for a solution for my case.
>
> Thanks for answering!
> Krzysztof
>
> [1] https://github.com/ververica/flink-cdc-connectors
>
