Hello Johan,

Could you list the following information in addition to your topology?

1. Your config settings if you have any overrides, especially "poll.ms", the consumer's "max.poll.records", and "num.stream.threads".
2. Your expected incoming data rate (messages per second) during the normal processing phase.

Guozhang

On Tue, Oct 17, 2017 at 9:12 AM, Johan Genberg <johan.genb...@gmail.com> wrote:

> Hi,
>
> Thank you for responding so quickly. This is the topology. I've simplified
> it a bit, but these are the steps it goes through, not sure if that is
> helpful. I'll try to get some logs too in a bit.
>
> KStream<Integer, Event> eventStream = builder.stream(
>     topic.keySerde(),
>     topic.valueSerde(),
>     topic.name());
>
> KStream<Integer, Event> enriched =
>     eventStream
>         .map(...)
>         .transformValues(MyStatefulProcessor::new, "store1")
>         .mapValues(new MyStatelessProcessor())
>         .through("topic")
>         .map(new MyStatelessProcessor2())
>         .through("topic2")
>         .transformValues(MyStatefulProcessor2::new, "store2")
>         .through("topic3")
>         .map(new MyStatelessProcessor3())
>         .through("topic4");
>
> Store 1:
>
> Stores.create("store1")
>     .withKeys(Serdes.String())
>     .withValues(Serdes.Long())
>     .inMemory()
>     .build();
>
> Store 2:
>
> Stores.create("store2")
>     .withKeys(Serdes.String())
>     .withValues(valueSerde) // a small json object, 4-5 properties
>     .persistent()
>     .build();
>
> Thanks,
> Johan
>
> On Tue, Oct 17, 2017 at 2:51 AM Damian Guy <damian....@gmail.com> wrote:
>
> > Hi Johan,
> >
> > Do you have any logs? The state store restoration changed significantly in
> > 0.11.0.1. If you could get some logs at trace level, that would be useful.
> > Also if you could provide your topology (removing anything
> > proprietary/sensitive).
> >
> > Thanks,
> > Damian
> >
> > On Tue, 17 Oct 2017 at 05:55 Johan Genberg <johan.genb...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I'm upgrading a kafka streams application from 0.10.2.1 to 0.11.0.1,
> > > running against a kafka cluster with version 0.10.2.1. The application uses
> > > a couple of state stores.
> > >
> > > When stopping/starting the application prior to the upgrade (with 0.10.2.1
> > > client) on 2 instances, it was up and running in less than 30s to a minute
> > > on all nodes. However, after client was upgraded to 0.11.0.1, when the
> > > instances started (during some load), it took about 6 minutes for one of
> > > the nodes to reach "RUNNING" state, and the second one didn't get there.
> > > After 10 minutes I had to roll back.
> > >
> > > Is it expected that this initial rebalancing takes a little longer with
> > > 0.11.0.1, and is there a way to configure or tweak the client, or otherwise
> > > optimize this to make this go faster?
> > >
> > > Thanks,
> > > Johan

--
-- Guozhang
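
For reference, the config overrides asked about at the top of this message are usually set on the Properties object handed to the Streams application. The following is only a minimal sketch: the class name StreamsOverrides and the values 100, 500 and 2 are hypothetical placeholders, not recommendations or defaults. It uses plain java.util.Properties with the literal key strings so that it runs without the Kafka client on the classpath; in a real application the same keys would sit alongside the application id and bootstrap servers.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class StreamsOverrides {

    // Collect the three settings in question. All values are placeholders
    // chosen only for illustration.
    static Properties overrides() {
        Properties props = new Properties();
        // Streams-level setting: how long the stream thread blocks in poll().
        props.setProperty("poll.ms", "100");
        // Consumer-level setting: upper bound on records returned per poll().
        props.setProperty("max.poll.records", "500");
        // Streams-level setting: number of stream threads per instance.
        props.setProperty("num.stream.threads", "2");
        return props;
    }

    // Render the overrides as sorted key=value lines, handy for pasting
    // into a reply on the list.
    static String describe(Properties props) {
        Map<String, String> sorted = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            sorted.put(name, props.getProperty(name));
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : sorted.entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(describe(overrides()));
    }
}
```

Printing the effective overrides this way makes it easy to share exactly what differs from the defaults when reporting slow startup.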