I added some logging to StoreChangeLogger:

    for (K k : this.dirty) {
        V v = getter.get(k);
        log.info("logChange key:{},value:{}", k, v);
        collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
    }

and found that the printed result is normal, just some bytes:

    [2017-06-07 18:39:43,131] INFO logChange removed:[], dirty:[kafka] (org.apache.kafka.streams.state.internals.StoreChangeLogger)
    [2017-06-07 18:39:43,132] INFO logChange key:kafka,value:[0, 0, 0, 2] (org.apache.kafka.streams.state.internals.StoreChangeLogger)

but console-consumer still outputs "kafka:" with an empty value, so this may be a problem of console-consumer rather than the changelog itself.
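Those four bytes are a serialized count, not an empty payload: the demo's Counts store holds Integer counts, and Kafka's IntegerDeserializer turns the four big-endian bytes back into the number. A minimal sketch to check this (class name is mine; the topic argument is only used for error reporting):

    import org.apache.kafka.common.serialization.IntegerDeserializer;

    public class DecodeCount {
        public static void main(String[] args) {
            // The changelog value from the log line above: four big-endian bytes.
            byte[] value = new byte[] {0, 0, 0, 2};
            IntegerDeserializer deserializer = new IntegerDeserializer();
            Integer count = deserializer.deserialize("streams-wordcount-Counts-changelog", value);
            System.out.println(count); // prints 2
        }
    }

The console consumer writes the value bytes to the terminal as-is, and 0x00 0x00 0x00 0x02 are invisible control characters, which is why the value looks empty.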
2017-06-07 18:24 GMT+08:00 john cheng <zqhxuy...@gmail.com>:

> I'm running WordCountProcessorDemo with the Processor API, and changed a few things:
> 1. configured 1 stream thread and 1 replica
> 2. changed inMemory() to persistent()
> My Kafka version is 0.10.0.0. After running the streams application, I checked the messages with console-consumer:
>
> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-input2
> msg1 # console-producer only produces message values, so messages sent to the input topic use round-robin partitioning
> msg2
> msg3
> msg4
> msg5
> msg6
>
> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic streams-input2 --property print.key=true --property key.separator=":" --from-beginning
> null:msg2 # key is null, value is what we produced above
> null:msg4
> null:msg6
> null:msg1
> null:msg3
> null:msg5
>
> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic streams-output2 --property print.key=true --property key.separator=":" --from-beginning
> msg2:1
> msg1:1
> msg1:1
> msg3:1
> msg2:1
> msg4:1
> msg1:1
> msg3:1
> msg5:1
> msg2:1
> msg4:1
> msg6:1 # due to log compaction, the same key will be overwritten; this is ok...
>
> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic streams-wordcount-Counts-changelog --property print.key=true --property key.separator=":" --from-beginning
> msg2:
> msg4:
> msg6:
> msg1:
> msg3:
> msg5:
>
> Everything is OK except the changelog topic, which troubles me. Why are its values empty?
>
> I have dug into the source code and summarized the workflow:
> 1. when WordCountProcessorDemo's ProcessorNode puts a KV pair into the KeyValueStore, whether in-memory or RocksDB, it goes into local storage first, and the change is then appended to the StoreChangeLogger
> 2. the key is appended to the StoreChangeLogger first, which invokes maybeLogChange, passing a ValueGetter; this getter reads the value from local storage when the logChange() operation happens
> 3. when logChange() happens on the StoreChangeLogger, it sends the KeyValue message to the changelog topic, here streams-wordcount-Counts-changelog
> 4. the StoreChangeLogger uses the dirty and removed sets to swap between logChange() and add()
>
> for (K k : this.dirty) { // logChange() flushes the dirty set to the changelog topic
>     V v = getter.get(k); // the value getter reads the value from local storage
>     collector.send(new ProducerRecord<>(this.topic, this.partition, k, v),
>         keySerializer, valueSerializer);
> }
>
> The only reason I can figure out for an empty value is an invocation of the KeyValueStore's delete() method, but the application doesn't actually do that.
>
> Anyone help me. Thanks.
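P.S. For reference, reading the changelog with matching deserializers shows the counts directly. A minimal consumer sketch, assuming the demo's String keys and Integer counts (group id chosen arbitrarily):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ChangelogDumper {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "changelog-dumper");
            props.put("auto.offset.reset", "earliest");
            // Match the store's serdes: String keys (words), Integer values (counts).
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");

            try (KafkaConsumer<String, Integer> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("streams-wordcount-Counts-changelog"));
                while (true) {
                    ConsumerRecords<String, Integer> records = consumer.poll(1000);
                    for (ConsumerRecord<String, Integer> record : records) {
                        System.out.println(record.key() + ":" + record.value());
                    }
                }
            }
        }
    }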