Yes, Kafka Connect supports the topics.regex option for sink connectors. The connector automatically discovers new topics that match the regex pattern. It's similar for source connectors, which discover tables in a SQL database and save them to Kafka topics.
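For illustration, a sink configuration using topics.regex could look roughly like the sketch below. This is only a minimal, untested example: the connector class and property names assume the Confluent JDBC sink connector, and the topic pattern and connection URL are placeholders.

import java.util.Map;

public class JdbcSinkConnectorConfig {
    public static void main(String[] args) {
        // "topics.regex" is the framework-level Kafka Connect sink option: the
        // connector subscribes with the pattern, so newly created matching topics
        // are picked up automatically when consumer metadata is refreshed.
        // Connector class and JDBC options here assume the Confluent JDBC sink.
        Map<String, String> config = Map.of(
            "connector.class", "io.confluent.connect.jdbc.JdbcSinkConnector",
            "topics.regex", "ingest\\..*",                      // all topics prefixed with "ingest."
            "connection.url", "jdbc:postgresql://db-host:5432/warehouse",
            "auto.create", "true");                             // create one target table per topic

        // Print in .properties form, ready to paste into a worker config.
        config.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}

New matching topics are picked up without a restart because the underlying consumer subscribes with the pattern and refreshes its metadata periodically.
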
On Thu, 5 Nov 2020 at 04:16, Jark Wu <imj...@gmail.com> wrote:

> Yes. The dynamism might be a problem.
> Does Kafka Connect support discovering new tables and synchronizing them
> dynamically?
>
> Best,
> Jark
>
> On Thu, 5 Nov 2020 at 04:39, Krzysztof Zarzycki <k.zarzy...@gmail.com> wrote:
>
>> Hi Jark, thanks for joining the discussion!
>> I understand your point of view that a SQL environment is probably not
>> the best fit for what I am trying to achieve.
>> The idea of a configuration tool sounds almost perfect :) Almost, because:
>> Without the "StatementSet" that you mentioned at the end, I would be
>> worried about the resource consumption (job & task manager objects,
>> buffers, connections) of having one topology per table. That would be a
>> significant loss compared to a Kafka Connect-style architecture.
>> With StatementSet I understand this is not the case, but there is another
>> issue: we lose the dynamism. That is, the job won't be able to discover
>> new tables; we would always need to restart the whole (reconfigured)
>> StatementSet job. (Anyway, this approach sounds good enough to try out in
>> my current assignment.)
>> The other issue I see is that I still need to define a DSL for the
>> configuration (something like Kafka Connect's config). SQL will not be
>> it; it will probably just be a means to implement the tool.
>>
>> I would appreciate your comments, Jark.
>> Also, if anyone would like to add other ideas, feel welcome!
>>
>> Best,
>> Krzysztof
>>
>> On Wed, 4 Nov 2020 at 09:37, Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi Krzysztof,
>>>
>>> This is a very interesting idea.
>>>
>>> I think SQL is not a suitable tool for this use case, because SQL is a
>>> structured query language where the table schema is fixed and never
>>> changes while the job is running.
>>>
>>> However, it could be a configuration tool project on top of Flink SQL.
>>> The configuration tool can dynamically generate all the queries
>>> according to the config and submit them in one job.
>>>
>>> For example, if the configuration says "synchronize from mysql address
>>> 'xxxx' to kafka broker 'yyyy'", then the generated Flink SQL would look
>>> like:
>>>
>>> CREATE TABLE db (
>>>   `database_name` STRING,
>>>   `table_name` STRING,
>>>   `data` BYTES  -- encodes all the column values; could be a better structure for performance
>>> ) WITH (
>>>   connector = ...  -- a new connector that scans all tables from the mysql address
>>>   url = 'jdbc:mysql://localhost:3306/flink-test'
>>> );
>>>
>>> -- the configuration tool will generate one INSERT INTO per table in the DB
>>> INSERT INTO kafka_table1
>>> SELECT parse_data(table_name, data)  -- the parse_data UDF will infer the schema from the database
>>> FROM db WHERE table_name = 'table1'  -- or the schema registry, and deserialize the data into columns with different types.
>>>
>>> INSERT INTO kafka_table2
>>> SELECT parse_data(table_name, data)
>>> FROM db WHERE table_name = 'table2'
>>>
>>> ...
>>>
>>> The configuration tool can use `StatementSet` to package all the INSERT
>>> INTO queries together and submit them in one job.
>>> With the `StatementSet`, the job will share the common source task, so
>>> the tables in MySQL are only read once.
>>>
>>> Best,
>>> Jark
>>>
>>> On Tue, 3 Nov 2020 at 14:00, Krzysztof Zarzycki <k.zarzy...@gmail.com> wrote:
>>>
>>>> Hi community, I would like to run one idea past you.
>>>>
>>>> I was thinking that Flink SQL could be Flink's answer to Kafka Connect
>>>> (more powerful, with advantages like being decoupled from Kafka). Flink
>>>> SQL would be the configuration language for Flink "connectors"; that
>>>> sounds great!
>>>> But one thing does not allow me to implement this idea: there is no way
>>>> to run SQL-based processing over multiple similar inputs and produce
>>>> multiple similar outputs (counted in tens or hundreds).
>>>> As an example of the problem I need to solve, consider that I have a
>>>> hundred Kafka topics with similar data in each, and I would like to
>>>> sink them to a SQL database. With Kafka Connect, I can use a single
>>>> connector with a JDBC sink that, properly configured, will dump each
>>>> topic to a separate table while properly keeping the schema (based on
>>>> what is in the schema registry).
>>>> With Flink SQL I would need to run a query per topic/table, I believe.
>>>> It is similar when sourcing data. There is this cool project
>>>> flink-cdc-connectors [1] that leverages Debezium in Flink to apply CDC
>>>> to a SQL database, but when used with SQL it can only pull in one table
>>>> per query.
>>>> These cases can be solved using the DataStream API; with it I can code
>>>> pulling in/pushing out multiple table streams. But then "the
>>>> configuration" is a much bigger effort, because it requires writing
>>>> Java code. And that is a few-hours versus few-days case, an enormous
>>>> difference.
>>>>
>>>> So in the end, some questions:
>>>> * Do you know how SQL could be extended to support handling such cases
>>>> elegantly, with a single job in the end?
>>>> * Or do you believe SQL should not be used for this case, and we should
>>>> come up with a different tool and configuration language, i.e. something
>>>> like Kafka Connect?
>>>> * Do you know of any other project that implements this idea?
>>>>
>>>> I definitely believe this is a great use case for Flink to be an
>>>> easy-to-use ingress from/egress to Kafka/HDFS/whatever system, so there
>>>> is a need for a solution to my case.
>>>>
>>>> Thanks for answering!
>>>> Krzysztof
>>>>
>>>> [1] https://github.com/ververica/flink-cdc-connectors
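
For anyone who wants to try the StatementSet approach Jark describes above, here is a rough, untested Table API sketch. The datagen and print connectors stand in for the hypothetical multi-table MySQL source and the per-table Kafka sinks from his example; only the StatementSet mechanics (many INSERTs, one submitted job, one shared source) are the point.

import java.util.List;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class MultiTableSyncSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Stand-in for the multi-table source from the example above; a real job
        // would use the MySQL-scanning connector plus a parse_data UDF instead.
        tEnv.executeSql(
            "CREATE TABLE db (database_name STRING, table_name STRING, data STRING) "
            + "WITH ('connector' = 'datagen', 'rows-per-second' = '1')");

        StatementSet set = tEnv.createStatementSet();
        for (String table : List.of("table1", "table2")) {  // in practice: generated from the tool's config
            // Stand-in for the per-table Kafka sink.
            tEnv.executeSql(
                "CREATE TABLE sink_" + table + " (table_name STRING, data STRING) "
                + "WITH ('connector' = 'print')");
            set.addInsertSql(
                "INSERT INTO sink_" + table
                + " SELECT table_name, data FROM db WHERE table_name = '" + table + "'");
        }

        // All INSERT INTO statements are packaged and submitted as one Flink job,
        // so the shared source is scanned only once.
        set.execute();
    }
}

The limitation Krzysztof points out still holds: the set of tables is fixed at submission time, so adding a table means regenerating the statements and restarting the job.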