Hi,

The trace log is here (I tried to attach it to the email, but it was rejected):
https://gist.github.com/trickleup/fe0c095df65b8ae10906ea6774804b54

The logs are from a test environment, but I was able to reproduce the
issue there to some extent, with 5 or so messages per second on average.
The production system, where the lag was greater, has a similar load but
more pre-existing entries in the key-value stores.

I see a lot of "Skipping fetch for partition my_application_enriched-3
because there is an in-flight request...", and I'm not sure if that is
expected.

These are the consumer config overrides:

properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
properties.put(StreamsConfig.POLL_MS_CONFIG, 1000);
properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

where the RocksDB config is a workaround for 1-core machines:

public static class CustomRocksDBConfig implements RocksDBConfigSetter {
  @Override
  public void setConfig(
      final String storeName, final Options options, final Map<String, Object> configs) {
    // Workaround: we must ensure that the parallelism is set to >= 2.
    int compactionParallelism = Math.max(Runtime.getRuntime().availableProcessors(), 2);
    // Set the number of compaction threads (but not flush threads).
    options.setIncreaseParallelism(compactionParallelism);
  }
}
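For reference, here is a minimal sketch of how the full properties object
could be assembled (this relates to the poll.ms / max.poll.records /
num.threads question quoted below). The application id and broker list are
placeholders, and the last two put() calls only show where
num.stream.threads and the consumer's max.poll.records would be set if they
were overridden; the values shown are the defaults:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsPropertiesSketch {

  // Sketch only: "my_application" and the broker list are placeholders,
  // and CustomRocksDBConfig is the class shown above.
  static Properties buildStreamsProperties() {
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_application");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");

    // The overrides listed above.
    properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
    properties.put(StreamsConfig.POLL_MS_CONFIG, 1000);
    properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Illustrative only: where these would go if overridden;
    // 1 and 500 are the default values.
    properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
    properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 500);

    return properties;
  }
}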
Again, thanks for your help,

Johan

On Tue, Oct 17, 2017 at 11:57 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Johan,
>
> Could you list the following information in addition to your topology?
>
> 1. Your config settings if you have any overrides, especially the
> consumer's "poll.ms" and "max.poll.records", and "num.threads".
>
> 2. Your expected incoming data rate (messages per sec) during the normal
> processing phase.
>
> Guozhang
>
>
> On Tue, Oct 17, 2017 at 9:12 AM, Johan Genberg <johan.genb...@gmail.com>
> wrote:
>
> > Hi,
> >
> > Thank you for responding so quickly. This is the topology. I've
> > simplified it a bit, but these are the steps it goes through; not sure
> > if that is helpful. I'll try to get some logs too in a bit.
> >
> > KStream<Integer, Event> eventStream = builder.stream(
> >     topic.keySerde(),
> >     topic.valueSerde(),
> >     topic.name());
> >
> > KStream<Integer, Event> enriched =
> >     eventStream
> >         .map(...)
> >         .transformValues(MyStatefulProcessor::new, "store1")
> >         .mapValues(new MyStatelessProcessor())
> >         .through("topic")
> >         .map(new MyStatelessProcessor2())
> >         .through("topic2")
> >         .transformValues(MyStatefulProcessor2::new, "store2")
> >         .through("topic3")
> >         .map(new MyStatelessProcessor3())
> >         .through("topic4");
> >
> > Store 1:
> >
> > Stores.create("store1")
> >     .withKeys(Serdes.String())
> >     .withValues(Serdes.Long())
> >     .inMemory()
> >     .build();
> >
> > Store 2:
> >
> > Stores.create("store2")
> >     .withKeys(Serdes.String())
> >     .withValues(valueSerde) // a small json object, 4-5 properties
> >     .persistent()
> >     .build();
> >
> > Thanks,
> > Johan
> >
> > On Tue, Oct 17, 2017 at 2:51 AM Damian Guy <damian....@gmail.com> wrote:
> >
> > > Hi Johan,
> > >
> > > Do you have any logs? The state store restoration changed
> > > significantly in 0.11.0.1. If you could get some logs at trace level,
> > > that would be useful. Also if you could provide your topology
> > > (removing anything proprietary/sensitive).
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Tue, 17 Oct 2017 at 05:55 Johan Genberg <johan.genb...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I'm upgrading a Kafka Streams application from 0.10.2.1 to 0.11.0.1,
> > > > running against a Kafka cluster with version 0.10.2.1. The
> > > > application uses a couple of state stores.
> > > >
> > > > When stopping/starting the application prior to the upgrade (with
> > > > the 0.10.2.1 client) on 2 instances, it was up and running in less
> > > > than 30 seconds to a minute on all nodes. However, after the client
> > > > was upgraded to 0.11.0.1, when the instances started (during some
> > > > load), it took about 6 minutes for one of the nodes to reach the
> > > > "RUNNING" state, and the second one didn't get there. After 10
> > > > minutes I had to roll back.
> > > >
> > > > Is it expected that this initial rebalancing takes a little longer
> > > > with 0.11.0.1, and is there a way to configure or tweak the client,
> > > > or otherwise optimize this to make it go faster?
> > > >
> > > > Thanks,
> > > > Johan
>
> --
> -- Guozhang
>