ConsoleConsumer uses the String deserializer by default, but the value in the
changelog is of type long. For the output topic the type is converted from
long to string, though -- thus you can read the output topic without
problems.

For reading the changelog topic, you need to specify the option

 --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

(hope I got the option right :))
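
If you want to double-check from code instead of the console tool, here is a
minimal sketch of a plain consumer with an explicit value deserializer (the
group id "changelog-reader" is just a placeholder, the topic name is the one
from your thread below, and if the store actually uses an integer serde you
would swap in IntegerDeserializer):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ChangelogReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "changelog-reader");   // placeholder group id
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",               // the important part
            "org.apache.kafka.common.serialization.LongDeserializer");

        KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("streams-wordcount-Counts-changelog"));
        try {
            while (true) {
                ConsumerRecords<String, Long> records = consumer.poll(1000);
                for (ConsumerRecord<String, Long> record : records) {
                    System.out.println(record.key() + ":" + record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}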


-Matthias


On 6/7/17 6:40 AM, john cheng wrote:
> the right way to see the changelog persisted by RocksDB is to use the
> ByteArrayDeserializer, and then decode the bytes to a hex string
> 
> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> 
> for (ConsumerRecord<String, byte[]> record : consumerRecords) {
>     System.out.println(bytesToHexString(record.value()));
> }
> 
> public static String bytesToHexString(byte[] src) {
>     if (src == null || src.length == 0) {
>         return null;
>     }
>     StringBuilder stringBuilder = new StringBuilder();
>     for (int i = 0; i < src.length; i++) {
>         int v = src[i] & 0xFF;
>         String hv = Integer.toHexString(v);
>         if (hv.length() < 2) {
>             stringBuilder.append('0');  // left-pad single hex digits
>         }
>         stringBuilder.append(hv);
>     }
>     return stringBuilder.toString();
> }
> 
> the output is now correct:
> 
> p:0,o:0,k:msg1,v:00000001
> p:0,o:1,k:msg3,v:00000001
> p:0,o:2,k:msg5,v:00000001
> p:0,o:3,k:msg1,v:00000002
> p:0,o:4,k:msg3,v:00000002
> p:1,o:0,k:msg2,v:00000001
> p:1,o:1,k:msg4,v:00000001
> p:1,o:2,k:msg2,v:00000002
> p:1,o:3,k:msg2,v:00000003
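> 
> A small follow-up sketch to turn those 4-byte values back into numbers
> instead of hex strings (assuming big-endian encoding, which is what Kafka's
> built-in serializers produce):
> 
> import java.nio.ByteBuffer;
> 
> // decode the 4 value bytes back into an int (big-endian); if the store were
> // configured with a long serde the value would be 8 bytes and getLong()
> // would apply instead
> public static int bytesToInt(byte[] src) {
>     return ByteBuffer.wrap(src).getInt();
> }
> 
> // e.g. bytesToInt(new byte[]{0, 0, 0, 2}) returns 2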
> 
> 
> 2017-06-07 18:42 GMT+08:00 john cheng <zqhxuy...@gmail.com>:
> 
>> I added some logging to the StoreChangeLogger:
>>
>> for (K k : this.dirty) {
>>     V v = getter.get(k);
>>     log.info("logChange key:{},value:{}", k, v);
>>     collector.send(new ProducerRecord<>(this.topic, this.partition, k, v),
>>             keySerializer, valueSerializer);
>> }
>>
>> and found that the printed result is normal, just some bytes:
>>
>> [2017-06-07 18:39:43,131] INFO logChange removed:[], dirty:[kafka] 
>> (org.apache.kafka.streams.state.internals.StoreChangeLogger)
>> [2017-06-07 18:39:43,132] INFO logChange key:kafka,value:[0, 0, 0, 2] 
>> (org.apache.kafka.streams.state.internals.StoreChangeLogger)
>>
>> but the console consumer still outputs "kafka:" with an empty value,
>>
>> so this may be a problem with the console consumer.
>>
>>
>> 2017-06-07 18:24 GMT+08:00 john cheng <zqhxuy...@gmail.com>:
>>
>>> I'm running WordCountProcessorDemo with the Processor API, and changed a few
>>> things:
>>> 1. configure 1 stream thread and 1 replica
>>> 2. change inMemory() to persistent() (rough sketch below)
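>>>
>>> (rough sketch of that change against the 0.10.0.0 Stores API; the store name,
>>> serdes and the "Process" processor name just mirror the demo and builder is the
>>> demo's TopologyBuilder, so treat them as assumptions:)
>>>
>>> // original demo wires an in-memory store:
>>> //   Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build()
>>> // switching to a persistent (RocksDB-backed) store:
>>> builder.addStateStore(
>>>     Stores.create("Counts")
>>>           .withStringKeys()
>>>           .withIntegerValues()
>>>           .persistent()
>>>           .build(),
>>>     "Process");
>>>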
>>> My Kafka version is 0.10.0.0. After running the streams application, I check
>>> the message output with the console consumer:
>>> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-producer.sh --broker-list
>>> localhost:9092 --topic streams-input2
>>> msg1  # with the console producer we can only provide the message value, so
>>> messages produced to the input topic are partitioned round-robin
>>> msg2
>>> msg3
>>> msg4
>>> msg5
>>> msg6
>>> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>>> --bootstrap-server localhost:9092 --topic streams-input2 --property
>>> print.key=true --property key.separator=":" --from-beginning
>>> null:msg2  # key is null, value is what we produced above
>>> null:msg4
>>> null:msg6
>>> null:msg1
>>> null:msg3
>>> null:msg5
>>> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>>> --bootstrap-server localhost:9092 --topic streams-output2 --property
>>> print.key=true --property key.separator=":" --from-beginning
>>> msg2:1
>>> msg1:1
>>> msg1:1
>>> msg3:1
>>> msg2:1
>>> msg4:1
>>> msg1:1
>>> msg3:1
>>> msg5:1
>>> msg2:1
>>> msg4:1
>>> msg6:1  # due to log compaction, the same key will be overwritten. this is ok...
>>> ➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>>> --bootstrap-server localhost:9092 --topic streams-wordcount-Counts-changelog
>>> --property print.key=true --property key.separator=":" --from-beginning
>>> *msg2:*
>>> *msg4:*
>>> *msg6:*
>>> *msg1:*
>>> *msg3:*
>>> *msg5:*
>>> Everything is ok, except the changelog topic troubles me. Why is its value
>>> empty?
>>>
>>> I have dug into the source code, and here is a summary of the workflow:
>>> 1. when WordCountProcessorDemo's ProcessorNode puts a key-value pair into the
>>> KeyValueStore, whether in-memory or RocksDB, the pair is first put into local
>>> storage and then the change is appended to the StoreChangeLogger
>>> 2. the key is appended to the StoreChangeLogger first, and maybeLogChange is
>>> invoked with a ValueGetter; this getter will read the value from local storage
>>> when the logChange() operation happens
>>> 3. when logChange() on the StoreChangeLogger happens, a key-value message is
>>> sent to the changelog topic, here streams-wordcount-Counts-changelog
>>> 4. the StoreChangeLogger uses the dirty and removed Sets to swap between
>>> logChange() and add().
>>>
>>> for (K k : this.dirty) { // logChange() flushes the dirty set to the changelog topic
>>>     V v = getter.get(k);  // the value getter reads the value from local storage
>>>     collector.send(new ProducerRecord<>(this.topic, this.partition, k, v),
>>>             keySerializer, valueSerializer);
>>> }
>>>
>>>
>>> The only reason I can think of for the value being empty is an invocation of
>>> the KeyValueStore's delete method. But actually the application didn't do that.
>>>
>>> Can anyone help me? Thanks.
>>>
>>>
>>
> 
