Hello Johan,

Could you list the following information in addition to your topology?

1. Your config settings if you have any overrides, especially "poll.ms",
the consumer's "max.poll.records", and "num.stream.threads".

2. Your expected incoming data rate (messages per second) during the normal
processing phase.
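
For reference, here is a minimal sketch of where such overrides usually live: the Properties object handed to the Streams application at startup. It uses plain string keys so it compiles without the Kafka jars. The thread-count setting is written under its full name, "num.stream.threads". The class name StreamsOverrides and all three values are hypothetical placeholders, not recommendations or defaults.

```java
import java.util.Properties;

public class StreamsOverrides {

    // Collects the three settings asked about above into one Properties object.
    // The values are illustrative placeholders only; substitute your own overrides.
    static Properties build() {
        Properties props = new Properties();
        // Streams-level: time in milliseconds to block waiting for input
        props.put("poll.ms", "100");
        // Consumer-level: maximum number of records returned by a single poll()
        props.put("max.poll.records", "500");
        // Streams-level: number of stream processing threads per application instance
        props.put("num.stream.threads", "2");
        return props;
    }

    public static void main(String[] args) {
        // Print the overrides so they can be pasted into a reply
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a real application the same object would also carry the required settings such as "application.id" and "bootstrap.servers" before being passed to the Streams client.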



Guozhang


On Tue, Oct 17, 2017 at 9:12 AM, Johan Genberg <johan.genb...@gmail.com>
wrote:

> Hi,
>
> Thank you for responding so quickly. This is the topology. I've simplified
> it a bit, but these are the steps it goes through; I'm not sure if that is
> helpful. I'll try to get some logs too in a bit.
>
> KStream<Integer, Event> eventStream = builder.stream(
>   topic.keySerde(),
>   topic.valueSerde(),
>   topic.name());
>
> KStream<Integer, Event> enriched =
>     eventStream
>         .map(...)
>         .transformValues(MyStatefulProcessor::new, "store1")
>         .mapValues(new MyStatelessProcessor())
>         .through("topic")
>         .map(new MyStatelessProcessor2())
>         .through("topic2")
>         .transformValues(MyStatefulProcessor2::new, "store2")
>         .through("topic3")
>         .map(new MyStatelessProcessor3())
>         .through("topic4");
>
> Store 1:
>
> Stores.create("store1")
>     .withKeys(Serdes.String())
>     .withValues(Serdes.Long())
>     .inMemory()
>     .build();
>
> Store 2:
>
> Stores.create("store2")
>     .withKeys(Serdes.String())
>     .withValues(valueSerde) // a small json object, 4-5 properties
>     .persistent()
>     .build();
>
> Thanks,
> Johan
>
> On Tue, Oct 17, 2017 at 2:51 AM Damian Guy <damian....@gmail.com> wrote:
>
> > Hi Johan,
> >
> > Do you have any logs? The state store restoration changed significantly in
> > 0.11.0.1. If you could get some logs at trace level, that would be useful.
> > Could you also provide your topology (removing anything
> > proprietary/sensitive)?
> >
> > Thanks,
> > Damian
> >
> > On Tue, 17 Oct 2017 at 05:55 Johan Genberg <johan.genb...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I'm upgrading a Kafka Streams application from 0.10.2.1 to 0.11.0.1,
> > > running against a Kafka cluster with version 0.10.2.1. The application
> > > uses a couple of state stores.
> > >
> > > When stopping/starting the application prior to the upgrade (with the
> > > 0.10.2.1 client) on 2 instances, it was up and running within 30 seconds
> > > to a minute on all nodes. However, after the client was upgraded to
> > > 0.11.0.1, when the instances started (during some load), it took about
> > > 6 minutes for one of the nodes to reach the "RUNNING" state, and the
> > > second one didn't get there. After 10 minutes I had to roll back.
> > >
> > > Is it expected that this initial rebalancing takes a little longer with
> > > 0.11.0.1, and is there a way to configure or tweak the client, or
> > > otherwise optimize this to make it go faster?
> > >
> > > Thanks,
> > > Johan
> > >
> >
>



-- 
-- Guozhang
