I restarted the Flink job via a savepoint. Commands as follows:

cancel command:
/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
  -yid application_1625497885855_698371 \
  -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
  59cf6ccc83aa163bd1e0cd3304dfe06a

printed savepoint path:
hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494

restart command:
/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
  -m yarn-cluster \
  -yjm 4096 -ytm 4096 \
  -ynm User_Click_Log_Split_All \
  -yqu syh_offline \
  -ys 2 \
  -d \
  -p 64 \
  -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494 \
  -n \
  -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
  /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar

Jim Chen <chenshuai19950...@gmail.com> wrote on Monday, August 2, 2021 at 1:51 PM:

> Hi all, my Flink job consumes Kafka topic A and writes to Kafka topic B.
> When I restart my Flink job via savepoint, topic B has some duplicate
> messages. Can anyone help me solve this problem? Thanks!
>
> My versions:
> Flink 1.12.4
> Kafka 2.0.1
> Java 1.8
>
> Core code:
> env.enableCheckpointing(300000);
> env.getCheckpointConfig().enableExternalizedCheckpoints(
>         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
>
> tableEnv.createTemporaryView("data_table", dataDS);
> String sql = "select * from data_table a inner join"
>         + " hive_catalog.dim.dim.project for system_time as of a.proctime as b"
>         + " on a.id = b.id";
> Table table = tableEnv.sqlQuery(sql);
> DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);
>
> // Kafka producer parameters
> Properties producerProps = new Properties();
> producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
> producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
> producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
> producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
> producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
> producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 300000);
> producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
> producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
> producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
> producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
>
> resultDS.addSink(new FlinkKafkaProducer<JSONObject>(sinkTopic, new JSONSchema(),
>         producerProps, new FlinkFixedPartitioner<>(),
>         FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
>         .setParallelism(sinkParallelism);
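A note on the duplicates, in case it helps: with FlinkKafkaProducer.Semantic.EXACTLY_ONCE the sink writes to topic B inside Kafka transactions that are only committed when a checkpoint completes. A downstream consumer that keeps the default isolation.level (read_uncommitted) will also see records from aborted or not-yet-committed transactions, e.g. around a cancel/restart, and those look like duplicates. Below is a minimal, self-contained consumer sketch for checking this from the topic B side; the broker address, group id, topic name and String deserializers are placeholders I made up, not values from the original job:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TopicBReadCommittedCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-b-dup-check");       // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // The key setting: only return records from committed transactions,
        // hiding records of aborted or still-open transactions.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic_B")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}

If the duplicates disappear when reading with read_committed, the job's exactly-once sink is working as intended and it is the consumers of topic B that need the isolation setting.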