Yes,
kafka connect supports topics.regex option for Sink connectors. The
connector automatically discovers new topics which fit the regex pattern.
It's similar with source connectors, which discover tables in a SQL
database and save them to Kafka topics.


czw., 5 lis 2020 o 04:16 Jark Wu <imj...@gmail.com> napisał(a):

> Yes. The dynamism might be a problem.
> Does Kafka Connect support discovering new tables and synchronizing them
> dynamically?
>
> Best,
> Jark
>
> On Thu, 5 Nov 2020 at 04:39, Krzysztof Zarzycki <k.zarzy...@gmail.com>
> wrote:
>
>> Hi Jark, thanks for joining the discussion!
>> I understand your point of view that SQL environment is probably not the
>> best for what I was looking to achieve.
>> The idea of a configuration tool sounds almost perfect :) Almost ,
>> because:
>> Without the "StatementSet" that you mentioned at the end I would be
>> worried about resource consumption (job & task manager objects, buffers,
>> connections) of having one topology per table. That would be a significant
>> loss against architecture of Kafka Connect kind.
>> With StatementSet I understand this is not a case, but there is another
>> issue: We lose the dynamism. That is, the job won't be able to discover new
>> tables. We would need to always restart the whole (reconfigured)
>> StatementSet job. (Anyway, this approach sounds good enough to try it out
>> in my current assignment.)
>> The other issue I see is that I still need to define the DSL for the
>> configuration(sth like config of KConnect). SQL will not be it, it will
>> probably be barely a way to implement the tool.
>>
>> I would appreciate your comments, Jark.
>> Also if anyone would like to add other ideas, feel welcome!
>>
>> Best,
>> Krzysztof
>>
>> śr., 4 lis 2020 o 09:37 Jark Wu <imj...@gmail.com> napisał(a):
>>
>>> Hi Krzysztof,
>>>
>>> This is a very interesting idea.
>>>
>>> I think SQL is not a suitable tool for this use case, because SQL is a
>>> structured query language
>>>  where the table schema is fixed and never changes during job running.
>>>
>>> However, I think it can be a configuration tool project on top of Flink
>>> SQL.
>>> The configuration tool can dynamically generate all the queries
>>> according to the config
>>>  and submit them in one job.
>>>
>>> For example, if the configuration says "synchronize from mysql address
>>> 'xxxx' to kafka broker 'yyyy'",
>>> then the generated Flink SQL would like:
>>>
>>> CREATE TABLE db (
>>>   `database_name` STRING,
>>>   `table_name` STRING,
>>>   `data` BYTES  // encodes all the columns value, can be a better
>>> structure for performance
>>> ) WITH (
>>>   connector = ...   // a new connector scan all tables from the mysql
>>> address
>>>   url = 'jdbc:mysql://localhost:3306/flink-test'
>>> );
>>>
>>> // the configuration tool will generate multiple INSERT INTO according
>>> to how many tables in the DB
>>> INSERT INTO kafka_table1
>>> SELECT parse_data(table_name, data)   // the parse_data UDF will infer
>>> schema from database
>>> FROM db WHERE table = 'table1'            // or schema registry and
>>> deserialize the data into columns with different types.
>>>
>>> INSERT INTO kafka_table2
>>> SELECT parse_data(table_name, data)
>>> FROM db WHERE table = 'table2'
>>>
>>> ...
>>>
>>> The configuration tool can use `StatementSet` to package all the INSERT
>>> INTO queries together and submit them in one job.
>>> With the `StatementSet`, the job will share the common source task, so
>>> the tables in MySQL are only read once.
>>>
>>> Best,
>>> Jark
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, 3 Nov 2020 at 14:00, Krzysztof Zarzycki <k.zarzy...@gmail.com>
>>> wrote:
>>>
>>>> Hi community, I would like to confront one idea with you.
>>>>
>>>> I was thinking that Flink SQL could be a Flink's answer for Kafka
>>>> Connect (more powerful, with advantages like being decoupled from Kafka).
>>>> Flink SQL would be the configuration language for Flink "connectors",
>>>> sounds great!.
>>>> But one thing does not allow me to implement this idea: There is no
>>>> possibility to run SQL-based processing over multiple similar inputs and
>>>> produce multiple similar outputs (counted in tens or hundreds).
>>>> As a problem example that I need to solve, consider that I have a
>>>> hundred of Kafka topics, with similar data in each. And I would like to
>>>> sink them to a SQL database. With Kafka connect, I can use a single
>>>> connector with JDBC sink, that properly configured will dump each topic to
>>>> a separate table properly keeping the schema (based on what is in the
>>>> schema registry).
>>>> With Flink SQL I would need to run a query per topic/table, I believe.
>>>> Similarly with sourcing data. There is this cool project
>>>> flink-cdc-connectors [1] that leverages Debezium in Flink to apply CDC on
>>>> SQL database, but when used with SQL, it can only pull in one table per
>>>> query.
>>>> These cases can be solved using the datastream API. With it I can code
>>>> pulling in/pushing out multiple table streams. But then "the configuration"
>>>> is a much bigger effort, because it requires using java code. And that is a
>>>> few hours vs few days case, an enormous difference.
>>>>
>>>> So in the end some questions:
>>>> * Do you know how SQL could be extended to support handling such cases
>>>> elegantly, with a single job in the end?
>>>> * Or do you believe SQL should not be used for that case and we should
>>>> come up with a different tool and configuration language? I.e. sth like
>>>> Kafka Connect
>>>> * Do you know of any other project that implements this idea?
>>>>
>>>> I definitely believe that this is a great use case for Flink to be an
>>>> easy-to-use ingress from/egress to Kafka/HDFS/whatever system, therefore
>>>> there is a need for a solution for my case.
>>>>
>>>> Thanks for answering!
>>>> Krzysztof
>>>>
>>>> [1] https://github.com/ververica/flink-cdc-connectors
>>>>
>>>

Reply via email to