Hi,
After some investigation, I found a simpler way to satisfy the demand: use the
CDC connector [1], which supports reading a database snapshot and then
continuously reading the changelog, without deploying Kafka and Debezium.
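
For example, for a Postgres dimension table like the one in this thread, a
minimal sketch could look like the following (the connection values and the
database/schema/table names are placeholders, and the exact option names may
depend on the flink-cdc-connectors version you use):

    CREATE TABLE test2 (
      source STRING,
      target STRING,
      PRIMARY KEY (source) NOT ENFORCED
    ) WITH (
      'connector' = 'postgres-cdc',   -- from flink-cdc-connectors [1]
      'hostname' = 'localhost',       -- placeholder connection settings
      'port' = '5432',
      'username' = 'postgres',
      'password' = '***',
      'database-name' = 'mydb',       -- placeholder database/schema/table names
      'schema-name' = 'public',
      'table-name' = 'test2'
    );

With a changelog source like this on the right side, updates to test2 are
reflected in the regular join result.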

Best regards,
JING ZHANG

[1] https://github.com/ververica/flink-cdc-connectors

JING ZHANG <beyond1...@gmail.com> wrote on Saturday, June 5, 2021 at 2:36 PM:

> Hi,
> Although the JDBC connector cannot read a changelog from the database, there
> are already connectors that can satisfy your demand. You could
> use the Maxwell <https://maxwells-daemon.io/> [1], Canal
> <https://github.com/alibaba/canal/wiki> [2], and Debezium
> <https://debezium.io/> [3] CDC tools to capture changes in databases;
> please have a try (a short sketch follows the links below).
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/maxwell/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/canal/
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/
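>
> For example, a minimal sketch of a Kafka table that consumes Debezium change
> events and can act as the right side of the join could look like this (the
> topic name and connection properties are placeholders):
>
>     CREATE TABLE test2 (
>       source STRING,
>       target STRING
>     ) WITH (
>       'connector' = 'kafka',
>       'topic' = 'dim_changes',        -- placeholder topic written by the CDC tool
>       'properties.bootstrap.servers' = 'localhost:9092',   -- placeholder
>       'scan.startup.mode' = 'earliest-offset',
>       'format' = 'debezium-json'      -- or 'maxwell-json' / 'canal-json'
>     );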
>
> Best regards,
> JING ZHANG
>
1095193...@qq.com <1095193...@qq.com> wrote on Saturday, June 5, 2021 at 12:56 PM:
>
>> Thanks @JING ZHANG @Yun Gao. I will use a processing-time temporal join.
>> BTW, will we support reading the changelog for a JDBC source when it works as
>> the right stream of a regular join in the future?
>>
>> ------------------------------
>> 1095193...@qq.com
>>
>>
>> *From:* JING ZHANG <beyond1...@gmail.com>
>> *Date:* 2021-06-04 18:32
>> *To:* Yun Gao <yungao...@aliyun.com>
>> *CC:* 1095193...@qq.com; user <user@flink.apache.org>
>> *Subject:* Re: Flink sql regular join not working as expect.
>> Hi,
>> The JDBC source only takes a snapshot and sends all data in the snapshot
>> downstream when it works as the right stream of a regular join; it cannot
>> produce a changelog stream.
>> After you update the field 'target' from '56.32.15.55:8080' to
>> '56.32.15.54:8080', the JDBC source would not send new data downstream.
>>
>> You could try to use Upsert Kafka [1] as the right side of the regular join
>> and set `source` as the primary key.
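>>
>> A minimal sketch could look like this (the topic and bootstrap servers are
>> placeholders):
>>
>>     CREATE TABLE test2 (
>>       source STRING,
>>       target STRING,
>>       PRIMARY KEY (source) NOT ENFORCED
>>     ) WITH (
>>       'connector' = 'upsert-kafka',
>>       'topic' = 'dim_table',          -- placeholder
>>       'properties.bootstrap.servers' = 'localhost:9092',   -- placeholder
>>       'key.format' = 'json',
>>       'value.format' = 'json'
>>     );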
>>
>> BTW, if you use a processing-time temporal join [2] in your case, you could
>> always join against the latest version of the dimension table, but an update
>> on the dimension table would not trigger a join by itself, because the table
>> is only looked up by key when a record arrives on the left side.
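>>
>> A minimal sketch of such a join (assuming test1 declares a processing-time
>> attribute, e.g. `proc_time AS PROCTIME()`, and test2 keeps the JDBC
>> connector so it is used as a lookup table) could look like this:
>>
>>     SELECT t1.source, t2.target
>>     FROM test1 AS t1
>>     JOIN test2 FOR SYSTEM_TIME AS OF t1.proc_time AS t2
>>     ON t1.source = t2.source;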
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins
>>
>> Best regards,
>> JING ZHANG
>>
>>
Yun Gao <yungao...@aliyun.com> wrote on Friday, June 4, 2021 at 5:07 PM:
>>
>>> Hi,
>>>
>>> I'm not an expert on the Table/SQL API, but it seems to me that for
>>> regular joins, Flink would not re-read the dimension
>>> table after it has read it fully for the first time. If you want to
>>> always join the records with the latest version of the
>>> dimension table, you may need to use temporal joins [1].
>>>
>>> Best,
>>> Yun
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins
>>>
>>>
>>> ------------------------------------------------------------------
>>> From:1095193...@qq.com <1095193...@qq.com>
>>> Send Time:2021 Jun. 4 (Fri.) 16:45
>>> To:user <user@flink.apache.org>
>>> Subject:Flink sql regular join not working as expect.
>>>
>>> Hi
>>>    I am working on joining a Kafka stream with a Postgres dimension
>>> table.  According to
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/
>>> *"Regular joins are the most generic type of join in which any new
>>> record, or changes to either side of the join, are visible and affect the
>>> entirety of the join result."*
>>>    However, in my test, changing a record in the dimension table does not
>>> affect the result of the join.  My test steps:
>>>    1. create Kafka table sql
>>>       CREATE TABLE test1 (  source String )  WITH (  'connector' =
>>> 'kafka',   'topic' = 'test' ...)
>>>    2.create dimension table sql
>>>      CREATE TABLE test2 (source String, target String)  WITH  (
>>> 'connector' = 'jdbc'... )
>>>      Prepared 1 record in the dimension table:
>>>      source                      |   target
>>>   172.16.1.109:8080       | 56.32.15.55:8080
>>>    3. regular join sql
>>>        select test1.source, test2.target from test1 join test2 on
>>> test1.source = test2.source
>>>    4. feed data into Kafka
>>>       {"source":"172.16.1.109:8080"}
>>>       Flink outputs the result as expected:  +I[172.16.1.109:8080,
>>> 56.32.15.55:8080]
>>>    5. change field 'target'  from '56.32.15.55:8080' to '
>>> 56.32.15.54:8080' in dimension table:
>>>       source                      |   target
>>>     172.16.1.109:8080       | 56.32.15.54:8080
>>>    6. feed data into Kafka
>>>       {"source":"172.16.1.109:8080"}
>>>       Flink still outputs the result, unaffected by the change to the dimension
>>> table:  +I[172.16.1.109:8080, 56.32.15.55:8080]
>>>       Expected result:                +I[172.16.1.109:8080,
>>> 56.32.15.54:8080]
>>>     Could you give me some suggestions on why the regular join result is
>>> not affected by changes to the dimension table in my test? Appreciated.
>>>
>>> ------------------------------
>>> 1095193...@qq.com
>>>
>>>
>>>
