The right way to inspect a changelog topic backed by a persistent (RocksDB) store is to consume it with ByteArrayDeserializer and print the values as hex strings:
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

for (ConsumerRecord<String, byte[]> record : consumerRecords) {
    // print partition, offset, key, and the value as hex
    System.out.println("p:" + record.partition() + ",o:" + record.offset()
        + ",k:" + record.key() + ",v:" + bytesToHexString(record.value()));
}

// Renders each byte as two lowercase hex digits; returns null for a null or empty array.
public static String bytesToHexString(byte[] src) {
    if (src == null || src.length == 0) {
        return null;
    }
    StringBuilder sb = new StringBuilder(src.length * 2);
    for (byte b : src) {
        sb.append(String.format("%02x", b & 0xFF));
    }
    return sb.toString();
}

The output I now collect:

p:0,o:0,k:msg1,v:00000001
p:0,o:1,k:msg3,v:00000001
p:0,o:2,k:msg5,v:00000001
p:0,o:3,k:msg1,v:00000002
p:0,o:4,k:msg3,v:00000002
p:1,o:0,k:msg2,v:00000001
p:1,o:1,k:msg4,v:00000001
p:1,o:2,k:msg2,v:00000002
p:1,o:3,k:msg2,v:00000003

2017-06-07 18:42 GMT+08:00 john cheng <>:

> I added some logging to StoreChangeLogger:
>
> for (K k : this.dirty) {
>     V v = getter.get(k);
>     log.info("logChange key:{},value:{}", k, v);
>     collector.send(new ProducerRecord<>(this.topic, this.partition, k, v),
>         keySerializer, valueSerializer);
> }
>
> and found that the printed result is normal, just some bytes:
>
> [2017-06-07 18:39:43,131] INFO logChange removed:[], dirty:[kafka]
> (org.apache.kafka.streams.state.internals.StoreChangeLogger)
> [2017-06-07 18:39:43,132] INFO logChange key:kafka,value:[0, 0, 0, 2]
> (org.apache.kafka.streams.state.internals.StoreChangeLogger)
>
> but console-consumer still outputs kafka:
>
> so this may be a problem with console-consumer.
>
>
> 2017-06-07 18:24 GMT+08:00 john cheng <>:
>
>> I'm running WordCountProcessorDemo with the Processor API, with the
>> following changes:
>> 1. configured 1 stream thread and 1 replica
>> 2. changed inMemory() to persistent()
>> My Kafka version is
>> After running the streaming application, I checked the messages output
>> by console-consumer:
>> ➜ kafka_2.10- bin/ --broker-list localhost:9092 --topic streams-input2
>> msg1 # with console-producer we can only produce the message value, so
>> messages sent to the input topic are partitioned round-robin
>> msg2
>> msg3
>> msg4
>> msg5
>> msg6
>> ➜ kafka_2.10- bin/ --new-consumer --bootstrap-server localhost:9092
>> --topic streams-input2 --property print.key=true --property
>> key.separator=":" --from-beginning
>> null:msg2 # key is null, value is what we produced above
>> null:msg4
>> null:msg6
>> null:msg1
>> null:msg3
>> null:msg5
>> ➜ kafka_2.10- bin/ --new-consumer --bootstrap-server localhost:9092
>> --topic streams-output2 --property print.key=true --property
>> key.separator=":" --from-beginning
>> msg2:1
>> msg1:1
>> msg1:1
>> msg3:1
>> msg2:1
>> msg4:1
>> msg1:1
>> msg3:1
>> msg5:1
>> msg2:1
>> msg4:1
>> msg6:1 # due to log compaction, the same key will be overwritten. this is ok...
>> ➜ kafka_2.10- bin/ --new-consumer --bootstrap-server localhost:9092
>> --topic streams-wordcount-Counts-changelog --property print.key=true
>> --property key.separator=":" --from-beginning
>> *msg2:*
>> *msg4:*
>> *msg6:*
>> *msg1:*
>> *msg3:*
>> *msg5:*
>> Everything is OK except the changelog topic, which troubles me. Why are
>> its values empty?
>>
>> I have dug into the source code and summarized the workflow:
>> 1. When WordCountProcessorDemo's ProcessorNode puts a key-value pair into
>> the KeyValueStore, whether in-memory or RocksDB, it is written to local
>> storage and then the message is appended to StoreChangeLogger.
>> 2. The key is added to StoreChangeLogger first, and maybeLogChange is
>> invoked with a ValueGetter; this getter fetches the value from local
>> storage when logChange() runs.
>> 3. When logChange() runs on StoreChangeLogger, it sends the key-value
>> message to the changelog topic, here streams-wordcount-Counts-changelog.
>> 4. StoreChangeLogger uses the dirty and removed sets to swap between
>> logChange() and add().
>>
>> for (K k : this.dirty) { // logChange() flushes dirty keys to the changelog topic
>>     V v = getter.get(k); // the value getter reads the value from local storage
>>     collector.send(new ProducerRecord<>(this.topic, this.partition, k, v),
>>         keySerializer, valueSerializer);
>> }
>>
>> The only reason I can think of for an empty value is a call to
>> KeyValueStore's delete method, but the application actually didn't do that.
>>
>> Can anyone help me? Thanks.
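
On the empty values from console-consumer: the hex values above look like 4-byte big-endian integers, which would fit a store created with Integer values (I believe the stock WordCountProcessorDemo does this, but that is an assumption, so check your own store definition). Bytes 0x00 to 0x03 are unprintable control characters, which would explain why console-consumer seems to print nothing after the key. Below is a minimal sketch for turning the raw bytes, or the hex string, back into a count. ChangelogValueDecoder, hexToBytes and toInt are names made up for this example, not Kafka APIs.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration only; not part of Kafka.
public class ChangelogValueDecoder {

    // Parse a hex string such as "00000002" back into raw bytes.
    public static byte[] hexToBytes(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    // Interpret exactly 4 bytes as a big-endian int (ByteBuffer's default byte order).
    // Assumption: the changelog value was written as a 4-byte integer.
    public static int toInt(byte[] value) {
        if (value == null || value.length != 4) {
            throw new IllegalArgumentException("expected exactly 4 bytes");
        }
        return ByteBuffer.wrap(value).getInt();
    }

    public static void main(String[] args) {
        // value logged by StoreChangeLogger as [0, 0, 0, 2]
        System.out.println(toInt(new byte[] {0, 0, 0, 2}));   // prints 2
        // hex printed for k:msg2 at p:1,o:3
        System.out.println(toInt(hexToBytes("00000003")));    // prints 3
    }
}
```

If the values really are Kafka-serialized integers, setting the consumer's value deserializer to org.apache.kafka.common.serialization.IntegerDeserializer should give the same numbers directly, without the hex round trip.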