[ https://issues.apache.org/jira/browse/KAFKA-4597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismael Juma reassigned KAFKA-4597: ---------------------------------- Assignee: Ismael Juma > Record metadata returned by producer doesn't consider log append time > --------------------------------------------------------------------- > > Key: KAFKA-4597 > URL: https://issues.apache.org/jira/browse/KAFKA-4597 > Project: Kafka > Issue Type: Bug > Components: clients, producer > Affects Versions: 0.10.1.1 > Reporter: Alex Fechner > Assignee: Ismael Juma > Fix For: 0.10.2.0 > > > Kafka topics might be configured to record timestamps of the messages > produced. There are two different timestamps which might be stored: > # Record *create time*: The time the record is created by the client. > # Log *append time*: The time the record has been added to the log by the > broker. > The > [ProducerRecord|https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html] > docs state: > {quote} > In either of the cases above, the timestamp that has actually been used will > be returned to user in RecordMetadata > {quote} > However I found the *create time* used in both cases. > The following class creates two topics, one configured to store *create > time*. The other used *log append time*. It produces 10 messages in each > topic and outputs the timestamps from the record meta data as well as those > fetched by a consumer client. > {code:java} > import kafka.admin.AdminUtils; > import kafka.admin.RackAwareMode; > import kafka.utils.ZKStringSerializer$; > import kafka.utils.ZkUtils; > import org.I0Itec.zkclient.ZkClient; > import org.I0Itec.zkclient.ZkConnection; > import org.apache.kafka.clients.consumer.ConsumerRecord; > import org.apache.kafka.clients.consumer.KafkaConsumer; > import org.apache.kafka.clients.producer.KafkaProducer; > import org.apache.kafka.clients.producer.Producer; > import org.apache.kafka.clients.producer.ProducerRecord; > import org.apache.kafka.clients.producer.RecordMetadata; > import org.apache.kafka.common.TopicPartition; > import java.util.Arrays; > import java.util.Properties; > import java.util.concurrent.ExecutionException; > public class KafkaTimestampTest { > public static void main(String[] args) throws ExecutionException, > InterruptedException { > String ip = "127.0.0.1"; > Properties producerProperties = new Properties(); > producerProperties.put("bootstrap.servers", ip + ":9092"); > producerProperties.put("acks", "all"); > producerProperties.put("retries", 0); > producerProperties.put("batch.size", 16384); > producerProperties.put("linger.ms", 1); > producerProperties.put("buffer.memory", 33554432); > producerProperties.put("key.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > producerProperties.put("value.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > Properties consumerProperties = new Properties(); > consumerProperties.put("bootstrap.servers", ip + ":9092"); > consumerProperties.put("enable.auto.commit", "false"); > consumerProperties.put("session.timeout.ms", "30000"); > consumerProperties.put("key.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > consumerProperties.put("value.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > Producer<String, String> producer = new > KafkaProducer<>(producerProperties); > KafkaConsumer<String, String> consumer = new > KafkaConsumer<>(consumerProperties); > ZkClient zkClient = new ZkClient(ip + ":2181", 10000, 10000, > ZKStringSerializer$.MODULE$); > ZkConnection zkConnection = new ZkConnection(ip + ":2181"); > ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); > TopicPartition topicPartitionWithCreateTime = new > TopicPartition("test-topic-with-create-time", 0); > TopicPartition topicPartitionWithLogAppendTime = new > TopicPartition("test-topic-with-log-append-time", 0); > // create topic with create time > if (!AdminUtils.topicExists(zkUtils, > topicPartitionWithCreateTime.topic())) { > Properties topicProperties = new Properties(); > topicProperties.put("message.timestamp.type", "CreateTime"); > AdminUtils.createTopic(zkUtils, > topicPartitionWithCreateTime.topic(), 1, 1, topicProperties, > RackAwareMode.Disabled$.MODULE$); > } > // create topic with log append time > if (!AdminUtils.topicExists(zkUtils, > topicPartitionWithLogAppendTime.topic())) { > Properties topicProperties = new Properties(); > topicProperties.put("message.timestamp.type", "LogAppendTime"); > AdminUtils.createTopic(zkUtils, > topicPartitionWithLogAppendTime.topic(), 1, 1, topicProperties, > RackAwareMode.Disabled$.MODULE$); > } > consumer.assign(Arrays.asList(topicPartitionWithLogAppendTime, > topicPartitionWithCreateTime)); > String format = "#%s, MetaDataOffset: %s, MetaDataTime: %s, > ConsumerRecordOffset: %s, ConsumerRecordTime: %s"; > System.out.println(String.format("Create messages into topic %s ...", > topicPartitionWithCreateTime)); > for (int i = 0; i < 10; i++) { > RecordMetadata recordMetadata = producer.send(new > ProducerRecord<>(topicPartitionWithCreateTime.topic(), > topicPartitionWithCreateTime.partition(), "", "message")).get(); > consumer.seek(topicPartitionWithCreateTime, > recordMetadata.offset()); > ConsumerRecord<String, String> consumerRecord = > consumer.poll(1000).records(topicPartitionWithCreateTime).get(0); > System.out.println(String.format(format, i + 1, > recordMetadata.offset(), recordMetadata.timestamp(), consumerRecord.offset(), > consumerRecord.timestamp())); > } > System.out.println(String.format("Create messages into topic %s...", > topicPartitionWithLogAppendTime)); > for (int i = 0; i < 10; i++) { > RecordMetadata recordMetadata = producer.send(new > ProducerRecord<>(topicPartitionWithLogAppendTime.topic(), > topicPartitionWithLogAppendTime.partition(), "", "message")).get(); > consumer.seek(topicPartitionWithLogAppendTime, > recordMetadata.offset()); > ConsumerRecord<String, String> consumerRecord = > consumer.poll(1000).records(topicPartitionWithLogAppendTime).get(0); > System.out.println(String.format(format, i + 1, > recordMetadata.offset(), recordMetadata.timestamp(), consumerRecord.offset(), > consumerRecord.timestamp())); > } > AdminUtils.deleteTopic(zkUtils, topicPartitionWithCreateTime.topic()); > AdminUtils.deleteTopic(zkUtils, > topicPartitionWithLogAppendTime.topic()); > } > } > {code} > The output shows that in case of *log append time* the timestamps differ. > {code} > Create messages into topic test-topic-with-create-time-0 ... > #1, MetaDataOffset: 0, MetaDataTime: 1483623773788, ConsumerRecordOffset: 0, > ConsumerRecordTime: 1483623773788 > #2, MetaDataOffset: 1, MetaDataTime: 1483623774178, ConsumerRecordOffset: 1, > ConsumerRecordTime: 1483623774178 > #3, MetaDataOffset: 2, MetaDataTime: 1483623774183, ConsumerRecordOffset: 2, > ConsumerRecordTime: 1483623774183 > #4, MetaDataOffset: 3, MetaDataTime: 1483623774188, ConsumerRecordOffset: 3, > ConsumerRecordTime: 1483623774188 > #5, MetaDataOffset: 4, MetaDataTime: 1483623774193, ConsumerRecordOffset: 4, > ConsumerRecordTime: 1483623774193 > #6, MetaDataOffset: 5, MetaDataTime: 1483623774197, ConsumerRecordOffset: 5, > ConsumerRecordTime: 1483623774197 > #7, MetaDataOffset: 6, MetaDataTime: 1483623774202, ConsumerRecordOffset: 6, > ConsumerRecordTime: 1483623774202 > #8, MetaDataOffset: 7, MetaDataTime: 1483623774207, ConsumerRecordOffset: 7, > ConsumerRecordTime: 1483623774207 > #9, MetaDataOffset: 8, MetaDataTime: 1483623774212, ConsumerRecordOffset: 8, > ConsumerRecordTime: 1483623774212 > #10, MetaDataOffset: 9, MetaDataTime: 1483623774217, ConsumerRecordOffset: 9, > ConsumerRecordTime: 1483623774217 > Create messages into topic test-topic-with-log-append-time-0... > #1, MetaDataOffset: 0, MetaDataTime: 1483623774224, ConsumerRecordOffset: 0, > ConsumerRecordTime: 1483623774992 > #2, MetaDataOffset: 1, MetaDataTime: 1483623774230, ConsumerRecordOffset: 1, > ConsumerRecordTime: 1483623774997 > #3, MetaDataOffset: 2, MetaDataTime: 1483623774235, ConsumerRecordOffset: 2, > ConsumerRecordTime: 1483623775002 > #4, MetaDataOffset: 3, MetaDataTime: 1483623774239, ConsumerRecordOffset: 3, > ConsumerRecordTime: 1483623775007 > #5, MetaDataOffset: 4, MetaDataTime: 1483623774244, ConsumerRecordOffset: 4, > ConsumerRecordTime: 1483623775011 > #6, MetaDataOffset: 5, MetaDataTime: 1483623774248, ConsumerRecordOffset: 5, > ConsumerRecordTime: 1483623775015 > #7, MetaDataOffset: 6, MetaDataTime: 1483623774253, ConsumerRecordOffset: 6, > ConsumerRecordTime: 1483623775020 > #8, MetaDataOffset: 7, MetaDataTime: 1483623774257, ConsumerRecordOffset: 7, > ConsumerRecordTime: 1483623775024 > #9, MetaDataOffset: 8, MetaDataTime: 1483623774262, ConsumerRecordOffset: 8, > ConsumerRecordTime: 1483623775029 > #10, MetaDataOffset: 9, MetaDataTime: 1483623774267, ConsumerRecordOffset: 9, > ConsumerRecordTime: 1483623775034 > {code} > I assume the timestamps in the record meta data represent the create time > but could not ensure that. -- This message was sent by Atlassian JIRA (v6.3.4#6332)