[ 
https://issues.apache.org/jira/browse/KAFKA-4597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-4597:
----------------------------------

    Assignee: Ismael Juma

> Record metadata returned by producer doesn't consider log append time
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-4597
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4597
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, producer 
>    Affects Versions: 0.10.1.1
>            Reporter: Alex Fechner
>            Assignee: Ismael Juma
>             Fix For: 0.10.2.0
>
>
> Kafka topics might be configured to record timestamps of the messages 
> produced. There are two different timestamps which might be stored:
> # Record *create time*: The time the record is created by the client.
> # Log *append time*: The time the record has been added to the log by the 
> broker.
> The 
> [ProducerRecord|https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html]
>  docs state:
> {quote}
> In either of the cases above, the timestamp that has actually been used will 
> be returned to user in RecordMetadata
> {quote}
> However I found the *create time* used in both cases.
> The following class creates two topics, one configured to store *create 
> time*. The other used *log append time*. It produces 10 messages in each 
> topic and outputs the timestamps from the record meta data as well as those 
> fetched by a consumer client.
> {code:java}
> import kafka.admin.AdminUtils;
> import kafka.admin.RackAwareMode;
> import kafka.utils.ZKStringSerializer$;
> import kafka.utils.ZkUtils;
> import org.I0Itec.zkclient.ZkClient;
> import org.I0Itec.zkclient.ZkConnection;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.clients.producer.RecordMetadata;
> import org.apache.kafka.common.TopicPartition;
> import java.util.Arrays;
> import java.util.Properties;
> import java.util.concurrent.ExecutionException;
> public class KafkaTimestampTest {
>     public static void main(String[] args) throws ExecutionException, 
> InterruptedException {
>         String ip = "127.0.0.1";
>         Properties producerProperties = new Properties();
>         producerProperties.put("bootstrap.servers", ip + ":9092");
>         producerProperties.put("acks", "all");
>         producerProperties.put("retries", 0);
>         producerProperties.put("batch.size", 16384);
>         producerProperties.put("linger.ms", 1);
>         producerProperties.put("buffer.memory", 33554432);
>         producerProperties.put("key.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
>         producerProperties.put("value.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
>         Properties consumerProperties = new Properties();
>         consumerProperties.put("bootstrap.servers", ip + ":9092");
>         consumerProperties.put("enable.auto.commit", "false");
>         consumerProperties.put("session.timeout.ms", "30000");
>         consumerProperties.put("key.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
>         consumerProperties.put("value.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
>         Producer<String, String> producer = new 
> KafkaProducer<>(producerProperties);
>         KafkaConsumer<String, String> consumer = new 
> KafkaConsumer<>(consumerProperties);
>         ZkClient zkClient = new ZkClient(ip + ":2181", 10000, 10000, 
> ZKStringSerializer$.MODULE$);
>         ZkConnection zkConnection = new ZkConnection(ip + ":2181");
>         ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
>         TopicPartition topicPartitionWithCreateTime = new 
> TopicPartition("test-topic-with-create-time", 0);
>         TopicPartition topicPartitionWithLogAppendTime = new 
> TopicPartition("test-topic-with-log-append-time", 0);
>         // create topic with create time
>         if (!AdminUtils.topicExists(zkUtils, 
> topicPartitionWithCreateTime.topic())) {
>             Properties topicProperties = new Properties();
>             topicProperties.put("message.timestamp.type", "CreateTime");
>             AdminUtils.createTopic(zkUtils, 
> topicPartitionWithCreateTime.topic(), 1, 1, topicProperties, 
> RackAwareMode.Disabled$.MODULE$);
>         }
>         // create topic with log append time
>         if (!AdminUtils.topicExists(zkUtils, 
> topicPartitionWithLogAppendTime.topic())) {
>             Properties topicProperties = new Properties();
>             topicProperties.put("message.timestamp.type", "LogAppendTime");
>             AdminUtils.createTopic(zkUtils, 
> topicPartitionWithLogAppendTime.topic(), 1, 1, topicProperties, 
> RackAwareMode.Disabled$.MODULE$);
>         }
>         consumer.assign(Arrays.asList(topicPartitionWithLogAppendTime, 
> topicPartitionWithCreateTime));
>         String format = "#%s, MetaDataOffset: %s, MetaDataTime: %s, 
> ConsumerRecordOffset: %s, ConsumerRecordTime: %s";
>         System.out.println(String.format("Create messages into topic %s ...", 
> topicPartitionWithCreateTime));
>         for (int i = 0; i < 10; i++) {
>             RecordMetadata recordMetadata = producer.send(new 
> ProducerRecord<>(topicPartitionWithCreateTime.topic(), 
> topicPartitionWithCreateTime.partition(), "", "message")).get();
>             consumer.seek(topicPartitionWithCreateTime, 
> recordMetadata.offset());
>             ConsumerRecord<String, String> consumerRecord =  
> consumer.poll(1000).records(topicPartitionWithCreateTime).get(0);
>             System.out.println(String.format(format, i + 1, 
> recordMetadata.offset(), recordMetadata.timestamp(), consumerRecord.offset(), 
> consumerRecord.timestamp()));
>         }
>         System.out.println(String.format("Create messages into topic %s...", 
> topicPartitionWithLogAppendTime));
>         for (int i = 0; i < 10; i++) {
>             RecordMetadata recordMetadata = producer.send(new 
> ProducerRecord<>(topicPartitionWithLogAppendTime.topic(), 
> topicPartitionWithLogAppendTime.partition(), "", "message")).get();
>             consumer.seek(topicPartitionWithLogAppendTime, 
> recordMetadata.offset());
>             ConsumerRecord<String, String> consumerRecord =  
> consumer.poll(1000).records(topicPartitionWithLogAppendTime).get(0);
>             System.out.println(String.format(format, i + 1, 
> recordMetadata.offset(), recordMetadata.timestamp(), consumerRecord.offset(), 
> consumerRecord.timestamp()));
>         }
>         AdminUtils.deleteTopic(zkUtils, topicPartitionWithCreateTime.topic());
>         AdminUtils.deleteTopic(zkUtils, 
> topicPartitionWithLogAppendTime.topic());
>     }
> }
> {code}
> The output shows that in case of *log append time* the timestamps differ.
> {code}
> Create messages into topic test-topic-with-create-time-0 ...
> #1, MetaDataOffset: 0, MetaDataTime: 1483623773788, ConsumerRecordOffset: 0, 
> ConsumerRecordTime: 1483623773788
> #2, MetaDataOffset: 1, MetaDataTime: 1483623774178, ConsumerRecordOffset: 1, 
> ConsumerRecordTime: 1483623774178
> #3, MetaDataOffset: 2, MetaDataTime: 1483623774183, ConsumerRecordOffset: 2, 
> ConsumerRecordTime: 1483623774183
> #4, MetaDataOffset: 3, MetaDataTime: 1483623774188, ConsumerRecordOffset: 3, 
> ConsumerRecordTime: 1483623774188
> #5, MetaDataOffset: 4, MetaDataTime: 1483623774193, ConsumerRecordOffset: 4, 
> ConsumerRecordTime: 1483623774193
> #6, MetaDataOffset: 5, MetaDataTime: 1483623774197, ConsumerRecordOffset: 5, 
> ConsumerRecordTime: 1483623774197
> #7, MetaDataOffset: 6, MetaDataTime: 1483623774202, ConsumerRecordOffset: 6, 
> ConsumerRecordTime: 1483623774202
> #8, MetaDataOffset: 7, MetaDataTime: 1483623774207, ConsumerRecordOffset: 7, 
> ConsumerRecordTime: 1483623774207
> #9, MetaDataOffset: 8, MetaDataTime: 1483623774212, ConsumerRecordOffset: 8, 
> ConsumerRecordTime: 1483623774212
> #10, MetaDataOffset: 9, MetaDataTime: 1483623774217, ConsumerRecordOffset: 9, 
> ConsumerRecordTime: 1483623774217
> Create messages into topic test-topic-with-log-append-time-0...
> #1, MetaDataOffset: 0, MetaDataTime: 1483623774224, ConsumerRecordOffset: 0, 
> ConsumerRecordTime: 1483623774992
> #2, MetaDataOffset: 1, MetaDataTime: 1483623774230, ConsumerRecordOffset: 1, 
> ConsumerRecordTime: 1483623774997
> #3, MetaDataOffset: 2, MetaDataTime: 1483623774235, ConsumerRecordOffset: 2, 
> ConsumerRecordTime: 1483623775002
> #4, MetaDataOffset: 3, MetaDataTime: 1483623774239, ConsumerRecordOffset: 3, 
> ConsumerRecordTime: 1483623775007
> #5, MetaDataOffset: 4, MetaDataTime: 1483623774244, ConsumerRecordOffset: 4, 
> ConsumerRecordTime: 1483623775011
> #6, MetaDataOffset: 5, MetaDataTime: 1483623774248, ConsumerRecordOffset: 5, 
> ConsumerRecordTime: 1483623775015
> #7, MetaDataOffset: 6, MetaDataTime: 1483623774253, ConsumerRecordOffset: 6, 
> ConsumerRecordTime: 1483623775020
> #8, MetaDataOffset: 7, MetaDataTime: 1483623774257, ConsumerRecordOffset: 7, 
> ConsumerRecordTime: 1483623775024
> #9, MetaDataOffset: 8, MetaDataTime: 1483623774262, ConsumerRecordOffset: 8, 
> ConsumerRecordTime: 1483623775029
> #10, MetaDataOffset: 9, MetaDataTime: 1483623774267, ConsumerRecordOffset: 9, 
> ConsumerRecordTime: 1483623775034
> {code}
>  I assume the timestamps in the record meta data represent the create time 
> but could not ensure that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to