How did you detect the duplicate data downstream? Is the downstream consumer's isolation.level set to read_committed?
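For reference, a minimal sketch of what a read_committed consumer on the downstream side could look like (my own example, not taken from your job; the topic name and group id are placeholders):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "dup-check");            // placeholder
    // Only records whose Kafka transaction has been committed are returned;
    // pre-commit writes from the EXACTLY_ONCE sink stay invisible, so they
    // cannot show up downstream as apparent duplicates.
    consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

    FlinkKafkaConsumer<String> checkConsumer =
            new FlinkKafkaConsumer<>("topic_B", new SimpleStringSchema(), consumerProps); // placeholder topic

If the duplicates disappear under read_committed, they were most likely records written before the stop/cancel in a transaction that was later aborted, which the restored job then reprocessed and wrote again.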
On 2021-08-02 18:14:27, "Jim Chen" <[email protected]> wrote:
>Hi 刘建刚,
>I used stop with savepoint, but I still see duplicate data downstream.
>Stop command:
>/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
>-yid application_1625497885855_703064 \
>-p hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
>-d 55e7ebb6fa38faaba61b4b9a7cd89827
>
>Restart command:
>/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
>-m yarn-cluster \
>-yjm 4096 -ytm 4096 \
>-ynm User_Click_Log_Split_All \
>-yqu syh_offline \
>-ys 2 \
>-d \
>-p 64 \
>-s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5 \
>-n \
>-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
>/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
>
>
>刘建刚 <[email protected]> wrote on Mon, Aug 2, 2021 at 3:49 PM:
>
>> cancel with savepoint is the older interface and can cause duplicate data in Kafka.
>> The newer stop with savepoint stops emitting data while the savepoint is being taken,
>> which avoids the duplicates. For details see
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
>>
>> Jim Chen <[email protected]> wrote on Mon, Aug 2, 2021 at 2:33 PM:
>>
>> > I restarted the job from a savepoint, with the following commands:
>> >
>> > cancel command:
>> >
>> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
>> > -yid application_1625497885855_698371 \
>> > -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
>> > 59cf6ccc83aa163bd1e0cd3304dfe06a
>> >
>> > printed savepoint path:
>> >
>> > hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
>> >
>> > restart command:
>> >
>> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
>> > -m yarn-cluster \
>> > -yjm 4096 -ytm 4096 \
>> > -ynm User_Click_Log_Split_All \
>> > -yqu syh_offline \
>> > -ys 2 \
>> > -d \
>> > -p 64 \
>> > -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494 \
>> > -n \
>> > -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
>> > /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
>> >
>> > Jim Chen <[email protected]> wrote on Mon, Aug 2, 2021 at 2:01 PM:
>> >
>> > > Hi all, I have a Flink job that consumes Kafka topic A and writes to Kafka topic B.
>> > > After restarting the job from a savepoint, I found duplicated data in topic B.
>> > > Can anyone help explain this? Thanks!
>> > >
>> > > My Versions
>> > > Flink 1.12.4
>> > > Kafka 2.0.1
>> > > Java 1.8
>> > >
>> > > Core code:
>> > >
>> > > env.enableCheckpointing(300000);
>> > > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> > > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> > >
>> > > DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
>> > >
>> > > tableEnv.createTemporaryView("data_table", dataDS);
>> > > String sql = "select * from data_table a inner join hive_catalog.dim.dim.project "
>> > >     + "for system_time as of a.proctime as b on a.id = b.id";
>> > > Table table = tableEnv.sqlQuery(sql);
>> > > DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);
>> > >
>> > > // Kafka producer parameters
>> > > Properties producerProps = new Properties();
>> > > producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>> > > producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
>> > > producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
>> > > producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
>> > > producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
>> > > producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 300000);
>> > > producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
>> > > producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
>> > > producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>> > > producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
>> > >
>> > > resultDS.addSink(new FlinkKafkaProducer<JSONObject>(sinkTopic, new JSONSchema(),
>> > >         producerProps, new FlinkFixedPartitioner<>(),
>> > >         FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
>> > >     .setParallelism(sinkParallelism);
>> >
>>
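One side note on the quoted configuration (my own observation, not something raised in the thread): the checkpoint interval and the producer transaction timeout are both 300000 ms. With Semantic.EXACTLY_ONCE the transaction opened for a checkpoint has to stay alive until that checkpoint completes (and, after a failure, until the job has been restored), so it is usually safer to give transaction.timeout.ms clear headroom over the checkpoint interval. A sketch with illustrative values only, also bounded by the broker's transaction.max.timeout.ms (15 minutes by default):

    env.enableCheckpointing(300000);                                      // 5 min checkpoints, as in the quoted code
    producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 900000); // e.g. 15 min > interval + expected recovery time

A too-small timeout leads to aborted transactions (data loss) rather than duplicates, so this is separate from the symptom above, but worth checking while adjusting the sink.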
