Thanks. I've changed the config settings and gotten slightly better
results, but the issue still remains, and it doesn't seem to be related
only to this.

There seems to be excessive bandwidth use when initializing state stores
in 0.11.0.1 compared to the previous version. I realized that at least
one of the bottlenecks that made this so slow was that the instances hit
bandwidth limits on inbound network traffic. Not sure if this is useful
information.
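
In case it's useful, this is the kind of cap I'm experimenting with to
keep restoration under the instances' inbound limit. A rough sketch via
the consumer configs (the byte values are placeholders, and this bounds
per-request volume rather than enforcing a true rate limit):

properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 262144); // 256 KB per partition per fetch (placeholder)
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1048576); // 1 MB per fetch request (placeholder)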

Best,
Johan

On Wed, Oct 18, 2017 at 1:53 AM Damian Guy <damian....@gmail.com> wrote:

> Hi Johan,
>
> It looks like this is an issue. Guozhang has provided a fix for it here:
> https://github.com/apache/kafka/pull/4085
> You can try it by cloning his repository and building the streams jar.
>
> Thanks,
> Damian
>
> On Tue, 17 Oct 2017 at 23:07 Johan Genberg <johan.genb...@gmail.com> wrote:
>
> > Yes, it's noticeably faster in my test environment, under load. I'm going
> > to apply the default value (100) and see how it behaves in production.
> >
> > Thanks,
> > Johan
> >
> > On Tue, Oct 17, 2017 at 1:58 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > This is only for diagnosing your issue.
> > >
> > > I suspect that due to your very small traffic (5 records per sec) and
> > > long polling parameter, the restoration process was blocked waiting
> > > for data. More specifically, with KAFKA-5152, which was merged in
> > > 0.11.0.1, a thread may be restoring some tasks while simultaneously
> > > processing other, already-restored tasks. If the data traffic of your
> > > ready tasks is small, it may have side effects on your still-restoring
> > > tasks.
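> > >
> > > Roughly, the interaction looks like this (a simplified sketch, not
> > > the actual Streams code; the method names are invented for
> > > illustration):
> > >
> > > while (running) {
> > >     // With little traffic on the ready tasks, this call can block
> > >     // for up to poll.ms before returning, even if restoration still
> > >     // has work to do.
> > >     ConsumerRecords<byte[], byte[]> records = consumer.poll(pollMs);
> > >     processReadyTasks(records);  // already-restored tasks
> > >     restorePendingTasks();       // restoring tasks only progress between polls
> > > }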
> > >
> > >
> > > Guozhang
> > >
> > > On Tue, Oct 17, 2017 at 1:44 PM, Johan Genberg <johan.genb...@gmail.com> wrote:
> > >
> > > > OK, I will try that. Is there any documentation on what exactly
> > > > this does, in general? I just want to safeguard against side
> > > > effects. Also, do you recommend 0 as a production setting (the
> > > > default is 100ms, I think), or is this just to test things
> > > > out/diagnose?
> > > >
> > > > The only description I can find is this:
> > > >
> > > > *"The amount of time in milliseconds to block waiting for input."*
> > > >
> > > > Thanks
> > > > Johan
> > > >
> > > > On Tue, Oct 17, 2017 at 1:32 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Johan,
> > > > >
> > > > > Could you please change the following config and try again?
> > > > >
> > > > > properties.put(StreamsConfig.POLL_MS_CONFIG, 0); // poll() returns immediately instead of blocking
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Tue, Oct 17, 2017 at 12:34 PM, Johan Genberg <johan.genb...@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > The trace log is here (I tried to attach it to the email, but
> > > > > > it was rejected):
> > > > > >
> > > > > > https://gist.github.com/trickleup/fe0c095df65b8ae10906ea6774804b54
> > > > > >
> > > > > > The logs are from a test environment, but I was able to
> > > > > > somewhat reproduce the issue there, with 5 or so messages per
> > > > > > second on average. The production system where the lag was
> > > > > > greater has a similar load, but more pre-existing entries in
> > > > > > the key-value stores.
> > > > > >
> > > > > > I see a lot of "Skipping fetch for partition
> > > > > > my_application_enriched-3 because there is an in-flight
> > > > > > request...", not sure if this is expected.
> > > > > >
> > > > > > These are the consumer config overrides:
> > > > > >
> > > > > > properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
> > > > > > properties.put(StreamsConfig.POLL_MS_CONFIG, 1000);
> > > > > > properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
> > > > > > properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> > > > > >
> > > > > >
> > > > > > Where the rocksdb config is a fix for 1-core machines:
> > > > > >
> > > > > > public static class CustomRocksDBConfig implements RocksDBConfigSetter {
> > > > > >
> > > > > >   @Override
> > > > > >   public void setConfig(
> > > > > >       final String storeName, final Options options,
> > > > > >       final Map<String, Object> configs) {
> > > > > >     // Workaround: we must ensure that the parallelism is set to >= 2.
> > > > > >     int compactionParallelism =
> > > > > >         Math.max(Runtime.getRuntime().availableProcessors(), 2);
> > > > > >     // Set the number of compaction threads (but not flush threads).
> > > > > >     options.setIncreaseParallelism(compactionParallelism);
> > > > > >   }
> > > > > > }
> > > > > >
> > > > > > Again, thanks for your help,
> > > > > > Johan
> > > > > >
> > > > > > On Tue, Oct 17, 2017 at 11:57 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > > Hello Johan,
> > > > > > >
> > > > > > > Could you list the following information in addition to your
> > > > > > > topology?
> > > > > > >
> > > > > > > 1. Your config settings, if you have any overrides, especially
> > > > > > > the consumer's "poll.ms" and "max.poll.records", and
> > > > > > > "num.threads".
> > > > > > >
> > > > > > > 2. Your expected incoming data rate (messages per sec) during
> > > > > > > the normal processing phase.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Oct 17, 2017 at 9:12 AM, Johan Genberg <johan.genb...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > Thank you for responding so quickly. This is the topology.
> > > > > > > > I've simplified it a bit, but these are the steps it goes
> > > > > > > > through; not sure if that is helpful. I'll try to get some
> > > > > > > > logs too in a bit.
> > > > > > > >
> > > > > > > > KStream<Integer, Event> eventStream = builder.stream(
> > > > > > > >     topic.keySerde(),
> > > > > > > >     topic.valueSerde(),
> > > > > > > >     topic.name());
> > > > > > > >
> > > > > > > > KStream<Integer, Event> enriched =
> > > > > > > >     eventStream
> > > > > > > >         .map(...)
> > > > > > > >         .transformValues(MyStatefulProcessor::new, "store1")
> > > > > > > >         .mapValues(new MyStatelessProcessor())
> > > > > > > >         .through("topic")
> > > > > > > >         .map(new MyStatelessProcessor2())
> > > > > > > >         .through("topic2")
> > > > > > > >         .transformValues(MyStatefulProcessor2::new, "store2")
> > > > > > > >         .through("topic3")
> > > > > > > >         .map(new MyStatelessProcessor3())
> > > > > > > >         .through("topic4");
> > > > > > > >
> > > > > > > > Store 1:
> > > > > > > >
> > > > > > > > *Stores.create("store1")*
> > > > > > > > *    .withKeys(Serdes.String())*
> > > > > > > > *    .withValues(Serdes.Long())*
> > > > > > > > *    .inMemory()*
> > > > > > > > *    .build();*
> > > > > > > >
> > > > > > > > Store 2:
> > > > > > > >
> > > > > > > > *Stores.create("store2")*
> > > > > > > > *    .withKeys(Serdes.String())*
> > > > > > > > *    .withValues(valueSerde) // a small json object, 4-5
> > > > properties*
> > > > > > > > *    .persistent()*
> > > > > > > > *    .build();*
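> > > > > > > >
> > > > > > > > For completeness, both store suppliers are registered with
> > > > > > > > the builder before the transforms reference them by name,
> > > > > > > > roughly like this (simplified; the variable names differ in
> > > > > > > > the real code):
> > > > > > > >
> > > > > > > > KStreamBuilder builder = new KStreamBuilder();
> > > > > > > > // transformValues() looks the stores up by name ("store1", "store2")
> > > > > > > > builder.addStateStore(store1);
> > > > > > > > builder.addStateStore(store2);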
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Johan
> > > > > > > >
> > > > > > > > On Tue, Oct 17, 2017 at 2:51 AM Damian Guy <damian....@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Johan,
> > > > > > > > >
> > > > > > > > > Do you have any logs? The state store restoration changed
> > > > > > > > > significantly in 0.11.0.1. If you could get some logs at
> > > > > > > > > trace level, that would be useful. Also, it would help if
> > > > > > > > > you could provide your topology (removing anything
> > > > > > > > > proprietary/sensitive).
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Damian
> > > > > > > > >
> > > > > > > > > On Tue, 17 Oct 2017 at 05:55 Johan Genberg <johan.genb...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi,
> > > > > > > > > >
> > > > > > > > > > I'm upgrading a Kafka Streams application from 0.10.2.1
> > > > > > > > > > to 0.11.0.1, running against a Kafka cluster with
> > > > > > > > > > version 0.10.2.1. The application uses a couple of
> > > > > > > > > > state stores.
> > > > > > > > > >
> > > > > > > > > > When stopping/starting the application prior to the
> > > > > > > > > > upgrade (with the 0.10.2.1 client) on 2 instances, it
> > > > > > > > > > was up and running in less than 30s to a minute on all
> > > > > > > > > > nodes. However, after the client was upgraded to
> > > > > > > > > > 0.11.0.1, when the instances started (during some
> > > > > > > > > > load), it took about 6 minutes for one of the nodes to
> > > > > > > > > > reach the "RUNNING" state, and the second one never got
> > > > > > > > > > there. After 10 minutes I had to roll back.
> > > > > > > > > >
> > > > > > > > > > Is it expected that this initial rebalancing takes a
> > > > > > > > > > little longer with 0.11.0.1, and is there a way to
> > > > > > > > > > configure or tweak the client, or otherwise optimize
> > > > > > > > > > this, to make it go faster?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Johan
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
