Hi Johan,

It looks like this is an issue. Guozhang has provided a fix for it here:
https://github.com/apache/kafka/pull/4085

You can try it by cloning his repository and building the streams jar.
Thanks,
Damian

On Tue, 17 Oct 2017 at 23:07 Johan Genberg <johan.genb...@gmail.com> wrote:

Yes, it's noticeably faster in my test environment, under load. I'm going
to apply the default value (100) and see how it behaves in production.

Thanks,
Johan

On Tue, Oct 17, 2017 at 1:58 PM Guozhang Wang <wangg...@gmail.com> wrote:

This is only for diagnosing your issue.

I suspect that due to your very small traffic (5 records per sec) and long
polling parameter, the restoration process was blocked waiting for data.
More specifically, since KAFKA-5152, which was merged in 0.11.0.1, a thread
may be restoring some tasks while processing other, already-restored tasks
at the same time. If the data traffic for those ready tasks is small, it can
have side effects on the tasks that are still restoring.

Guozhang

On Tue, Oct 17, 2017 at 1:44 PM, Johan Genberg <johan.genb...@gmail.com> wrote:

Ok, I will try that. Is there any documentation as to what exactly this
does, in general? Just want to safeguard against side effects. Also, do you
recommend 0 as a production setting (the default is 100ms, I think), or is
this just to test this out/diagnose?

The only description I can find is this:

"The amount of time in milliseconds to block waiting for input."

Thanks,
Johan

On Tue, Oct 17, 2017 at 1:32 PM Guozhang Wang <wangg...@gmail.com> wrote:

Johan,

Could you please try changing the following config and try again?

properties.put(StreamsConfig.POLL_MS_CONFIG, 0);

Guozhang
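For context, a minimal sketch of where that override sits in a Streams application. This is not from the thread; the application id, bootstrap servers and topic names are placeholders, and 0 is the diagnostic value suggested above (the shipped default is 100 ms):

    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class PollMsOverrideExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // 0 only while diagnosing the slow restoration; the default is 100 (ms).
            props.put(StreamsConfig.POLL_MS_CONFIG, 0);

            final KStreamBuilder builder = new KStreamBuilder();
            builder.stream("input-topic").to("output-topic"); // placeholder topology

            final KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();
        }
    }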
On Tue, Oct 17, 2017 at 12:34 PM, Johan Genberg <johan.genb...@gmail.com> wrote:

Hi,

The trace log is here (tried to attach to email but it was rejected):

https://gist.github.com/trickleup/fe0c095df65b8ae10906ea6774804b54

The logs are from a test environment, but I was able to somewhat reproduce
the issue, with 5 or so messages per second on average. The production
system where the lag was greater has a similar load, but more pre-existing
entries in the key-value stores.

I see a lot of "Skipping fetch for partition my_application_enriched-3
because there is an in-flight request...", not sure if this is expected.

These are the consumer config overrides:

    properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
    properties.put(StreamsConfig.POLL_MS_CONFIG, 1000);
    properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Where the rocksdb config is a fix for 1-core machines:

    public static class CustomRocksDBConfig implements RocksDBConfigSetter {

        @Override
        public void setConfig(
            final String storeName, final Options options, final Map<String, Object> configs) {
            // Workaround: We must ensure that the parallelism is set to >= 2.
            int compactionParallelism =
                Math.max(Runtime.getRuntime().availableProcessors(), 2);
            // Set number of compaction threads (but not flush threads).
            options.setIncreaseParallelism(compactionParallelism);
        }
    }

Again, thanks for your help,
Johan
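For reference, a self-contained version of that config setter with the imports it needs. This is a sketch against the 0.11.x RocksDBConfigSetter interface; only the imports and the top-level class wrapper are added here:

    import java.util.Map;

    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.Options;

    public class CustomRocksDBConfig implements RocksDBConfigSetter {

        @Override
        public void setConfig(final String storeName, final Options options,
                              final Map<String, Object> configs) {
            // increaseParallelism needs at least 2 threads; guard for 1-core machines.
            final int compactionParallelism =
                Math.max(Runtime.getRuntime().availableProcessors(), 2);
            // Sets the number of background compaction threads (but not flush threads).
            options.setIncreaseParallelism(compactionParallelism);
        }
    }

It is registered through StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, as in the overrides above.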
On Tue, Oct 17, 2017 at 11:57 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Johan,

Could you list the following information in addition to your topology?

1. Your config settings if you have any overrides, especially the
consumer's "poll.ms" and "max.poll.records", and "num.threads".

2. Your expected incoming data rate (messages per sec) during normal
processing.

Guozhang

On Tue, Oct 17, 2017 at 9:12 AM, Johan Genberg <johan.genb...@gmail.com> wrote:

Hi,

Thank you for responding so quickly. This is the topology. I've simplified
it a bit, but these are the steps it goes through, not sure if that is
helpful. I'll try to get some logs too in a bit.

    KStream<Integer, Event> eventStream = builder.stream(
        topic.keySerde(),
        topic.valueSerde(),
        topic.name());

    KStream<Integer, Event> enriched =
        eventStream
            .map(...)
            .transformValues(MyStatefulProcessor::new, "store1")
            .mapValues(new MyStatelessProcessor())
            .through("topic")
            .map(new MyStatelessProcessor2())
            .through("topic2")
            .transformValues(MyStatefulProcessor2::new, "store2")
            .through("topic3")
            .map(new MyStatelessProcessor3())
            .through("topic4");

Store 1:

    Stores.create("store1")
        .withKeys(Serdes.String())
        .withValues(Serdes.Long())
        .inMemory()
        .build();

Store 2:

    Stores.create("store2")
        .withKeys(Serdes.String())
        .withValues(valueSerde) // a small json object, 4-5 properties
        .persistent()
        .build();

Thanks,
Johan

On Tue, Oct 17, 2017 at 2:51 AM Damian Guy <damian....@gmail.com> wrote:

Hi Johan,

Do you have any logs? The state store restoration changed significantly in
0.11.0.1. If you could get some logs at trace level, that would be useful.
Also, if you could provide your topology (removing anything
proprietary/sensitive), that would help.

Thanks,
Damian

On Tue, 17 Oct 2017 at 05:55 Johan Genberg <johan.genb...@gmail.com> wrote:

Hi,

I'm upgrading a Kafka Streams application from 0.10.2.1 to 0.11.0.1,
running against a Kafka cluster with version 0.10.2.1. The application uses
a couple of state stores.

When stopping/starting the application prior to the upgrade (with the
0.10.2.1 client) on 2 instances, it was up and running in less than 30s to
a minute on all nodes. However, after the client was upgraded to 0.11.0.1,
when the instances started (during some load), it took about 6 minutes for
one of the nodes to reach the "RUNNING" state, and the second one didn't
get there. After 10 minutes I had to roll back.

Is it expected that this initial rebalancing takes a little longer with
0.11.0.1, and is there a way to configure or tweak the client, or otherwise
optimize this to make it go faster?

Thanks,
Johan
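As a side note on the restoration time described above: a small sketch, not from the thread, of one way to log how long an instance takes to reach RUNNING, assuming the 0.11.x KafkaStreams API (streams is whatever KafkaStreams instance the application builds):

    final long startedAt = System.currentTimeMillis();
    streams.setStateListener((newState, oldState) -> {
        // Log every transition; REBALANCING -> RUNNING marks the end of state store restoration.
        System.out.printf("%s -> %s after %d ms%n",
            oldState, newState, System.currentTimeMillis() - startedAt);
    });
    streams.start();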