????????????????????????
------------------ ???????? ------------------
??????:
"user-zh"
<[email protected]>;
????????: 2021??4??27??(??????) ????3:15
??????: "[email protected]"<[email protected]>;
????: "user-zh"<[email protected]>;
????: ??????flink cdc ????mysql binlog??????
streaming????????????????????reload????????????????????
????????????????datastreaming
API??????debezium????????????????????????debezium??????
public static Properties debeziumProperties(){
Properties properties = new Properties();
properties.setProperty(??xxxx??,??xxxx");
return properties;
}
SourceFunction<T> sourceFunction =
MySQLSource.<T>builder()
. . .
.debeziumProperties(debeziumProperties())
.build();
| |
MOBIN
|
|
[email protected]
|
??????????????????????
??2021??04??27?? 14:46????????????<[email protected]> ??????
hi all
??????????flink cdc??????????????????streaming
mode??????????binlog??????????????????????????????????????????????????mysql??RELOAD????????????????sql????????cdc????????????
debezium.snap.shot.locking.mode = none????????
????????streaming
mode??????????????????????????????????????????????????reload??????????????flink
cdc??????????????????????
Properties properties = new Properties();
properties.setProperty("debezium.snapshot.locking.mode", "none");
SourceFunction<String&gt; sourceFunction =
MySQLSource.<String&gt;builder()
&nbsp; &nbsp; &nbsp; &nbsp; .hostname("localhost")
&nbsp; &nbsp; &nbsp; &nbsp; .port(port)
&nbsp; &nbsp; &nbsp; &nbsp; .databaseList("database")&nbsp;
&nbsp; &nbsp; &nbsp; &nbsp; .tableList("database.test")
&nbsp; &nbsp; &nbsp; &nbsp; .username(??user)
&nbsp; &nbsp; &nbsp; &nbsp; .password("password")
&nbsp; &nbsp; &nbsp; &nbsp; .debeziumProperties(properties)
&nbsp; &nbsp; &nbsp; &nbsp; .deserializer(new
StringDebeziumDeserializationSchema())
&nbsp; &nbsp; &nbsp; &nbsp; .build();
??.debeziumProperties(properties)????????????sql????????????????????????????????sourceFunction??????Streaming??????????????????????????????????????