/**
 * Reproduction snippet: declares two Flink SQL tables over the same Kafka topic
 * ({@code test02}) -- one with the {@code upsert-kafka} connector and one with the
 * plain {@code kafka} connector -- and prints {@code select *} from each.
 *
 * @param args command-line arguments (not used in this snippet)
 * @throws Exception propagated from the Flink calls below
 */
public static void main(String[] args) throws Exception {
// Build the execution environment from a configuration that sets "rest.port" to 10041.
Configuration conf = new Configuration();
conf.setInteger("rest.port", 10041);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(conf);
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// env.setParallelism(1);
// Checkpointing setup: interval 3000, hash-map state backend, checkpoints stored under
// file:///d:/zuoyanckpt, exactly-once mode, timeout 20 * 1000, at most one concurrent
// checkpoint, minimum pause 500, externalized checkpoints retained on cancellation.
// NOTE(review): the numeric values are presumably milliseconds -- confirm against the Flink API.
env.enableCheckpointing(3000);
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///d:/zuoyanckpt");
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(20 * 1000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Create the target Kafka mapping table (upsert-kafka connector, primary key userid,
// JSON for both key and value formats).
tenv.executeSql(
" create table t_upsert_kafka( "
+ " userid int primary key not enforced, "
+ " username string, "
+ " age int, "
+ " `partition` int "
+ " ) with ( "
+ " 'connector' = 'upsert-kafka', "
+ " 'topic' = 'test02', "
+ " 'properties.bootstrap.servers' = '192.168.0.82:9092', "
+ " 'key.format' = 'json', "
+ " 'value.format' = 'json' "
+ " ) "
);
// NOTE(review): on an unbounded Kafka source, print() presumably blocks until the job
// ends, so the statements below may never run in the same process -- confirm whether the
// two queries were actually executed separately when reproducing.
tenv.executeSql("select * from t_upsert_kafka").print();
// Second table over the same topic and brokers, using the plain kafka connector:
// no primary key, consumer group 'testGroup1', reading from the earliest offset, JSON format.
tenv.executeSql(
" CREATE TABLE t_kafka_connector ( "
+ " userid int , "
+ " username string, "
+ " age int, "
+ " `partition` int "
+ " ) WITH ( "
+ " 'connector' = 'kafka', "
+ " 'topic' = 'test02', "
+ " 'properties.bootstrap.servers' = '192.168.0.82:9092', "
+ " 'properties.group.id' = 'testGroup1', "
+ " 'scan.startup.mode' = 'earliest-offset', "
+ " 'format'='json' "
+ " ) "
);
tenv.executeSql("select * from t_kafka_connector").print();
// NOTE(review): no DataStream operators are defined on env in this snippet; verify that
// env.execute() is needed here (the SQL statements above are submitted via executeSql).
env.execute();
t_upsert_kafka 消费不到数据,而 t_kafka_connector 可以消费到。
在 2022-10-31 09:43:49,"Shengkai Fang" <fskm...@gmail.com> 写道:
>hi,
>
>看不到图片。能不能直接展示文字或者用图床工具?
>
>Best,
>Shengkai
>
>左岩 <13520871...@163.com> 于2022年10月28日周五 18:34写道:
>
>> upsert kafka作为source时,消费不到kafka中的数据
>> 通过flink sql 写了一个upsert kafka connector 的连接器,去读一个topic,读不到数据,但是普通kafka
>> 连接器消费这个topic 就能读到,都是读的同一个topic,代码如下
>>