[ https://issues.apache.org/jira/browse/FLINK-17638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
chenchuangchuang updated FLINK-17638:
-------------------------------------
    Summary: FlinkKafkaConsumerBase restore from empty state will be set consume from earliest forced  (was: FlinkKafkaConsumerBase restore from empty state will be set consum from earliest forced)

> FlinkKafkaConsumerBase restore from empty state will be set consume from earliest forced
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-17638
>                 URL: https://issues.apache.org/jira/browse/FLINK-17638
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.0, 1.9.3, 1.10.0
>         Environment: Flink 1.9.0
> kafka 1.1.0
> jdk 1.8
>            Reporter: chenchuangchuang
>            Priority: Major
>
> My goal and data are as follows:
> # I need to count the number of posts each user has created in the last 30 days in my system.
> # The full, real-time data is in MySQL.
> # I can get the incremental MySQL binlog from kafka-1.1.1 (it only stores the last 7 days of binlog); the topic name is "binlog_post_topic".
> # So I have to combine the MySQL data and the binlog data.
>
> I do it this way:
> # First, I write a snapshot of the MySQL data to a Kafka topic in order of create_time (topic name "init-post-topic"), and consume "init-post-topic" as a Flink data stream with SlidingEventTimeWindows.
> # Second, after the task has processed all the data in "init-post-topic", I create a savepoint for the task, called save-point-a.
> # Third, I modify my code:
> ## the data source becomes the Kafka topic "binlog_post_topic",
> ## the other operators do not change,
> ## and "binlog_post_topic" is set to start consuming from a specific timestamp (the time the MySQL snapshot was created).
> # Fourth, I restart my task from save-point-a.
>
> But I find that the Kafka consumer for "binlog_post_topic" does not consume from the timestamp I set; it consumes from the earliest offset instead. I found this log in the task manager:
> {code:java}
> 2020-05-11 17:20:47,228 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 restored state: {}.
> ...
> 2020-05-12 20:14:52,641 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading 1 partitions with offsets in restored state: {KafkaTopicPartition{topic='binlog-kk_social-post', partition=0}=-915623761775}
> 2020-05-11 17:20:47,414 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 creating fetcher with offsets {KafkaTopicPartition{topic='binlog-kk_social-post', partition=0}=-915623761775}.
> {code}
> I guess this may be caused by FlinkKafkaConsumerBase. I found the following code in the method FlinkKafkaConsumerBase.initializeState():
> {code:java}
> if (context.isRestored() && !restoredFromOldState) {
>     restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
>     ....{code}
> This code means that if a task is restarted from a savepoint, restoredState will not be null; it is at least an empty TreeMap.
> And in FlinkKafkaConsumerBase.open():
> {code:java}
> if (restoredState != null) {
>     for (KafkaTopicPartition partition : allPartitions) {
>         if (!restoredState.containsKey(partition)) {
>             restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
>         }
>     }
> {code}
> This is where the consumer is initialized. If a task is restarted from a savepoint, restoredState is at least an empty TreeMap, so this code sets every partition missing from the restored state to consume from KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET, and the configured start position is ignored.
> I changed the condition like this:
> {code:java}
> if (restoredState != null && !restoredState.isEmpty()) {
>     ....
> {code}
>
> and this works well for me.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
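
The behaviour described in the report can be illustrated without a Flink dependency. The following is a minimal sketch, not the actual FlinkKafkaConsumerBase code: the class name `RestoreOffsetSketch`, the methods `reported` and `patched`, the use of plain strings as partition keys, and the single `configuredStart` value standing in for "the offset resolved from the configured timestamp" are all assumptions made for illustration. Only the sentinel value is taken from the log output in the report.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RestoreOffsetSketch {
    // Sentinel value as it appears in the task manager log quoted in the report.
    static final long EARLIEST_OFFSET = -915623761775L;

    // Mirrors the reported condition: any non-null restored map, even an
    // empty one, takes precedence over the configured start position.
    public static Map<String, Long> reported(
            Map<String, Long> restoredState, List<String> allPartitions, long configuredStart) {
        return assign(restoredState != null, restoredState, allPartitions, configuredStart);
    }

    // Mirrors the reporter's change: an empty restored map is treated like
    // "nothing restored", so the configured start position is used.
    public static Map<String, Long> patched(
            Map<String, Long> restoredState, List<String> allPartitions, long configuredStart) {
        boolean useRestored = restoredState != null && !restoredState.isEmpty();
        return assign(useRestored, restoredState, allPartitions, configuredStart);
    }

    private static Map<String, Long> assign(
            boolean useRestored, Map<String, Long> restoredState,
            List<String> allPartitions, long configuredStart) {
        Map<String, Long> offsets = new TreeMap<>();
        if (useRestored) {
            offsets.putAll(restoredState);
            // Partitions missing from the restored state fall back to EARLIEST_OFFSET.
            for (String partition : allPartitions) {
                offsets.putIfAbsent(partition, EARLIEST_OFFSET);
            }
        } else {
            // Hypothetical simplification: one start offset for every partition.
            for (String partition : allPartitions) {
                offsets.put(partition, configuredStart);
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("binlog_post_topic-0");
        Map<String, Long> emptyRestored = new TreeMap<>(); // "restored state: {}"
        long configuredStart = 1000L; // hypothetical offset for the chosen timestamp

        System.out.println("reported: " + reported(emptyRestored, partitions, configuredStart));
        System.out.println("patched:  " + patched(emptyRestored, partitions, configuredStart));
    }
}
```

With an empty restored map, `reported` assigns the sentinel to the partition, while `patched` keeps the configured start value. One consequence of the extra `isEmpty()` guard is that a task whose restored state is legitimately empty also falls back to the configured start position, which may or may not be the desired behaviour.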