Hello Johan,

Thanks for the information. Could you apply the patch as in
https://github.com/apache/kafka/pull/4085 from a source release of Streams
and see if it fixed your problem?


Guozhang


On Fri, Oct 20, 2017 at 12:25 PM, Johan Genberg <johan.genb...@gmail.com>
wrote:

> Thanks. I've changed the config settings and gotten a little better
> results, but the issue still remains, and doesn't seem only related to
> this.
>
> There seems to be an excessive use of bandwidth when initializing state
> stores in 0.11.0.1 compared to previous version. I realized that at least
> one of the bottlenecks that made this so slow was that the instances hit
> bandwidth limits on inbound network traffic. Not sure if this is useful
> information.
>
> Best,
> Johan
>
> On Wed, Oct 18, 2017 at 1:53 AM Damian Guy <damian....@gmail.com> wrote:
>
> > Hi John,
> >
> > It looks like this is an issue. Guozhang has provided a fix for it here:
> > https://github.com/apache/kafka/pull/4085
> > You can try it by cloning his repository and build the streams jar.
> >
> > Thanks,
> > Damian
> >
> > On Tue, 17 Oct 2017 at 23:07 Johan Genberg <johan.genb...@gmail.com>
> > wrote:
> >
> > > Yes, it's noticeably faster in my test environment, under load. I'm
> going
> > > to apply the default value (100) and see how it behaves in production.
> > >
> > > Thanks,
> > > Johan
> > >
> > > On Tue, Oct 17, 2017 at 1:58 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > This is only for my diagnosing your issue.
> > > >
> > > > I suspect that due to your very small traffic (5 records per sec) and
> > > long
> > > > polling parameter, the restoration process was blocked on waiting for
> > > data.
> > > > More specifically, in KAFKA-5152 which is merged in 0.11.0.1, a
> thread
> > > may
> > > > be restoring for some tasks while processing for other restored ready
> > > tasks
> > > > at the same time. If your ready tasks data traffic is small, it may
> > have
> > > > side effects on your existing restoring tasks.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, Oct 17, 2017 at 1:44 PM, Johan Genberg <
> > johan.genb...@gmail.com>
> > > > wrote:
> > > >
> > > > > Ok, I will try that. Is there any documentation as to what exactly
> > this
> > > > > does, in general? Just want to safe guard against side effects.
> Also,
> > > do
> > > > > you recommend 0 as a production setting (default is 100ms, I
> think),
> > or
> > > > is
> > > > > this just to test this out/diagnose?
> > > > >
> > > > > The only description I can find is this:
> > > > >
> > > > > *"The amount of time in milliseconds to block waiting for input."*
> > > > >
> > > > > Thanks
> > > > > Johan
> > > > >
> > > > > On Tue, Oct 17, 2017 at 1:32 PM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Johan,
> > > > > >
> > > > > > Could you please try change the following configs and try again?
> > > > > >
> > > > > > properties.put(StreamsConfig.POLL_MS_CONFIG, 0);
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Tue, Oct 17, 2017 at 12:34 PM, Johan Genberg <
> > > > johan.genb...@gmail.com
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > The trace log is here (tried to attach to email but it was
> > > rejected):
> > > > > > >
> > > > > > >
> > https://gist.github.com/trickleup/fe0c095df65b8ae10906ea6774804b54
> > > > > > >
> > > > > > > The logs are from a test environment, but I was able to
> somewhat
> > > > > > reproduce
> > > > > > > the issue, and with 5 or so messages per second on average. The
> > > > > > production
> > > > > > > system where the lag was greater has a similar load, but more
> > > > > > pre-existing
> > > > > > > entries in the key-value stores.
> > > > > > >
> > > > > > > I see a lot of "Skipping fetch for partition
> > > > my_application_enriched-3
> > > > > > > because there is an in-flight request...", not sure if this is
> > > > > expected.
> > > > > > >
> > > > > > > These are the consumer config overrides:
> > > > > > >
> > > > > > > properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
> > > > > > > properties.put(StreamsConfig.POLL_MS_CONFIG, 1000);
> > > > > > > properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_
> CONFIG,
> > > > > > > CustomRocksDBConfig.class);
> > > > > > > properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> > > "earliest");
> > > > > > >
> > > > > > >
> > > > > > > Where the rocksdb config is a fix for 1-core machines:
> > > > > > >
> > > > > > > public static class CustomRocksDBConfig implements
> > > > RocksDBConfigSetter
> > > > > {
> > > > > > >
> > > > > > >   @Override
> > > > > > >   public void setConfig(
> > > > > > >       final String storeName, final Options options, final
> > > > Map<String,
> > > > > > > Object> configs) {
> > > > > > >     // Workaround: We must ensure that the parallelism is set
> to
> > >=
> > > > 2.
> > > > > > >     int compactionParallelism =
> > > > > > > Math.max(Runtime.getRuntime().availableProcessors(), 2);
> > > > > > >     // Set number of compaction threads (but not flush
> threads).
> > > > > > >     options.setIncreaseParallelism(compactionParallelism);
> > > > > > >   }
> > > > > > > }
> > > > > > >
> > > > > > > Again, thanks for your help,
> > > > > > > Johan
> > > > > > >
> > > > > > > On Tue, Oct 17, 2017 at 11:57 AM Guozhang Wang <
> > wangg...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hello Johan,
> > > > > > > >
> > > > > > > > Could you list the following information in addition to your
> > > > > topology?
> > > > > > > >
> > > > > > > > 1. Your config settings if you have any overwrites,
> especially
> > > > > > > consumer's "
> > > > > > > > poll.ms" and "max.poll.records", and "num.threads".
> > > > > > > >
> > > > > > > > 2. Your expected incoming data rate (messages per sec) at
> > normal
> > > > > > > processing
> > > > > > > > phase.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Oct 17, 2017 at 9:12 AM, Johan Genberg <
> > > > > > johan.genb...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > Thank you for responding so quickly. This is the topology.
> > I've
> > > > > > > > simplified
> > > > > > > > > it a bit, but these are the steps it goes through, not sure
> > if
> > > > that
> > > > > > is
> > > > > > > > > helpful. I'll try to get some logs too in a bit.
> > > > > > > > >
> > > > > > > > > *KStream<Integer, Event> eventStream = builder.stream(*
> > > > > > > > > *  topic.keySerde(),*
> > > > > > > > > *  topic.valueSerde(),*
> > > > > > > > > *  topic.name <http://topic.name>());*
> > > > > > > > >
> > > > > > > > > *KStream<Integer, Event> enriched =*
> > > > > > > > > *    eventStream*
> > > > > > > > > *        .map(...)*
> > > > > > > > > *        .transformValues(MyStatefulProcessor::new,
> > "store1")*
> > > > > > > > > *        .mapValues(new MyStatelessProcessor());*
> > > > > > > > > *        .through("topic")*
> > > > > > > > > *        .map(new MyStatelessProcessor2());*
> > > > > > > > > *        .through("topic2")*
> > > > > > > > > *        .transformValues(MyStatefulProcessor2::new,
> > "store2")*
> > > > > > > > > *        .through("topic3")*
> > > > > > > > > *        .map(new MyStatelessProcessor3());*
> > > > > > > > > *        .through("topic4");*
> > > > > > > > >
> > > > > > > > > Store 1:
> > > > > > > > >
> > > > > > > > > *Stores.create("store1")*
> > > > > > > > > *    .withKeys(Serdes.String())*
> > > > > > > > > *    .withValues(Serdes.Long())*
> > > > > > > > > *    .inMemory()*
> > > > > > > > > *    .build();*
> > > > > > > > >
> > > > > > > > > Store 2:
> > > > > > > > >
> > > > > > > > > *Stores.create("store2")*
> > > > > > > > > *    .withKeys(Serdes.String())*
> > > > > > > > > *    .withValues(valueSerde) // a small json object, 4-5
> > > > > properties*
> > > > > > > > > *    .persistent()*
> > > > > > > > > *    .build();*
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Johan
> > > > > > > > >
> > > > > > > > > On Tue, Oct 17, 2017 at 2:51 AM Damian Guy <
> > > damian....@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Johan,
> > > > > > > > > >
> > > > > > > > > > Do you have any logs? The state store restoration changed
> > > > > > > significantly
> > > > > > > > > in
> > > > > > > > > > 0.11.0.1. If you could get some logs at trace level, that
> > > would
> > > > > be
> > > > > > > > > useful.
> > > > > > > > > > Also if you could provide your topology (removing
> anything
> > > > > > > > > > proprietary/sensitive).
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Damian
> > > > > > > > > >
> > > > > > > > > > On Tue, 17 Oct 2017 at 05:55 Johan Genberg <
> > > > > > johan.genb...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi,
> > > > > > > > > > >
> > > > > > > > > > > I'm upgrading a kafka streams application from 0.10.2.1
> > to
> > > > > > > 0.11.0.1,
> > > > > > > > > > > running against a kafka cluster with version 0.10.2.1.
> > The
> > > > > > > > application
> > > > > > > > > > uses
> > > > > > > > > > > a couple of state stores.
> > > > > > > > > > >
> > > > > > > > > > > When stopping/starting the application prior to the
> > upgrade
> > > > > (with
> > > > > > > > > > 0.10.2.1
> > > > > > > > > > > client) on 2 instances, it was up and running in less
> > than
> > > > 30s
> > > > > > to a
> > > > > > > > > > minute
> > > > > > > > > > > on all nodes. However, after client was upgraded to
> > > 0.11.0.1,
> > > > > > when
> > > > > > > > the
> > > > > > > > > > > instances started (during some load), it took about 6
> > > minutes
> > > > > for
> > > > > > > one
> > > > > > > > > of
> > > > > > > > > > > the nodes to reach "RUNNING" state, and the second one
> > > didn't
> > > > > get
> > > > > > > > > there.
> > > > > > > > > > > After 10 minutes I had to roll back.
> > > > > > > > > > >
> > > > > > > > > > > Is it expected that this initial rebalancing takes a
> > little
> > > > > > longer
> > > > > > > > with
> > > > > > > > > > > 0.11.0.1, and is there a way to configure or tweak the
> > > > client,
> > > > > or
> > > > > > > > > > otherwise
> > > > > > > > > > > optimize this to make this go faster?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Johan
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>



-- 
-- Guozhang

Reply via email to