I restarted the Flink job via a savepoint. The commands are as follows:

cancel command:

/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
-yid application_1625497885855_698371 \
-s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
59cf6ccc83aa163bd1e0cd3304dfe06a

The printed savepoint path:

hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494


restart command:

/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
-m yarn-cluster \
-yjm 4096 -ytm 4096 \
-ynm User_Click_Log_Split_All \
-yqu syh_offline \
-ys 2 \
-d \
-p 64 \
-s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494 \
-n \
-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
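
A note on the restore flags: with -s, Flink matches savepoint state to operators
by their UIDs, and -n (--allowNonRestoredState) merely lets the job start even if
some state in the savepoint no longer has a matching operator. A minimal sketch
(hypothetical pipeline and names, not the actual job code) of pinning explicit
UIDs so restores stay deterministic:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Pin a stable uid on every stateful operator so savepoint state can be
        // matched back to the same operator on restore; without an explicit uid,
        // Flink derives one from the graph shape, which can change between builds.
        DataStream<String> parsed = env
                .fromElements("a", "b")   // stand-in for the real Kafka source
                .uid("source")
                .map(String::trim)
                .uid("parse-map");

        parsed.print();
        env.execute("uid-example");
    }
}

If an operator present in the savepoint was removed from the job, -n is what
allows the restore to proceed instead of failing on the non-restored state.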

Jim Chen <chenshuai19950...@gmail.com> wrote on Mon, Aug 2, 2021 at 1:51 PM:

> Hi all, my Flink job consumes Kafka topic A and writes to Kafka topic B.
> When I restart my Flink job via savepoint, topic B gets some duplicate
> messages. Can anyone help me solve this problem? Thanks!
>
> My Versions:
> Flink 1.12.4
> Kafka 2.0.1
> Java 1.8
>
> Core code:
> env.enableCheckpointing(300000);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
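> // Note: EXACTLY_ONCE below is the checkpointing (state) guarantee;
> // end-to-end exactly-once into Kafka additionally needs a transactional
> // sink and read_committed consumers downstream.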
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
>
> tableEnv.createTemporaryView("data_table",dataDS);
> String sql = "select * from data_table a inner join
> hive_catalog.dim.dim.project for system_time as of a.proctime as b on a.id
> = b.id";
> Table table = tableEnv.sqlQuery(sql);
> DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);
>
> // Kafka producer parameter
> Properties producerProps = new Properties();
> producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> bootstrapServers);
> producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
> producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
> producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
> producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
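> // transaction.timeout.ms should comfortably exceed the maximum checkpoint
> // duration plus expected restart time; here it equals the 300000 ms
> // checkpoint interval, which leaves no headroom before the broker aborts
> // in-flight transactions.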
> producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 300000);
> producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
> "1");
> producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
> producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
> producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
>
> resultDS.addSink(new FlinkKafkaProducer<JSONObject>(sinkTopic, new
> JSONSchema(), producerProps, new FlinkFixedPartitioner<>(),
> FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
>                 .setParallelism(sinkParallelism);
>
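Regarding the duplicates in topic B described above: with
FlinkKafkaProducer.Semantic.EXACTLY_ONCE, the sink writes inside Kafka
transactions that are committed only when a checkpoint completes, so a
downstream consumer running with the default isolation.level=read_uncommitted
will also see records from aborted pre-restart transactions and count them as
duplicates. A minimal verification sketch, assuming String-serialized records
and placeholder broker/group/topic names (not taken from the job above):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "dup-check");            // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Only records from committed transactions are returned; the default
        // read_uncommitted also returns records from aborted transactions.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic_B"));     // placeholder sink topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.printf("offset=%d key=%s value=%s%n", r.offset(), r.key(), r.value());
                }
            }
        }
    }
}

If the duplicates disappear under read_committed, the transactional sink is
working as intended and only the consumers of topic B need their isolation
level changed.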
