[ https://issues.apache.org/jira/browse/KAFKA-4789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15888941#comment-15888941 ]
Hamidreza Afzali commented on KAFKA-4789: ----------------------------------------- Thanks! > ProcessorTopologyTestDriver does not forward extracted timestamps to internal > topics > ------------------------------------------------------------------------------------ > > Key: KAFKA-4789 > URL: https://issues.apache.org/jira/browse/KAFKA-4789 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.10.2.0 > Reporter: Hamidreza Afzali > Assignee: Hamidreza Afzali > Labels: unit-test > Fix For: 0.10.3.0 > > > *Problem:* > When using ProcessorTopologyTestDriver, the extracted timestamp is not > forwarded with the produced record to the internal topics. > *Example:* > {code} > object Topology1 { > def main(args: Array[String]): Unit = { > val inputTopic = "input" > val outputTopic = "output" > val stateStore = "count" > val inputs = Seq[(String, Integer)](("A@1450000000", 1), ("B@1450000000", > 2)) > val props = new Properties > props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString) > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") > props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, > classOf[MyTimestampExtractor].getName) > val windowedStringSerde = Serdes.serdeFrom(new > WindowedSerializer(Serdes.String.serializer), > new WindowedDeserializer(Serdes.String.deserializer)) > val builder = new KStreamBuilder > builder.stream(Serdes.String, Serdes.Integer, inputTopic) > .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v)) > .groupByKey(Serdes.String, Serdes.Integer) > .count(TimeWindows.of(1000L), stateStore) > .to(windowedStringSerde, Serdes.Long, outputTopic) > val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), > builder, stateStore) > inputs.foreach { > case (key, value) => { > driver.process(inputTopic, key, value, Serdes.String.serializer, > Serdes.Integer.serializer) > val record = driver.readOutput(outputTopic, > Serdes.String.deserializer, Serdes.Long.deserializer) > println(record) > } > } > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)