Kafka stream error about message format

2018-04-18 Thread sy.pan
Hi, all: I recently upgraded my Kafka cluster from 0.10.2 to 1.1. After the rolling upgrade, the broker's version-related configuration is: inter.broker.protocol.version = 1.1, log.message.format.version = 0.10.2. I kept the log message format at the lower version because not all clients could upgrade

Re: How to set result value Serdes Class in Kafka stream join

2017-11-16 Thread sy.pan
hat operation. > > Thanks, > Damian > > On Thu, 16 Nov 2017 at 10:53 sy.pan wrote: > >> Hi, all: >> >> Recently I have read kafka streams join document( >> https://docs.confluent.io/current/streams/developer-guide.html#kafka-streams-dsl

How to set result value Serdes Class in Kafka stream join

2017-11-16 Thread sy.pan
Hi, all: I recently read the Kafka Streams join documentation (https://docs.confluent.io/current/streams/developer-guide.html#kafka-streams-dsl). The sample code is pasted below: import java.util.concurrent.TimeUni

Re: consume ***-changelog topic encounter IllegalArgumentException: Window startMs time cannot be negative

2017-06-22 Thread sy.pan
eams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java > > Thanks, > Damian > > On Thu, 22 Jun 2017 at 15:23 sy.pan wrote: > >> I explicitly call KTable.to(Serde>, Serdes.Long(), >> String topic), >> >> save the same data to

Re: consume ***-changelog topic encounter IllegalArgumentException: Window startMs time cannot be negative

2017-06-22 Thread sy.pan
I explicitly called KTable.to(Serde>, Serdes.Long(), String topic) to save the same data to another topic (manually created by myself), and the exception went away. So does the **-changelog internal topic have a special key format, even though the key type is the same (windowed)?

consume ***-changelog topic encounter IllegalArgumentException: Window startMs time cannot be negative

2017-06-22 Thread sy.pan
Hi: when calling KGroupedStream.count(Windows windows, String storeName), a storeName-changelog topic is automatically created as an internal topic, with key type windowed and value type Long. I am trying to consume from the internal storeName-changelog topic, with code like: final Deserializer> windowedDeserializer = new

Re: How to recover from out of sync

2015-08-06 Thread sy.pan
the replicas all in isr. I don’t know why, could someone explain this? Thank you > On 6 Aug 2015, at 18:47, sy.pan wrote: > > Hi guys: > > we have used kafka-0.8.1 under three machines (broker 0,broker 1,broker > 3). After running several months, so

How to recover from out of sync

2015-08-06 Thread sy.pan
Hi guys: we have been running kafka-0.8.1 on three machines (broker 0, broker 1, broker 3). After running for several months, some partitions are out of sync; how can we recover from this situation? topic desc: Topic:analyze PartitionCount:8 ReplicationFactor:3 Configs: Topic: anal

Re: How replicas catch up the leader

2015-03-11 Thread sy.pan
on":1,"partitions":[{"topic":"ad_click_sts","partition":3,"replicas":[0,1]}]} In practice, the partition has lost its in-sync replica: Topic: ad_click_sts Partition: 3 Leader: 0 Replicas: 0,1 Isr: 0 Regards sy.pan > On 2015-03

Re: How replicas catch up the leader

2015-03-10 Thread sy.pan
5-03-11 11:00:40,086] INFO Partition [ad_click_sts,4] on broker 1: Cached zkVersion [564] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition) >>>>>>>>>>>>>>>>>> finally, I had to restart the kafka node and the

Re: How replicas catch up the leader

2015-03-09 Thread sy.pan
per, skip updating ISR (kafka.cluster.Partition) [2015-03-09 21:06:05,772] INFO Partition [ad_click_sts,2] on broker 1: Shrinking ISR for partition [ad_click_sts,2] from 1,0 to 1 (kafka.cluster.Partition) How can I fix this ISR problem? Is there a command that can be run? Regards sy.pan