ConsoleConsumer uses the String deserializer by default, but the value in the changelog is of type long. For the output topic, the type is converted from long to string though -- thus you can read the output topic without problems.
For reading the changelog topic, you need to specify the option
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
(hope I got the option right :))
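
A full invocation would then look roughly like this (untested, and assuming
the console consumer's default formatter picks up that property):

bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 \
    --topic streams-wordcount-Counts-changelog \
    --property print.key=true --property key.separator=":" \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer \
    --from-beginning
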
-Matthias
On 6/7/17 6:40 AM, john cheng wrote:
> the right way to see the changelog persisted by RocksDB is to use the
> ByteArrayDeserializer and then decode the bytes to a hex string:
>
> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>         "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>
> for (ConsumerRecord<String, byte[]> record : consumerRecords) {
>     System.out.println(bytesToHexString(record.value()));
> }
>
> public static String bytesToHexString(byte[] src) {
>     if (src == null || src.length <= 0) {
>         return null;
>     }
>     StringBuilder stringBuilder = new StringBuilder();
>     for (int i = 0; i < src.length; i++) {
>         int v = src[i] & 0xFF;
>         String hv = Integer.toHexString(v);
>         if (hv.length() < 2) {
>             stringBuilder.append('0');   // pad single hex digits to two characters
>         }
>         stringBuilder.append(hv);
>     }
>     return stringBuilder.toString();
> }
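>
> Putting the pieces together, a self-contained sketch of such a dumper
> (hypothetical class name, untested; it reuses the bytesToHexString helper above):
>
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
>
> public class ChangelogHexDumper {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, "changelog-hex-dumper");
>         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>
>         KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
>         consumer.subscribe(Collections.singletonList("streams-wordcount-Counts-changelog"));
>         while (true) {
>             ConsumerRecords<String, byte[]> records = consumer.poll(1000);
>             for (ConsumerRecord<String, byte[]> record : records) {
>                 System.out.printf("p:%d,o:%d,k:%s,v:%s%n",
>                         record.partition(), record.offset(), record.key(),
>                         bytesToHexString(record.value()));   // helper defined above
>             }
>         }
>     }
> }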
>
> The output now looks correct:
>
> p:0,o:0,k:msg1,v:00000001
> p:0,o:1,k:msg3,v:00000001
> p:0,o:2,k:msg5,v:00000001
> p:0,o:3,k:msg1,v:00000002
> p:0,o:4,k:msg3,v:00000002
> p:1,o:0,k:msg2,v:00000001
> p:1,o:1,k:msg4,v:00000001
> p:1,o:2,k:msg2,v:00000002
> p:1,o:3,k:msg2,v:00000003
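>
> As a sanity check, those 4-byte values decode back into the expected counts.
> A minimal sketch (assuming the counts are serialized as big-endian 4-byte
> integers, which is what the dump suggests; a long value would be 8 bytes):
>
> import java.nio.ByteBuffer;
>
> public class DecodeCount {
>     public static void main(String[] args) {
>         byte[] value = new byte[] {0, 0, 0, 2};       // e.g. v:00000002 from the dump above
>         int count = ByteBuffer.wrap(value).getInt();  // ByteBuffer defaults to big-endian
>         System.out.println(count);                    // prints 2
>     }
> }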
>
>
> 2017-06-07 18:42 GMT+08:00 john cheng <[email protected]>:
>
>> I added some logging to StoreChangeLogger:
>>
>> for (K k : this.dirty) {
>>     V v = getter.get(k);
>>     log.info("logChange key:{},value:{}", k, v);
>>     collector.send(new ProducerRecord<>(this.topic, this.partition, k, v),
>>             keySerializer, valueSerializer);
>> }
>>
>> and found that the printed result is normal, just some bytes:
>>
>> [2017-06-07 18:39:43,131] INFO logChange removed:[], dirty:[kafka]
>> (org.apache.kafka.streams.state.internals.StoreChangeLogger)
>> [2017-06-07 18:39:43,132] INFO logChange key:kafka,value:[0, 0, 0, 2]
>> (org.apache.kafka.streams.state.internals.StoreChangeLogger)
>>
>> but the console-consumer still outputs "kafka:" with an empty value,
>>
>> so this may be a problem of the console-consumer.
>>
>>
>> 2017-06-07 18:24 GMT+08:00 john cheng <[email protected]>:
>>
>>> I'm running WordCountProcessorDemo with the Processor API, and changed a
>>> few things:
>>> 1. configured 1 stream thread and 1 replica
>>> 2. changed inMemory() to persistent() (see the sketch below)
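>>>
>>> The store definition change is roughly this (a sketch from my reading of the
>>> 0.10.0.0 Stores builder; I have not re-checked the exact method names). The
>>> resulting supplier is then passed to the topology via addStateStore():
>>>
>>> StateStoreSupplier countStore = Stores.create("Counts")
>>>         .withStringKeys()
>>>         .withIntegerValues()
>>>         .persistent()     // was .inMemory() before my change
>>>         .build();
>>>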
>>> My Kafka version is 0.10.0.0. After running the streaming application, I
>>> checked the messages with the console consumer:
>>> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-producer.sh --broker-list
>>> localhost:9092 --topic streams-input2
>>> msg1 # the console producer only produces message values (keys are null), so
>>> messages sent to the input topic are assigned to partitions round-robin
>>> msg2
>>> msg3
>>> msg4
>>> msg5
>>> msg6
>>> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>>> --bootstrap-server localhost:9092 --topic streams-input2 --property
>>> print.key=true --property key.separator=":" --from-beginning
>>> null:msg2 # the key is null, the value is what we produced above
>>> null:msg4
>>> null:msg6
>>> null:msg1
>>> null:msg3
>>> null:msg5
>>> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>>> --bootstrap-server localhost:9092 --topic streams-output2 --property
>>> print.key=true --property key.separator=":" --from-beginning
>>> msg2:1
>>> msg1:1
>>> msg1:1
>>> msg3:1
>>> msg2:1
>>> msg4:1
>>> msg1:1
>>> msg3:1
>>> msg5:1
>>> msg2:1
>>> msg4:1
>>> msg6:1 # due to log compaction, records with the same key will be overwritten; this is ok...
>>> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>>> --bootstrap-server localhost:9092 --topic streams-wordcount-Counts-changelog
>>> --property print.key=true --property key.separator=":" --from-beginning
>>> msg2:
>>> msg4:
>>> msg6:
>>> msg1:
>>> msg3:
>>> msg5:
>>> Everything is ok, except the changelog topic troubles me. Why are its
>>> values empty?
>>>
>>> I have dug into the source code and summarized the workflow:
>>> 1. when WordCountProcessorDemo's ProcessorNode puts a key-value pair into the
>>> KeyValueStore, no matter whether it is in-memory or RocksDB, the pair is put
>>> into local storage and the key is then appended to the StoreChangeLogger
>>> 2. the key is appended to the StoreChangeLogger first, and maybeLogChange is
>>> invoked with a ValueGetter; this getter reads the value from local storage
>>> when the logChange() operation happens
>>> 3. when logChange() happens on the StoreChangeLogger, a key-value message is
>>> sent to the changelog topic, here streams-wordcount-Counts-changelog
>>> 4. the StoreChangeLogger uses the dirty and removed sets to swap between
>>> logChange() and add().
>>>
>>> for (K k : this.dirty) {  // logChange() flushes the dirty set to the changelog topic
>>>     V v = getter.get(k);  // the value getter reads the value from local storage
>>>     collector.send(new ProducerRecord<>(this.topic, this.partition, k, v),
>>>             keySerializer, valueSerializer);
>>> }
>>>
>>>
>>> The only reason I can think of for the value being empty is the
>>> KeyValueStore's delete method being invoked, but the application actually
>>> never calls delete.
>>>
>>> Can anyone help me? Thanks.
>>>
>>>
>>
>
