Yes. The dynamism might be a problem.
Does Kafka Connect support discovering new tables and synchronizing them
dynamically?

Best,
Jark

On Thu, 5 Nov 2020 at 04:39, Krzysztof Zarzycki <k.zarzy...@gmail.com>
wrote:

> Hi Jark, thanks for joining the discussion!
> I understand your point of view that SQL environment is probably not the
> best for what I was looking to achieve.
> The idea of a configuration tool sounds almost perfect :) Almost , because:
> Without the "StatementSet" that you mentioned at the end I would be
> worried about resource consumption (job & task manager objects, buffers,
> connections) of having one topology per table. That would be a significant
> loss against architecture of Kafka Connect kind.
> With StatementSet I understand this is not a case, but there is another
> issue: We lose the dynamism. That is, the job won't be able to discover new
> tables. We would need to always restart the whole (reconfigured)
> StatementSet job. (Anyway, this approach sounds good enough to try it out
> in my current assignment.)
> The other issue I see is that I still need to define the DSL for the
> configuration(sth like config of KConnect). SQL will not be it, it will
> probably be barely a way to implement the tool.
>
> I would appreciate your comments, Jark.
> Also if anyone would like to add other ideas, feel welcome!
>
> Best,
> Krzysztof
>
> śr., 4 lis 2020 o 09:37 Jark Wu <imj...@gmail.com> napisał(a):
>
>> Hi Krzysztof,
>>
>> This is a very interesting idea.
>>
>> I think SQL is not a suitable tool for this use case, because SQL is a
>> structured query language
>>  where the table schema is fixed and never changes during job running.
>>
>> However, I think it can be a configuration tool project on top of Flink
>> SQL.
>> The configuration tool can dynamically generate all the queries according
>> to the config
>>  and submit them in one job.
>>
>> For example, if the configuration says "synchronize from mysql address
>> 'xxxx' to kafka broker 'yyyy'",
>> then the generated Flink SQL would like:
>>
>> CREATE TABLE db (
>>   `database_name` STRING,
>>   `table_name` STRING,
>>   `data` BYTES  // encodes all the columns value, can be a better
>> structure for performance
>> ) WITH (
>>   connector = ...   // a new connector scan all tables from the mysql
>> address
>>   url = 'jdbc:mysql://localhost:3306/flink-test'
>> );
>>
>> // the configuration tool will generate multiple INSERT INTO according to
>> how many tables in the DB
>> INSERT INTO kafka_table1
>> SELECT parse_data(table_name, data)   // the parse_data UDF will infer
>> schema from database
>> FROM db WHERE table = 'table1'            // or schema registry and
>> deserialize the data into columns with different types.
>>
>> INSERT INTO kafka_table2
>> SELECT parse_data(table_name, data)
>> FROM db WHERE table = 'table2'
>>
>> ...
>>
>> The configuration tool can use `StatementSet` to package all the INSERT
>> INTO queries together and submit them in one job.
>> With the `StatementSet`, the job will share the common source task, so
>> the tables in MySQL are only read once.
>>
>> Best,
>> Jark
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Tue, 3 Nov 2020 at 14:00, Krzysztof Zarzycki <k.zarzy...@gmail.com>
>> wrote:
>>
>>> Hi community, I would like to confront one idea with you.
>>>
>>> I was thinking that Flink SQL could be a Flink's answer for Kafka
>>> Connect (more powerful, with advantages like being decoupled from Kafka).
>>> Flink SQL would be the configuration language for Flink "connectors",
>>> sounds great!.
>>> But one thing does not allow me to implement this idea: There is no
>>> possibility to run SQL-based processing over multiple similar inputs and
>>> produce multiple similar outputs (counted in tens or hundreds).
>>> As a problem example that I need to solve, consider that I have a
>>> hundred of Kafka topics, with similar data in each. And I would like to
>>> sink them to a SQL database. With Kafka connect, I can use a single
>>> connector with JDBC sink, that properly configured will dump each topic to
>>> a separate table properly keeping the schema (based on what is in the
>>> schema registry).
>>> With Flink SQL I would need to run a query per topic/table, I believe.
>>> Similarly with sourcing data. There is this cool project
>>> flink-cdc-connectors [1] that leverages Debezium in Flink to apply CDC on
>>> SQL database, but when used with SQL, it can only pull in one table per
>>> query.
>>> These cases can be solved using the datastream API. With it I can code
>>> pulling in/pushing out multiple table streams. But then "the configuration"
>>> is a much bigger effort, because it requires using java code. And that is a
>>> few hours vs few days case, an enormous difference.
>>>
>>> So in the end some questions:
>>> * Do you know how SQL could be extended to support handling such cases
>>> elegantly, with a single job in the end?
>>> * Or do you believe SQL should not be used for that case and we should
>>> come up with a different tool and configuration language? I.e. sth like
>>> Kafka Connect
>>> * Do you know of any other project that implements this idea?
>>>
>>> I definitely believe that this is a great use case for Flink to be an
>>> easy-to-use ingress from/egress to Kafka/HDFS/whatever system, therefore
>>> there is a need for a solution for my case.
>>>
>>> Thanks for answering!
>>> Krzysztof
>>>
>>> [1] https://github.com/ververica/flink-cdc-connectors
>>>
>>

Reply via email to