This is only for diagnosing your issue, not a setting I would recommend for production. I suspect that, given your very small traffic (5 records per sec) and the long polling parameter, the restoration process was blocked waiting for data. More specifically, with KAFKA-5152, which was merged in 0.11.0.1, a single thread may be restoring state for some of its tasks while processing its other, already-restored tasks at the same time. If the data traffic for those ready tasks is small, the long poll on them can delay progress on the tasks that are still restoring.
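Concretely, the override would sit in your existing Streams properties roughly like this; the class name, application id, and bootstrap servers below are just placeholders for illustration:

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsPollOverride {

        public static Properties diagnosticConfig() {
            final Properties properties = new Properties();
            // Placeholders: use your real application id and brokers.
            properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-application");
            properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Diagnostic override: do not block in poll() waiting for input, so a
            // thread that is still restoring state for some tasks is not held up
            // by long polls on its already-restored tasks.
            properties.put(StreamsConfig.POLL_MS_CONFIG, 0);
            // Overrides you already have in place.
            properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return properties;
        }
    }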
Guozhang

On Tue, Oct 17, 2017 at 1:44 PM, Johan Genberg <johan.genb...@gmail.com> wrote:

> Ok, I will try that. Is there any documentation as to what exactly this
> does, in general? Just want to safeguard against side effects. Also, do
> you recommend 0 as a production setting (the default is 100ms, I think),
> or is this just to test this out/diagnose?
>
> The only description I can find is this:
>
> "The amount of time in milliseconds to block waiting for input."
>
> Thanks,
> Johan
>
> On Tue, Oct 17, 2017 at 1:32 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Johan,
> >
> > Could you please change the following config and try again?
> >
> > properties.put(StreamsConfig.POLL_MS_CONFIG, 0);
> >
> > Guozhang
> >
> > On Tue, Oct 17, 2017 at 12:34 PM, Johan Genberg <johan.genb...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > The trace log is here (I tried to attach it to the email but it was rejected):
> > >
> > > https://gist.github.com/trickleup/fe0c095df65b8ae10906ea6774804b54
> > >
> > > The logs are from a test environment, but I was able to somewhat
> > > reproduce the issue, with 5 or so messages per second on average. The
> > > production system where the lag was greater has a similar load, but
> > > more pre-existing entries in the key-value stores.
> > >
> > > I see a lot of "Skipping fetch for partition my_application_enriched-3
> > > because there is an in-flight request...", not sure if this is expected.
> > >
> > > These are the consumer config overrides:
> > >
> > > properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
> > > properties.put(StreamsConfig.POLL_MS_CONFIG, 1000);
> > > properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
> > >     CustomRocksDBConfig.class);
> > > properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> > >
> > > Where the RocksDB config is a fix for 1-core machines:
> > >
> > > public static class CustomRocksDBConfig implements RocksDBConfigSetter {
> > >
> > >   @Override
> > >   public void setConfig(
> > >       final String storeName, final Options options,
> > >       final Map<String, Object> configs) {
> > >     // Workaround: We must ensure that the parallelism is set to >= 2.
> > >     int compactionParallelism =
> > >         Math.max(Runtime.getRuntime().availableProcessors(), 2);
> > >     // Set number of compaction threads (but not flush threads).
> > >     options.setIncreaseParallelism(compactionParallelism);
> > >   }
> > > }
> > >
> > > Again, thanks for your help,
> > > Johan
> > >
> > > On Tue, Oct 17, 2017 at 11:57 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Hello Johan,
> > > >
> > > > Could you list the following information in addition to your topology?
> > > >
> > > > 1. Your config settings if you have any overwrites, especially the
> > > > consumer's "poll.ms", "max.poll.records", and "num.threads".
> > > >
> > > > 2. Your expected incoming data rate (messages per sec) during the
> > > > normal processing phase.
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, Oct 17, 2017 at 9:12 AM, Johan Genberg <johan.genb...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Thank you for responding so quickly. This is the topology. I've
> > > > > simplified it a bit, but these are the steps it goes through, not
> > > > > sure if that is helpful. I'll try to get some logs too in a bit.
> > > > >
> > > > > KStream<Integer, Event> eventStream = builder.stream(
> > > > >     topic.keySerde(),
> > > > >     topic.valueSerde(),
> > > > >     topic.name());
> > > > >
> > > > > KStream<Integer, Event> enriched =
> > > > >     eventStream
> > > > >         .map(...)
> > > > >         .transformValues(MyStatefulProcessor::new, "store1")
> > > > >         .mapValues(new MyStatelessProcessor())
> > > > >         .through("topic")
> > > > >         .map(new MyStatelessProcessor2())
> > > > >         .through("topic2")
> > > > >         .transformValues(MyStatefulProcessor2::new, "store2")
> > > > >         .through("topic3")
> > > > >         .map(new MyStatelessProcessor3())
> > > > >         .through("topic4");
> > > > >
> > > > > Store 1:
> > > > >
> > > > > Stores.create("store1")
> > > > >     .withKeys(Serdes.String())
> > > > >     .withValues(Serdes.Long())
> > > > >     .inMemory()
> > > > >     .build();
> > > > >
> > > > > Store 2:
> > > > >
> > > > > Stores.create("store2")
> > > > >     .withKeys(Serdes.String())
> > > > >     .withValues(valueSerde) // a small json object, 4-5 properties
> > > > >     .persistent()
> > > > >     .build();
> > > > >
> > > > > Thanks,
> > > > > Johan
> > > > >
> > > > > On Tue, Oct 17, 2017 at 2:51 AM Damian Guy <damian....@gmail.com> wrote:
> > > > >
> > > > > > Hi Johan,
> > > > > >
> > > > > > Do you have any logs? The state store restoration changed
> > > > > > significantly in 0.11.0.1. If you could get some logs at trace
> > > > > > level, that would be useful. Also if you could provide your
> > > > > > topology (removing anything proprietary/sensitive).
> > > > > >
> > > > > > Thanks,
> > > > > > Damian
> > > > > >
> > > > > > On Tue, 17 Oct 2017 at 05:55 Johan Genberg <johan.genb...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I'm upgrading a Kafka Streams application from 0.10.2.1 to
> > > > > > > 0.11.0.1, running against a Kafka cluster with version 0.10.2.1.
> > > > > > > The application uses a couple of state stores.
> > > > > > >
> > > > > > > When stopping/starting the application prior to the upgrade
> > > > > > > (with the 0.10.2.1 client) on 2 instances, it was up and running
> > > > > > > in less than 30s to a minute on all nodes. However, after the
> > > > > > > client was upgraded to 0.11.0.1, when the instances started
> > > > > > > (during some load), it took about 6 minutes for one of the nodes
> > > > > > > to reach "RUNNING" state, and the second one didn't get there.
> > > > > > > After 10 minutes I had to roll back.
> > > > > > >
> > > > > > > Is it expected that this initial rebalancing takes a little
> > > > > > > longer with 0.11.0.1, and is there a way to configure or tweak
> > > > > > > the client, or otherwise optimize this to make it go faster?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Johan
> > > >
> > > > --
> > > > -- Guozhang
> >
> > --
> > -- Guozhang

--
-- Guozhang
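For reference, the 1-core RocksDB workaround from the thread compiles as a standalone class roughly as follows against the 0.11.0.x RocksDBConfigSetter interface; making it a top-level class (rather than the nested class shown in the thread) is only for illustration.

    import java.util.Map;

    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.Options;

    // Registered through StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, as in
    // the overrides quoted above.
    public class CustomRocksDBConfig implements RocksDBConfigSetter {

        @Override
        public void setConfig(final String storeName, final Options options,
                              final Map<String, Object> configs) {
            // Workaround for 1-core machines: RocksDB requires the value passed
            // to increaseParallelism to be at least 2.
            final int compactionParallelism =
                    Math.max(Runtime.getRuntime().availableProcessors(), 2);
            // Sets the number of background compaction threads (not flush threads).
            options.setIncreaseParallelism(compactionParallelism);
        }
    }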