public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 10041);
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(conf);
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
//        env.setParallelism(1);

env.enableCheckpointing(3000);
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///d:/zuoyanckpt");
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(20 * 1000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 创建目标 kafka映射表
tenv.executeSql(
" create table t_upsert_kafka(                 "
+ "    userid int primary key not enforced,        "
+ "    username string,                          "
+ "    age int,                             "
+ "    `partition` int                             "
+ " ) with (                                    "
+ "  'connector' = 'upsert-kafka',              "
+ "  'topic' = 'test02',                "
+ "  'properties.bootstrap.servers' = '192.168.0.82:9092',  "
+ "  'key.format' = 'json',                             "
+ "  'value.format' = 'json'                            "
+ " )                                                  "
);

tenv.executeSql("select * from t_upsert_kafka").print();

tenv.executeSql(
" CREATE TABLE t_kafka_connector (                       "
+ "    userid int ,        "
+ "    username string,                          "
+ "    age int,                             "
+ "    `partition` int                             "
+ " ) WITH (                                               "
+ "  'connector' = 'kafka',                                "
+ "  'topic' = 'test02',                             "
+ "  'properties.bootstrap.servers' = '192.168.0.82:9092',      "
+ "  'properties.group.id' = 'testGroup1',                  "
+ "  'scan.startup.mode' = 'earliest-offset',           "
+ "  'format'='json'                               "
+ " )                                                   "

);

tenv.executeSql("select * from t_kafka_connector").print();

env.execute();





t_upsert_kafka 消费不到数据,而 t_kafka_connector 可以消费到。











在 2022-10-31 09:43:49,"Shengkai Fang" <fskm...@gmail.com> 写道:
>hi,
>
>看不到图片。能不能直接展示文字或者用图床工具?
>
>Best,
>Shengkai
>
>左岩 <13520871...@163.com> 于2022年10月28日周五 18:34写道:
>
>> upsert kafka作为source时,消费不到kafka中的数据
>> 通过flink sql 写了一个upsert kafka connector 的连接器,去读一个topic,读不到数据,但是普通kafka
>> 连接器消费这个topic 就能读到,都是读的同一个topic,代码如下
>>

回复