Hello Johan,

Thanks for the information. Could you apply the patch in https://github.com/apache/kafka/pull/4085 to a source release of Streams and see if it fixes your problem?
Guozhang

On Fri, Oct 20, 2017 at 12:25 PM, Johan Genberg <johan.genb...@gmail.com> wrote:

> Thanks. I've changed the config settings and gotten slightly better
> results, but the issue still remains and doesn't seem to be related only
> to this.
>
> There seems to be excessive bandwidth use when initializing state stores
> in 0.11.0.1 compared to the previous version. I realized that at least one
> of the bottlenecks that made this so slow was that the instances hit
> bandwidth limits on inbound network traffic. Not sure if this is useful
> information.
>
> Best,
> Johan
>
> On Wed, Oct 18, 2017 at 1:53 AM Damian Guy <damian....@gmail.com> wrote:
>
> > Hi Johan,
> >
> > It looks like this is an issue. Guozhang has provided a fix for it here:
> > https://github.com/apache/kafka/pull/4085
> > You can try it by cloning his repository and building the streams jar.
> >
> > Thanks,
> > Damian
> >
> > On Tue, 17 Oct 2017 at 23:07 Johan Genberg <johan.genb...@gmail.com> wrote:
> >
> > > Yes, it's noticeably faster in my test environment, under load. I'm
> > > going to apply the default value (100) and see how it behaves in
> > > production.
> > >
> > > Thanks,
> > > Johan
> > >
> > > On Tue, Oct 17, 2017 at 1:58 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > This is only for diagnosing your issue.
> > > >
> > > > I suspect that due to your very small traffic (5 records per sec) and
> > > > the long polling parameter, the restoration process was blocked
> > > > waiting for data. More specifically, with KAFKA-5152, which was merged
> > > > in 0.11.0.1, a thread may be restoring some tasks while processing
> > > > other already-restored tasks at the same time. If the data traffic of
> > > > your ready tasks is small, it may have side effects on your restoring
> > > > tasks.
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, Oct 17, 2017 at 1:44 PM, Johan Genberg <johan.genb...@gmail.com> wrote:
> > > >
> > > > > Ok, I will try that. Is there any documentation as to what exactly
> > > > > this does, in general? Just want to safeguard against side effects.
> > > > > Also, do you recommend 0 as a production setting (the default is
> > > > > 100 ms, I think), or is this just to diagnose/test this out?
> > > > >
> > > > > The only description I can find is this:
> > > > >
> > > > > "The amount of time in milliseconds to block waiting for input."
> > > > >
> > > > > Thanks,
> > > > > Johan
> > > > >
> > > > > On Tue, Oct 17, 2017 at 1:32 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > > Johan,
> > > > > >
> > > > > > Could you please try changing the following config and try again?
> > > > > >
> > > > > > properties.put(StreamsConfig.POLL_MS_CONFIG, 0);
> > > > > >
> > > > > > Guozhang
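For context, a minimal sketch of where that one-line override sits in a Streams application. Everything apart from the poll.ms line (application id, bootstrap servers, the placeholder topology) is invented for illustration and not taken from this thread:

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class PollMsOverrideExample {
        public static void main(final String[] args) {
            final Properties properties = new Properties();
            // Placeholder values; use your own application id and brokers.
            properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Diagnostic override suggested above: shorten the time the Streams
            // thread blocks in poll() waiting for input (the default is 100 ms).
            properties.put(StreamsConfig.POLL_MS_CONFIG, 0);

            final KStreamBuilder builder = new KStreamBuilder();
            builder.stream("input-topic").to("output-topic"); // placeholder topology

            final KafkaStreams streams = new KafkaStreams(builder, properties);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }

As the description quoted later in the thread says, poll.ms only controls how long the Streams thread blocks in its poll() call waiting for input; it is not a consumer-side timeout.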
> > > > > > On Tue, Oct 17, 2017 at 12:34 PM, Johan Genberg <johan.genb...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > The trace log is here (I tried to attach it to the email but it
> > > > > > > was rejected):
> > > > > > >
> > > > > > > https://gist.github.com/trickleup/fe0c095df65b8ae10906ea6774804b54
> > > > > > >
> > > > > > > The logs are from a test environment, but I was able to somewhat
> > > > > > > reproduce the issue, with 5 or so messages per second on average.
> > > > > > > The production system where the lag was greater has a similar
> > > > > > > load, but more pre-existing entries in the key-value stores.
> > > > > > >
> > > > > > > I see a lot of "Skipping fetch for partition
> > > > > > > my_application_enriched-3 because there is an in-flight
> > > > > > > request...", not sure if this is expected.
> > > > > > >
> > > > > > > These are the consumer config overrides:
> > > > > > >
> > > > > > > properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
> > > > > > > properties.put(StreamsConfig.POLL_MS_CONFIG, 1000);
> > > > > > > properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
> > > > > > >     CustomRocksDBConfig.class);
> > > > > > > properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> > > > > > >
> > > > > > > where the RocksDB config is a fix for 1-core machines:
> > > > > > >
> > > > > > > public static class CustomRocksDBConfig implements RocksDBConfigSetter {
> > > > > > >
> > > > > > >   @Override
> > > > > > >   public void setConfig(
> > > > > > >       final String storeName, final Options options,
> > > > > > >       final Map<String, Object> configs) {
> > > > > > >     // Workaround: we must ensure that the parallelism is set to >= 2.
> > > > > > >     int compactionParallelism =
> > > > > > >         Math.max(Runtime.getRuntime().availableProcessors(), 2);
> > > > > > >     // Set the number of compaction threads (but not flush threads).
> > > > > > >     options.setIncreaseParallelism(compactionParallelism);
> > > > > > >   }
> > > > > > > }
> > > > > > >
> > > > > > > Again, thanks for your help,
> > > > > > > Johan
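The RocksDB workaround above compiles as-is once the imports are added. For convenience, here is a standalone version of the same setter; the class name is kept from the thread, while the top-level placement and import layout are assumptions:

    import java.util.Map;
    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.Options;

    public class CustomRocksDBConfig implements RocksDBConfigSetter {

        @Override
        public void setConfig(final String storeName, final Options options,
                              final Map<String, Object> configs) {
            // Workaround from the thread above: keep the parallelism at >= 2,
            // which availableProcessors() does not guarantee on 1-core machines.
            final int compactionParallelism =
                Math.max(Runtime.getRuntime().availableProcessors(), 2);
            // Raises the number of background compaction threads (not flush threads).
            options.setIncreaseParallelism(compactionParallelism);
        }
    }

As far as I know, Streams instantiates the class named in rocksdb.config.setter by reflection, so it needs to be public with a default constructor; a public static nested class, as in the original message, works as well.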
> > > > > > > On Tue, Oct 17, 2017 at 11:57 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hello Johan,
> > > > > > > >
> > > > > > > > Could you list the following information in addition to your
> > > > > > > > topology?
> > > > > > > >
> > > > > > > > 1. Your config settings if you have any overrides, especially the
> > > > > > > > consumer's "poll.ms" and "max.poll.records", and "num.threads".
> > > > > > > >
> > > > > > > > 2. Your expected incoming data rate (messages per sec) during
> > > > > > > > normal processing.
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > > On Tue, Oct 17, 2017 at 9:12 AM, Johan Genberg <johan.genb...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > Thank you for responding so quickly. This is the topology. I've
> > > > > > > > > simplified it a bit, but these are the steps it goes through,
> > > > > > > > > not sure if that is helpful. I'll try to get some logs too in a
> > > > > > > > > bit.
> > > > > > > > >
> > > > > > > > > KStream<Integer, Event> eventStream = builder.stream(
> > > > > > > > >     topic.keySerde(),
> > > > > > > > >     topic.valueSerde(),
> > > > > > > > >     topic.name());
> > > > > > > > >
> > > > > > > > > KStream<Integer, Event> enriched =
> > > > > > > > >     eventStream
> > > > > > > > >         .map(...)
> > > > > > > > >         .transformValues(MyStatefulProcessor::new, "store1")
> > > > > > > > >         .mapValues(new MyStatelessProcessor())
> > > > > > > > >         .through("topic")
> > > > > > > > >         .map(new MyStatelessProcessor2())
> > > > > > > > >         .through("topic2")
> > > > > > > > >         .transformValues(MyStatefulProcessor2::new, "store2")
> > > > > > > > >         .through("topic3")
> > > > > > > > >         .map(new MyStatelessProcessor3())
> > > > > > > > >         .through("topic4");
> > > > > > > > >
> > > > > > > > > Store 1:
> > > > > > > > >
> > > > > > > > > Stores.create("store1")
> > > > > > > > >     .withKeys(Serdes.String())
> > > > > > > > >     .withValues(Serdes.Long())
> > > > > > > > >     .inMemory()
> > > > > > > > >     .build();
> > > > > > > > >
> > > > > > > > > Store 2:
> > > > > > > > >
> > > > > > > > > Stores.create("store2")
> > > > > > > > >     .withKeys(Serdes.String())
> > > > > > > > >     .withValues(valueSerde) // a small JSON object, 4-5 properties
> > > > > > > > >     .persistent()
> > > > > > > > >     .build();
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Johan
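Since the topology above is deliberately simplified pseudocode, here is a rough but compilable sketch of the same store-plus-transformValues pattern against the 0.11.0.x DSL used in the thread. The topic names, the String/Long types, and the counting transformer are invented placeholders standing in for the original Event type and MyStatefulProcessor:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.ValueTransformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.StateStoreSupplier;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    public class TopologySketch {

        public static KStreamBuilder build() {
            final KStreamBuilder builder = new KStreamBuilder();

            // In-memory store, analogous to "store1" above.
            final StateStoreSupplier store1 = Stores.create("store1")
                .withKeys(Serdes.String())
                .withValues(Serdes.Long())
                .inMemory()
                .build();
            builder.addStateStore(store1);

            // The stateful step references the store by name.
            final KStream<String, String> events =
                builder.stream(Serdes.String(), Serdes.String(), "events");
            events.transformValues(CountingTransformer::new, "store1")
                  .through("enriched-topic");

            return builder;
        }

        // Minimal stateful transformer: counts how often each value has been seen.
        static class CountingTransformer implements ValueTransformer<String, String> {
            private KeyValueStore<String, Long> store;

            @Override
            @SuppressWarnings("unchecked")
            public void init(final ProcessorContext context) {
                store = (KeyValueStore<String, Long>) context.getStateStore("store1");
            }

            @Override
            public String transform(final String value) {
                final Long seen = store.get(value);
                final long count = (seen == null ? 0L : seen) + 1L;
                store.put(value, count);
                return value + " (seen " + count + " times)";
            }

            @Override
            public String punctuate(final long timestamp) {
                return null; // no periodic output
            }

            @Override
            public void close() {}
        }
    }

The builder returned here would then be passed to new KafkaStreams(builder, properties) and started as usual.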
> > > > > > > > > On Tue, Oct 17, 2017 at 2:51 AM Damian Guy <damian....@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Johan,
> > > > > > > > > >
> > > > > > > > > > Do you have any logs? The state store restoration changed
> > > > > > > > > > significantly in 0.11.0.1. If you could get some logs at trace
> > > > > > > > > > level, that would be useful. Also, if you could provide your
> > > > > > > > > > topology (removing anything proprietary/sensitive).
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Damian
> > > > > > > > > >
> > > > > > > > > > On Tue, 17 Oct 2017 at 05:55 Johan Genberg <johan.genb...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi,
> > > > > > > > > > >
> > > > > > > > > > > I'm upgrading a Kafka Streams application from 0.10.2.1 to
> > > > > > > > > > > 0.11.0.1, running against a Kafka cluster with version
> > > > > > > > > > > 0.10.2.1. The application uses a couple of state stores.
> > > > > > > > > > >
> > > > > > > > > > > When stopping/starting the application prior to the upgrade
> > > > > > > > > > > (with the 0.10.2.1 client) on 2 instances, it was up and
> > > > > > > > > > > running in less than 30s to a minute on all nodes. However,
> > > > > > > > > > > after the client was upgraded to 0.11.0.1, when the instances
> > > > > > > > > > > started (during some load), it took about 6 minutes for one of
> > > > > > > > > > > the nodes to reach the "RUNNING" state, and the second one
> > > > > > > > > > > didn't get there. After 10 minutes I had to roll back.
> > > > > > > > > > >
> > > > > > > > > > > Is it expected that this initial rebalancing takes a little
> > > > > > > > > > > longer with 0.11.0.1, and is there a way to configure or tweak
> > > > > > > > > > > the client, or otherwise optimize this to make it go faster?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Johan
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > >
> > > > --
> > > > -- Guozhang

--
-- Guozhang
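Not from the thread, but for anyone who wants to quantify the restore delay being described: a small sketch that uses the standard KafkaStreams state listener to log how long an instance takes to reach RUNNING. The class name and the plain stdout logging are placeholders:

    import org.apache.kafka.streams.KafkaStreams;

    public final class StartupTimer {

        // Attach before streams.start(); logs when the instance enters
        // REBALANCING and how long it takes to reach RUNNING.
        public static void attach(final KafkaStreams streams) {
            final long startedAt = System.currentTimeMillis();
            streams.setStateListener((newState, oldState) -> {
                if (newState == KafkaStreams.State.RUNNING) {
                    System.out.printf("Reached RUNNING after %d ms (from %s)%n",
                        System.currentTimeMillis() - startedAt, oldState);
                } else if (newState == KafkaStreams.State.REBALANCING) {
                    System.out.printf("Entered REBALANCING at %d ms%n",
                        System.currentTimeMillis() - startedAt);
                }
            });
        }
    }

Usage is simply StartupTimer.attach(streams) followed by streams.start(); comparing the logged numbers before and after the 0.11.0.1 upgrade makes the restoration slowdown easier to report.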