Hi,

The trace log is here (tried to attach to email but it was rejected):

https://gist.github.com/trickleup/fe0c095df65b8ae10906ea6774804b54

The logs are from a test environment, but I was able to somewhat reproduce
the issue there, with about 5 messages per second on average. The production
system, where the lag was greater, has a similar load but more pre-existing
entries in the key-value stores.

I see a lot of "Skipping fetch for partition my_application_enriched-3
because there is an in-flight request..." messages; I'm not sure if this is expected.

These are the config overrides (Streams and consumer):

properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
properties.put(StreamsConfig.POLL_MS_CONFIG, 1000);
properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
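
For reference, these overrides sit on top of an otherwise standard setup,
roughly like this (just a sketch; the application id, broker list and the
builder variable below are placeholders, not my real values):

Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_application");   // placeholder
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");  // placeholder
// ...plus the four overrides listed above...

// builder is the KStreamBuilder holding the topology from my earlier mail
KafkaStreams streams = new KafkaStreams(builder, properties);
streams.start();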


The CustomRocksDBConfig referenced above is a workaround for 1-core machines:

public static class CustomRocksDBConfig implements RocksDBConfigSetter {

  @Override
  public void setConfig(final String storeName, final Options options,
      final Map<String, Object> configs) {
    // Workaround: We must ensure that the parallelism is set to >= 2.
    final int compactionParallelism =
        Math.max(Runtime.getRuntime().availableProcessors(), 2);
    // Set number of compaction threads (but not flush threads).
    options.setIncreaseParallelism(compactionParallelism);
  }
}

Again, thanks for your help,
Johan

On Tue, Oct 17, 2017 at 11:57 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Johan,
>
> Could you list the following information in addition to your topology?
>
> 1. Your config settings if you have any overwrites, especially consumer's "
> poll.ms" and "max.poll.records", and "num.threads".
>
> 2. Your expected incoming data rate (messages per sec) at normal processing
> phase.
>
>
>
> Guozhang
>
>
> On Tue, Oct 17, 2017 at 9:12 AM, Johan Genberg <johan.genb...@gmail.com>
> wrote:
>
> > Hi,
> >
> > Thank you for responding so quickly. This is the topology. I've
> simplified
> > it a bit, but these are the steps it goes through, not sure if that is
> > helpful. I'll try to get some logs too in a bit.
> >
> > KStream<Integer, Event> eventStream = builder.stream(
> >   topic.keySerde(),
> >   topic.valueSerde(),
> >   topic.name());
> >
> > KStream<Integer, Event> enriched =
> >     eventStream
> >         .map(...)
> >         .transformValues(MyStatefulProcessor::new, "store1")
> >         .mapValues(new MyStatelessProcessor())
> >         .through("topic")
> >         .map(new MyStatelessProcessor2())
> >         .through("topic2")
> >         .transformValues(MyStatefulProcessor2::new, "store2")
> >         .through("topic3")
> >         .map(new MyStatelessProcessor3())
> >         .through("topic4");
> >
> > Store 1:
> >
> > *Stores.create("store1")*
> > *    .withKeys(Serdes.String())*
> > *    .withValues(Serdes.Long())*
> > *    .inMemory()*
> > *    .build();*
> >
> > Store 2:
> >
> > *Stores.create("store2")*
> > *    .withKeys(Serdes.String())*
> > *    .withValues(valueSerde) // a small json object, 4-5 properties*
> > *    .persistent()*
> > *    .build();*
> >
> > Thanks,
> > Johan
> >
> > On Tue, Oct 17, 2017 at 2:51 AM Damian Guy <damian....@gmail.com> wrote:
> >
> > > Hi Johan,
> > >
> > > Do you have any logs? The state store restoration changed significantly
> > in
> > > 0.11.0.1. If you could get some logs at trace level, that would be
> > useful.
> > > Also if you could provide your topology (removing anything
> > > proprietary/sensitive).
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Tue, 17 Oct 2017 at 05:55 Johan Genberg <johan.genb...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I'm upgrading a kafka streams application from 0.10.2.1 to 0.11.0.1,
> > > > running against a kafka cluster with version 0.10.2.1. The
> application
> > > uses
> > > > a couple of state stores.
> > > >
> > > > When stopping/starting the application prior to the upgrade (with
> > > 0.10.2.1
> > > > client) on 2 instances, it was up and running in less than 30s to a
> > > minute
> > > > on all nodes. However, after client was upgraded to 0.11.0.1, when
> the
> > > > instances started (during some load), it took about 6 minutes for one
> > of
> > > > the nodes to reach "RUNNING" state, and the second one didn't get
> > there.
> > > > After 10 minutes I had to roll back.
> > > >
> > > > Is it expected that this initial rebalancing takes a little longer
> with
> > > > 0.11.0.1, and is there a way to configure or tweak the client, or
> > > otherwise
> > > > optimize this to make this go faster?
> > > >
> > > > Thanks,
> > > > Johan
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
