OK, I will try that. Is there any documentation on what exactly this does, in general? I just want to safeguard against side effects. Also, do you recommend 0 as a production setting (the default is 100ms, I think), or is this just for testing/diagnosis?
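For reference, this is roughly how I'd apply it; the app id and broker below are placeholders, and the comments are just my reading of what poll.ms controls, so correct me if I'm off:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-application");    // placeholder
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
// My reading: poll.ms is the timeout handed to the stream thread's internal
// consumer.poll() call, i.e. how long the thread blocks waiting for input
// when nothing is buffered. With 0 the poll returns immediately, so the
// loop spins more (more CPU) in exchange for lower latency.
properties.put(StreamsConfig.POLL_MS_CONFIG, 0);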
The only description I can find is this: "The amount of time in milliseconds to block waiting for input."

Thanks,
Johan

On Tue, Oct 17, 2017 at 1:32 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Johan,
>
> Could you please change the following config and try again?
>
> properties.put(StreamsConfig.POLL_MS_CONFIG, 0);
>
> Guozhang
>
> On Tue, Oct 17, 2017 at 12:34 PM, Johan Genberg <johan.genb...@gmail.com> wrote:
>
> > Hi,
> >
> > The trace log is here (tried to attach it to the email, but it was rejected):
> >
> > https://gist.github.com/trickleup/fe0c095df65b8ae10906ea6774804b54
> >
> > The logs are from a test environment, but I was able to somewhat
> > reproduce the issue, with 5 or so messages per second on average. The
> > production system where the lag was greater has a similar load, but
> > more pre-existing entries in the key-value stores.
> >
> > I see a lot of "Skipping fetch for partition my_application_enriched-3
> > because there is an in-flight request...", not sure if this is expected.
> >
> > These are the consumer config overrides:
> >
> > properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
> > properties.put(StreamsConfig.POLL_MS_CONFIG, 1000);
> > properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
> >     CustomRocksDBConfig.class);
> > properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >
> > Where the RocksDB config is a fix for 1-core machines:
> >
> > public static class CustomRocksDBConfig implements RocksDBConfigSetter {
> >
> >   @Override
> >   public void setConfig(
> >       final String storeName, final Options options,
> >       final Map<String, Object> configs) {
> >     // Workaround: we must ensure that the parallelism is set to >= 2.
> >     int compactionParallelism =
> >         Math.max(Runtime.getRuntime().availableProcessors(), 2);
> >     // Set the number of compaction threads (but not flush threads).
> >     options.setIncreaseParallelism(compactionParallelism);
> >   }
> > }
> >
> > Again, thanks for your help,
> > Johan
> >
> > On Tue, Oct 17, 2017 at 11:57 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Johan,
> > >
> > > Could you list the following information in addition to your topology?
> > >
> > > 1. Your config settings if you have any overwrites, especially the
> > > consumer's "poll.ms" and "max.poll.records", and "num.threads".
> > >
> > > 2. Your expected incoming data rate (messages per sec) during the
> > > normal processing phase.
> > >
> > > Guozhang
> > >
> > > On Tue, Oct 17, 2017 at 9:12 AM, Johan Genberg <johan.genb...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > Thank you for responding so quickly. This is the topology. I've
> > > > simplified it a bit, but these are the steps it goes through; not
> > > > sure if that is helpful. I'll try to get some logs too in a bit.
> > > >
> > > > KStream<Integer, Event> eventStream = builder.stream(
> > > >     topic.keySerde(),
> > > >     topic.valueSerde(),
> > > >     topic.name());
> > > >
> > > > KStream<Integer, Event> enriched =
> > > >     eventStream
> > > >         .map(...)
> > > >         .transformValues(MyStatefulProcessor::new, "store1")
> > > >         .mapValues(new MyStatelessProcessor())
> > > >         .through("topic")
> > > >         .map(new MyStatelessProcessor2())
> > > >         .through("topic2")
> > > >         .transformValues(MyStatefulProcessor2::new, "store2")
> > > >         .through("topic3")
> > > >         .map(new MyStatelessProcessor3())
> > > >         .through("topic4");
> > > >
> > > > Store 1:
> > > >
> > > > Stores.create("store1")
> > > >     .withKeys(Serdes.String())
> > > >     .withValues(Serdes.Long())
> > > >     .inMemory()
> > > >     .build();
> > > >
> > > > Store 2:
> > > >
> > > > Stores.create("store2")
> > > >     .withKeys(Serdes.String())
> > > >     .withValues(valueSerde) // a small json object, 4-5 properties
> > > >     .persistent()
> > > >     .build();
> > > >
> > > > Thanks,
> > > > Johan
> > > >
> > > > On Tue, Oct 17, 2017 at 2:51 AM Damian Guy <damian....@gmail.com> wrote:
> > > >
> > > > > Hi Johan,
> > > > >
> > > > > Do you have any logs? The state store restoration changed
> > > > > significantly in 0.11.0.1. If you could get some logs at trace
> > > > > level, that would be useful. Also, could you provide your topology
> > > > > (removing anything proprietary/sensitive)?
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > On Tue, 17 Oct 2017 at 05:55 Johan Genberg <johan.genb...@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I'm upgrading a Kafka Streams application from 0.10.2.1 to
> > > > > > 0.11.0.1, running against a Kafka cluster with version 0.10.2.1.
> > > > > > The application uses a couple of state stores.
> > > > > >
> > > > > > When stopping/starting the application prior to the upgrade
> > > > > > (with the 0.10.2.1 client) on 2 instances, it was up and running
> > > > > > in less than 30 seconds to a minute on all nodes. However, after
> > > > > > the client was upgraded to 0.11.0.1, when the instances started
> > > > > > (during some load), it took about 6 minutes for one of the nodes
> > > > > > to reach the "RUNNING" state, and the second one didn't get
> > > > > > there. After 10 minutes I had to roll back.
> > > > > >
> > > > > > Is it expected that this initial rebalancing takes a little
> > > > > > longer with 0.11.0.1, and is there a way to configure or tweak
> > > > > > the client, or otherwise optimize this to make it go faster?
> > > > > >
> > > > > > Thanks,
> > > > > > Johan
>
> > > --
> > > -- Guozhang
>
> --
> -- Guozhang