Johan,

Could you please try changing the following configs and try again?

properties.put(StreamsConfig.POLL_MS_CONFIG, 0);

Guozhang

On Tue, Oct 17, 2017 at 12:34 PM, Johan Genberg <johan.genb...@gmail.com> wrote:

> Hi,
>
> The trace log is here (I tried to attach it to the email but it was rejected):
>
> https://gist.github.com/trickleup/fe0c095df65b8ae10906ea6774804b54
>
> The logs are from a test environment, but I was able to somewhat reproduce
> the issue, with 5 or so messages per second on average. The production
> system where the lag was greater has a similar load, but more pre-existing
> entries in the key-value stores.
>
> I see a lot of "Skipping fetch for partition my_application_enriched-3
> because there is an in-flight request...", not sure if this is expected.
>
> These are the consumer config overrides:
>
> properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
> properties.put(StreamsConfig.POLL_MS_CONFIG, 1000);
> properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
>     CustomRocksDBConfig.class);
> properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>
> where the RocksDB config is a fix for 1-core machines:
>
> public static class CustomRocksDBConfig implements RocksDBConfigSetter {
>
>   @Override
>   public void setConfig(
>       final String storeName, final Options options,
>       final Map<String, Object> configs) {
>     // Workaround: we must ensure that the parallelism is set to >= 2.
>     int compactionParallelism =
>         Math.max(Runtime.getRuntime().availableProcessors(), 2);
>     // Set the number of compaction threads (but not flush threads).
>     options.setIncreaseParallelism(compactionParallelism);
>   }
> }
>
> Again, thanks for your help,
> Johan
>
> On Tue, Oct 17, 2017 at 11:57 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Johan,
> >
> > Could you list the following information in addition to your topology?
> >
> > 1. Your config settings if you have any overrides, especially the
> > consumer's "poll.ms", "max.poll.records" and "num.threads".
> >
> > 2. Your expected incoming data rate (messages per sec) during the normal
> > processing phase.
> >
> > Guozhang
> >
> > On Tue, Oct 17, 2017 at 9:12 AM, Johan Genberg <johan.genb...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > Thank you for responding so quickly. This is the topology. I've
> > > simplified it a bit, but these are the steps it goes through, not sure
> > > if that is helpful. I'll try to get some logs too in a bit.
> > >
> > > KStream<Integer, Event> eventStream = builder.stream(
> > >     topic.keySerde(),
> > >     topic.valueSerde(),
> > >     topic.name());
> > >
> > > KStream<Integer, Event> enriched =
> > >     eventStream
> > >         .map(...)
> > >         .transformValues(MyStatefulProcessor::new, "store1")
> > >         .mapValues(new MyStatelessProcessor())
> > >         .through("topic")
> > >         .map(new MyStatelessProcessor2())
> > >         .through("topic2")
> > >         .transformValues(MyStatefulProcessor2::new, "store2")
> > >         .through("topic3")
> > >         .map(new MyStatelessProcessor3())
> > >         .through("topic4");
> > >
> > > Store 1:
> > >
> > > Stores.create("store1")
> > >     .withKeys(Serdes.String())
> > >     .withValues(Serdes.Long())
> > >     .inMemory()
> > >     .build();
> > >
> > > Store 2:
> > >
> > > Stores.create("store2")
> > >     .withKeys(Serdes.String())
> > >     .withValues(valueSerde) // a small json object, 4-5 properties
> > >     .persistent()
> > >     .build();
> > >
> > > Thanks,
> > > Johan
> > >
> > > On Tue, Oct 17, 2017 at 2:51 AM Damian Guy <damian....@gmail.com> wrote:
> > >
> > > > Hi Johan,
> > > >
> > > > Do you have any logs? The state store restoration changed
> > > > significantly in 0.11.0.1. If you could get some logs at trace level,
> > > > that would be useful. It would also help if you could provide your
> > > > topology (removing anything proprietary/sensitive).
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Tue, 17 Oct 2017 at 05:55 Johan Genberg <johan.genb...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I'm upgrading a Kafka Streams application from 0.10.2.1 to 0.11.0.1,
> > > > > running against a Kafka cluster with version 0.10.2.1. The
> > > > > application uses a couple of state stores.
> > > > >
> > > > > When stopping/starting the application prior to the upgrade (with
> > > > > the 0.10.2.1 client) on 2 instances, it was up and running in less
> > > > > than 30 seconds to a minute on all nodes. However, after the client
> > > > > was upgraded to 0.11.0.1, when the instances started (during some
> > > > > load), it took about 6 minutes for one of the nodes to reach the
> > > > > "RUNNING" state, and the second one didn't get there. After 10
> > > > > minutes I had to roll back.
> > > > >
> > > > > Is it expected that this initial rebalancing takes a little longer
> > > > > with 0.11.0.1, and is there a way to configure or tweak the client,
> > > > > or otherwise optimize this to make it go faster?
> > > > >
> > > > > Thanks,
> > > > > Johan
> >
> > --
> > -- Guozhang


--
-- Guozhang
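
For reference, below is a minimal, self-contained sketch of how the settings discussed in this thread could be wired into a Kafka Streams 0.11.0.x application. The application id, bootstrap servers, topic names and the trivial placeholder topology are illustrative assumptions only, not Johan's actual code:

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class StreamsStartupSketch {

    // Same workaround as in the thread: keep RocksDB compaction parallelism >= 2
    // so 1-core machines still get a compaction thread.
    public static class CustomRocksDBConfig implements RocksDBConfigSetter {
        @Override
        public void setConfig(final String storeName, final Options options,
                              final Map<String, Object> configs) {
            int compactionParallelism =
                Math.max(Runtime.getRuntime().availableProcessors(), 2);
            options.setIncreaseParallelism(compactionParallelism);
        }
    }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-application");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
        props.put(StreamsConfig.POLL_MS_CONFIG, 0); // the change suggested above (was 1000)
        props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        final KStreamBuilder builder = new KStreamBuilder();
        // Trivial placeholder topology; the real one is shown earlier in the thread.
        builder.stream(Serdes.Integer(), Serdes.String(), "input-topic")
               .mapValues(String::toUpperCase)
               .to(Serdes.Integer(), Serdes.String(), "output-topic");

        final KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Here poll.ms is the time the stream thread blocks waiting for input on each poll, so setting it to 0 (as suggested above) should make each poll return immediately rather than blocking for up to the previous 1000 ms.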