ConsoleConsumer uses the String deserializer by default, but the value in the changelog is of type long. For the output topic, the value is converted from long to string though -- thus you can read the output topic without problems.

For reading the changelog topic, you need to specify the option --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer (hope I got the option right :))
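For example, adding it to the changelog-consumer command you used below (untested):

bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 \
    --topic streams-wordcount-Counts-changelog \
    --property print.key=true --property key.separator=":" \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer \
    --from-beginning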
-Matthias

On 6/7/17 6:40 AM, john cheng wrote:
> the right way to see the changelog persisted by RocksDB is to use the
> ByteArrayDeserializer, and then decode the hex to a string
>
> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>     "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>
> for (ConsumerRecord<String, byte[]> record : consumerRecords) {
>     System.out.println(bytesToHexString(record.value()));
> }
>
> public static String bytesToHexString(byte[] src) {
>     if (src == null || src.length <= 0) {
>         return null;
>     }
>     StringBuilder stringBuilder = new StringBuilder();
>     for (int i = 0; i < src.length; i++) {
>         int v = src[i] & 0xFF;
>         String hv = Integer.toHexString(v);
>         if (hv.length() < 2) {
>             stringBuilder.append('0'); // left-pad single hex digits
>         }
>         stringBuilder.append(hv);
>     }
>     return stringBuilder.toString();
> }
>
> the output is now correct:
>
> p:0,o:0,k:msg1,v:00000001
> p:0,o:1,k:msg3,v:00000001
> p:0,o:2,k:msg5,v:00000001
> p:0,o:3,k:msg1,v:00000002
> p:0,o:4,k:msg3,v:00000002
> p:1,o:0,k:msg2,v:00000001
> p:1,o:1,k:msg4,v:00000001
> p:1,o:2,k:msg2,v:00000002
> p:1,o:3,k:msg2,v:00000003
>
>
> 2017-06-07 18:42 GMT+08:00 john cheng <zqhxuy...@gmail.com>:
>
>> I added some logging to StoreChangeLogger:
>>
>> for (K k : this.dirty) {
>>     V v = getter.get(k);
>>     log.info("logChange key:{}, value:{}", k, v);
>>     collector.send(new ProducerRecord<>(this.topic, this.partition, k, v),
>>         keySerializer, valueSerializer);
>> }
>>
>> and found that the printed values are normal, just some bytes:
>>
>> [2017-06-07 18:39:43,131] INFO logChange removed:[], dirty:[kafka]
>> (org.apache.kafka.streams.state.internals.StoreChangeLogger)
>> [2017-06-07 18:39:43,132] INFO logChange key:kafka,value:[0, 0, 0, 2]
>> (org.apache.kafka.streams.state.internals.StoreChangeLogger)
>>
>> but console-consumer still outputs just "kafka:" with an empty value,
>>
>> so this may be a problem of console-consumer.
>>
>>
>> 2017-06-07 18:24 GMT+08:00 john cheng <zqhxuy...@gmail.com>:
>>
>>> I'm running WordCountProcessorDemo with the Processor API, and changed
>>> the following:
>>> 1. configured 1 stream thread and 1 replica
>>> 2. changed inMemory() to persistent()
>>> My Kafka version is 0.10.0.0. After running the streaming application, I
>>> checked the messages with console-consumer:
>>> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-producer.sh --broker-list
>>> localhost:9092 --topic streams-input2
>>> msg1 # with console-producer we can only produce the message value, so
>>> messages produced to the input topic use round-robin partitioning
>>> msg2
>>> msg3
>>> msg4
>>> msg5
>>> msg6
>>> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>>> --bootstrap-server localhost:9092 --topic streams-input2 --property
>>> print.key=true --property key.separator=":" --from-beginning
>>> null:msg2 # key is null, value is what we produced above
>>> null:msg4
>>> null:msg6
>>> null:msg1
>>> null:msg3
>>> null:msg5
>>> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>>> --bootstrap-server localhost:9092 --topic streams-output2 --property
>>> print.key=true --property key.separator=":" --from-beginning
>>> msg2:1
>>> msg1:1
>>> msg1:1
>>> msg3:1
>>> msg2:1
>>> msg4:1
>>> msg1:1
>>> msg3:1
>>> msg5:1
>>> msg2:1
>>> msg4:1
>>> msg6:1 # due to log compaction, the same key will be overwritten. this is ok...
>>> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>>> --bootstrap-server localhost:9092 --topic streams-wordcount-Counts-changelog
>>> --property print.key=true --property key.separator=":" --from-beginning
>>> *msg2:*
>>> *msg4:*
>>> *msg6:*
>>> *msg1:*
>>> *msg3:*
>>> *msg5:*
>>> Everything is ok, except that the changelog topic troubles me. Why is its
>>> value empty?
>>>
>>> I have dug into the source code, and here is a summary of the workflow:
>>> 1. when WordCountProcessorDemo's ProcessorNode puts a key-value pair into
>>> the KeyValueStore, no matter whether in-memory or RocksDB, it first puts
>>> the pair into local storage and then appends the message to the
>>> StoreChangeLogger
>>> 2. the key is appended to the StoreChangeLogger first, and maybeLogChange
>>> is invoked with a ValueGetter; this getter will read the value from local
>>> storage when the logChange() operation happens
>>> 3. when logChange() on the StoreChangeLogger happens, it sends a KeyValue
>>> message to the changelog topic, here streams-wordcount-Counts-changelog
>>> 4. StoreChangeLogger uses the dirty and removed sets to swap between
>>> logChange() and add()
>>>
>>> for (K k : this.dirty) { // logChange() flushes the dirty set to the changelog topic
>>>     V v = getter.get(k); // the value getter reads the value from local storage
>>>     collector.send(new ProducerRecord<>(this.topic, this.partition, k, v),
>>>         keySerializer, valueSerializer);
>>> }
>>>
>>> The only reason I can figure out for the value being empty is if the
>>> KeyValueStore's delete method were invoked. But actually the application
>>> didn't do that.
>>>
>>> Can anyone help me? Thanks.
>>>
>>>
>>
>
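If the formatter property is not available in your Kafka version, you can also decode the raw bytes in code, building on the ByteArrayDeserializer snippet you posted above. A rough, untested sketch -- the group id is arbitrary, and it assumes the counts are written as 4-byte big-endian ints as your hex dump suggests (for a long-valued store, use getLong() / the LongDeserializer instead):

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ChangelogDump {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "changelog-dump"); // arbitrary group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("streams-wordcount-Counts-changelog"));
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<String, byte[]> record : records) {
                    if (record.value() == null) {
                        // a delete in the store is logged as a tombstone (null value)
                        System.out.println(record.key() + ":<null>");
                        continue;
                    }
                    // assumes a 4-byte big-endian count, matching the hex dump above
                    int count = ByteBuffer.wrap(record.value()).getInt();
                    System.out.println(record.key() + ":" + count);
                }
            }
        }
    }
}

The deserializer just has to match whatever serde the store actually writes with; the default String deserializer turns those raw count bytes into unprintable characters, which is why the values looked empty in your output.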