[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069509#comment-16069509 ]
ASF GitHub Bot commented on FLINK-6352:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r124965642

    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
    @@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
         }

         @Override
    +    public FlinkKafkaConsumerBase<T> setStartFromSpecificDate(Date date) {
    +        Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
    +        this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
    +        this.specificStartupDate = date;
    +        this.specificStartupOffsets = null;
    +        return this;
    +    }
    +
    +    /**
    +     * Convert flink topic partition to kafka topic partition.
    +     * @param flinkTopicPartitionMap
    +     * @return
    +     */
    +    private Map<TopicPartition, Long> convertFlinkToKafkaTopicPartition(Map<KafkaTopicPartition, Long> flinkTopicPartitionMap) {
    +        Map<TopicPartition, Long> topicPartitionMap = new HashMap<>(flinkTopicPartitionMap.size());
    +        for (Map.Entry<KafkaTopicPartition, Long> entry : flinkTopicPartitionMap.entrySet()) {
    +            topicPartitionMap.put(new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), entry.getValue());
    +        }
    +
    +        return topicPartitionMap;
    +
    +    }
    +
    +    /**
    +     * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
    +     * @param partitionTimesMap Kafka topic partition and timestamp
    +     * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
    +     */
    +    private Map<KafkaTopicPartition, Long> convertTimestampToOffset(Map<KafkaTopicPartition, Long> partitionTimesMap) {
    --- End diff --

I think we need to move this conversion logic to `KafkaConsumerThread`, otherwise we would be instantiating a KafkaConsumer just for the sake of fetching timestamp-based offsets.

That's where the actual "`KafkaTopicPartitionStateSentinel` to actual offset" conversions take place.
See `KafkaConsumerThread` lines 369 - 390

```
// offsets in the state of new partitions may still be placeholder sentinel values if we are:
//   (1) starting fresh,
//   (2) checkpoint / savepoint state we were restored with had not completely
//       been replaced with actual offset values yet, or
//   (3) the partition was newly discovered after startup;
// replace those with actual offsets, according to what the sentinel value represent.
for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
    if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
        consumerCallBridge.seekPartitionToBeginning(consumerTmp, newPartitionState.getKafkaPartitionHandle());
        newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
    } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
        consumerCallBridge.seekPartitionToEnd(consumerTmp, newPartitionState.getKafkaPartitionHandle());
        newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
    } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
        // the KafkaConsumer by default will automatically seek the consumer position
        // to the committed group offset, so we do not need to do it.
        newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
    } else {
        consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
    }
}
```

> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -------------------------------------------------------------------------
>
>                 Key: FLINK-6352
>                 URL: https://issues.apache.org/jira/browse/FLINK-6352
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Fang Yong
>            Assignee: Fang Yong
>             Fix For: 1.4.0
>
>
> Currently "auto.offset.reset" is used to initialize the start offset of
> FlinkKafkaConsumer, and the value should be earliest/latest/none. This method
> can only let the job consume the beginning or the most recent data, but can
> not specify the specific offset of Kafka began to consume.
> So, there should be a configuration item (such as
> "flink.source.start.time" and the format is "yyyy-MM-dd HH:mm:ss") that
> allows user to configure the initial offset of Kafka. The action of
> "flink.source.start.time" is as follows:
> 1) job start from checkpoint / savepoint
>   a> offset of partition can be restored from checkpoint/savepoint,
> "flink.source.start.time" will be ignored.
>   b> there's no checkpoint/savepoint for the partition (For example, this
> partition is newly increased), the "flink.kafka.start.time" will be used to
> initialize the offset of the partition
> 2) job has no checkpoint / savepoint, the "flink.source.start.time" is used
> to initialize the offset of the kafka
>   a> the "flink.source.start.time" is valid, use it to set the offset of kafka
>   b> the "flink.source.start.time" is out-of-range, the same as it does
> currently with no initial offset, get kafka's current offset and start reading

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
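
For reference, the start-up rules in the quoted issue description can be sketched as a small, self-contained Java example. This is only an illustration under stated assumptions, not the code from the pull request: `StartPositionSketch`, `resolveStartPosition`, `demoIndex`, and the in-memory timestamp index are hypothetical names that stand in for a real Kafka lookup, and no Flink or Kafka classes are used. The `+ 1` on a restored offset follows the `seek(..., getOffset() + 1)` convention in the `KafkaConsumerThread` snippet quoted in the comment. Falling back to the end offset when no start time is configured is a simplification of the existing behaviour.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Illustrative sketch only; none of these names exist in Flink or Kafka. */
final class StartPositionSketch {

    /**
     * Decide where a partition should start reading.
     *
     * @param restoredOffset         last processed offset restored from a checkpoint/savepoint, or null
     * @param startTimestamp         requested start time in epoch millis, or null if not configured
     * @param firstOffsetByTimestamp hypothetical index: record timestamp -> first offset with that timestamp
     * @param endOffset              position of the next record to be produced ("latest")
     * @return the position to seek to
     */
    static long resolveStartPosition(
            Long restoredOffset,
            Long startTimestamp,
            NavigableMap<Long, Long> firstOffsetByTimestamp,
            long endOffset) {

        // Rule 1a: a restored offset wins and the start time is ignored.
        if (restoredOffset != null) {
            return restoredOffset + 1;
        }

        // Rules 1b / 2a: no restored offset, so use the earliest offset
        // whose timestamp is at or after the requested start time.
        if (startTimestamp != null) {
            Map.Entry<Long, Long> hit = firstOffsetByTimestamp.ceilingEntry(startTimestamp);
            if (hit != null) {
                return hit.getValue();
            }
        }

        // Rule 2b (and, as a simplification, "no start time configured"):
        // nothing matches, so start from the latest position.
        return endOffset;
    }

    /** Small made-up index used by the demo: timestamps 1000, 2000, 3000. */
    static NavigableMap<Long, Long> demoIndex() {
        NavigableMap<Long, Long> index = new TreeMap<>();
        index.put(1000L, 0L);
        index.put(2000L, 5L);
        index.put(3000L, 9L);
        return index;
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> index = demoIndex();
        System.out.println(resolveStartPosition(41L, 2000L, index, 12L));  // 42
        System.out.println(resolveStartPosition(null, 1500L, index, 12L)); // 5
        System.out.println(resolveStartPosition(null, 4000L, index, 12L)); // 12
    }
}
```

Keeping the decision in one place per partition is in the spirit of the review comment: the timestamp case becomes one more branch next to the existing sentinel handling, and no separate consumer is needed just to translate timestamps into offsets.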