Yes. The dynamism might be a problem. Does Kafka Connect support discovering new tables and synchronizing them dynamically?
Best,
Jark

On Thu, 5 Nov 2020 at 04:39, Krzysztof Zarzycki <k.zarzy...@gmail.com> wrote:

> Hi Jark, thanks for joining the discussion!
> I understand your point of view that the SQL environment is probably not
> the best fit for what I was looking to achieve.
> The idea of a configuration tool sounds almost perfect :) Almost, because:
> Without the "StatementSet" that you mentioned at the end, I would be
> worried about the resource consumption (job & task manager objects,
> buffers, connections) of having one topology per table. That would be a
> significant loss compared to the Kafka Connect style of architecture.
> With StatementSet I understand this is not the case, but there is another
> issue: we lose the dynamism. That is, the job won't be able to discover
> new tables; we would need to always restart the whole (reconfigured)
> StatementSet job. (Anyway, this approach sounds good enough to try it out
> in my current assignment.)
> The other issue I see is that I still need to define the DSL for the
> configuration (sth like the config of KConnect). SQL will not be it; it
> will probably be barely a way to implement the tool.
>
> I would appreciate your comments, Jark.
> Also, if anyone would like to add other ideas, feel welcome!
>
> Best,
> Krzysztof
>
> On Wed, 4 Nov 2020 at 09:37, Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Krzysztof,
>>
>> This is a very interesting idea.
>>
>> I think SQL is not a suitable tool for this use case, because SQL is a
>> structured query language where the table schema is fixed and never
>> changes while the job is running.
>>
>> However, this could be a configuration tool project on top of Flink SQL.
>> The configuration tool can dynamically generate all the queries according
>> to the config and submit them in one job.
>>
>> For example, if the configuration says "synchronize from mysql address
>> 'xxxx' to kafka broker 'yyyy'", then the generated Flink SQL would look
>> like:
>>
>> CREATE TABLE db (
>>   `database_name` STRING,
>>   `table_name` STRING,
>>   `data` BYTES  -- encodes all the column values; could be a better
>>                 -- structure for performance
>> ) WITH (
>>   'connector' = '...',  -- a new connector that scans all tables from the mysql address
>>   'url' = 'jdbc:mysql://localhost:3306/flink-test'
>> );
>>
>> -- the configuration tool will generate one INSERT INTO per table in the DB;
>> -- the parse_data UDF infers the schema from the database (or a schema
>> -- registry) and deserializes the data into columns with different types
>> INSERT INTO kafka_table1
>> SELECT parse_data(table_name, data)
>> FROM db
>> WHERE table_name = 'table1';
>>
>> INSERT INTO kafka_table2
>> SELECT parse_data(table_name, data)
>> FROM db
>> WHERE table_name = 'table2';
>>
>> ...
>>
>> The configuration tool can use `StatementSet` to package all the INSERT
>> INTO queries together and submit them in one job.
>> With the `StatementSet`, the job will share the common source task, so
>> the tables in MySQL are only read once.
>>
>> Best,
>> Jark
>>
>> On Tue, 3 Nov 2020 at 14:00, Krzysztof Zarzycki <k.zarzy...@gmail.com>
>> wrote:
>>
>>> Hi community, I would like to confront one idea with you.
>>>
>>> I was thinking that Flink SQL could be Flink's answer to Kafka Connect
>>> (more powerful, with advantages like being decoupled from Kafka).
>>> Flink SQL would be the configuration language for Flink "connectors",
>>> which sounds great!
>>> But one thing does not allow me to implement this idea: there is no
>>> possibility to run SQL-based processing over multiple similar inputs and
>>> produce multiple similar outputs (counted in tens or hundreds).
>>> As a problem example that I need to solve, consider that I have a hundred
>>> Kafka topics with similar data in each, and I would like to sink them to a
>>> SQL database. With Kafka Connect, I can use a single connector with a JDBC
>>> sink that, properly configured, will dump each topic to a separate table
>>> while properly keeping the schema (based on what is in the schema
>>> registry).
>>> With Flink SQL I would need to run a query per topic/table, I believe.
>>> Similarly with sourcing data: there is this cool project
>>> flink-cdc-connectors [1] that leverages Debezium in Flink to apply CDC to
>>> a SQL database, but when used with SQL, it can only pull in one table per
>>> query.
>>> These cases can be solved using the DataStream API. With it I can code
>>> pulling in/pushing out multiple table streams, but then "the configuration"
>>> is a much bigger effort, because it requires writing Java code. And that
>>> is a few-hours versus few-days difference, an enormous one.
>>>
>>> So in the end, some questions:
>>> * Do you know how SQL could be extended to support handling such cases
>>> elegantly, with a single job in the end?
>>> * Or do you believe SQL should not be used for that case and we should
>>> come up with a different tool and configuration language, i.e. sth like
>>> Kafka Connect?
>>> * Do you know of any other project that implements this idea?
>>>
>>> I definitely believe that this is a great use case for Flink to be an
>>> easy-to-use ingress from/egress to Kafka/HDFS/whatever system, therefore
>>> there is a need for a solution for my case.
>>>
>>> Thanks for answering!
>>> Krzysztof
>>>
>>> [1] https://github.com/ververica/flink-cdc-connectors
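
[Editor's note] Below is a minimal Java sketch of the configuration-tool approach Jark describes above: register the shared source table once, then generate one INSERT INTO per configured table and package them into a single StatementSet job so the MySQL source is read only once. The multi-table connector behind the `db` table, the parse_data UDF, and the pre-registered kafka_<table> sink tables are assumptions carried over from Jark's example, not existing Flink features; only the StatementSet API itself is part of Flink.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.TableEnvironment;

    public class SyncAllTablesJob {

        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());

            // In a real tool this list would come from the tool's own config
            // (or be discovered from the database) before the job is submitted;
            // it is hard-coded here only to keep the sketch self-contained.
            List<String> tables = Arrays.asList("table1", "table2");

            // The shared source table from Jark's example. The multi-table
            // connector behind it is hypothetical, not an existing Flink connector.
            tEnv.executeSql(
                    "CREATE TABLE db (" +
                    "  database_name STRING," +
                    "  table_name STRING," +
                    "  data BYTES" +
                    ") WITH (" +
                    "  'connector' = '...'," +  // placeholder for the multi-table CDC connector
                    "  'url' = 'jdbc:mysql://localhost:3306/flink-test'" +
                    ")");

            // One generated INSERT INTO per configured table. The kafka_<table>
            // sink tables and the parse_data UDF are assumed to have been
            // registered by the tool beforehand.
            StatementSet statements = tEnv.createStatementSet();
            for (String table : tables) {
                statements.addInsertSql(
                        "INSERT INTO kafka_" + table +
                        " SELECT parse_data(table_name, data) FROM db" +
                        " WHERE table_name = '" + table + "'");
            }

            // Submits everything as a single job: the INSERTs share the common
            // source task, so the MySQL tables are read only once.
            statements.execute();
        }
    }

Note that everything here is fixed at submission time: adding or removing a table means regenerating the statements and restarting the job, which is exactly the loss of dynamism Krzysztof points out earlier in the thread.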