Given your application code:
----------------------------
final KStream<String, NodeMutation> localDeltaStream = builder.stream(
    localDeltaTopic,
    Consumed.with(
        new Serdes.StringSerde(),
        new NodeMutationSerde<>()
    )
);

KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
    (mutation) -> NodeState
        .newBuilder()
        .setMeta( mutation.getMetaMutation().getMeta() )
        .setValue( mutation.getValueMutation().getValue() )
        .build()
);

localHistStream.to(
    localHistTopic,
    Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
);
----------------------------

which is purely stateless, committing will not touch a state directory at
all. Hence committing only involves committing offsets to Kafka.

Guozhang

On Wed, Aug 22, 2018 at 8:11 PM, Nan Xu <nanxu1...@gmail.com> wrote:

> I was suspecting that too, but I also noticed the spikes are not spaced
> around 10s apart. To further prove it, I put the Kafka data directory in
> a memory-based directory; it still has such latency spikes. I am going to
> test it in a single-broker, single-partition env. Will report back soon.
>
> On Wed, Aug 22, 2018 at 5:14 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Nan,
> >
> > Thanks for the detailed information you shared. When Kafka Streams is
> > running normally, no rebalances should be triggered unless some of the
> > instances (in your case, docker containers) have soft failures.
> >
> > I suspect the latency spike is due to the commit intervals: Streams
> > will try to commit its offsets at a regular pace, which may increase
> > latency. It is controlled by the "commit.interval.ms" config value. I
> > saw that in your original email you've set it to 10 * 1000 (10
> > seconds). Is that aligned with the frequency at which you observe the
> > latency spikes?
> >
> > Guozhang
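A minimal sketch of that check, reusing the StreamsConfig key that already
appears in Nan's original configuration (the 1-second value is only
illustrative): lower the commit interval and see whether the spike spacing
shrinks to match.

----------------------------
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// If the spikes are driven by commits, they should now appear roughly every
// second instead of roughly every ten seconds.
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); // was 10 * 1000
----------------------------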
> > On Sun, Aug 19, 2018 at 10:41 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> >
> > > did more tests and made the test case simpler. The whole setup is now
> > > a single physical machine running 3 docker instances: a1, a2, a3.
> > >
> > > Kafka + ZooKeeper run on each of those docker containers. The producer
> > > runs on a1 and sends a single key at 2000 messages/s; each message is
> > > 10K in size. 3 consumers (in different groups) are running, one on
> > > each docker. All topics are pre-created.
> > >
> > > At startup I do see some latency greater than 100ms, which is fine,
> > > and then everything is good: latency is low and the consumers don't
> > > see anything over 100ms for a while. Then I see a few messages with
> > > latency over 100ms, then it goes back to normal, then it happens
> > > again... It does look like a GC problem, but I checked the GC logs and
> > > I don't think they can cause over 100ms (both use the G1 collector).
> > >
> > > After the stream is running stably (excluding startup), the first
> > > message over 100ms takes 179ms, and the GC has a 30ms pause, which
> > > should not cause a 179ms end-to-end latency.
> > >
> > > FROM APP
> > >
> > > 2018-08-20T00:28:47.118-0500: 406.838: [GC (Allocation Failure) 3184739K->84018K(5947904K), 0.0093730 secs]
> > > 2018-08-20T00:28:56.094-0500: 415.814: [GC (Allocation Failure) 3184690K->84280K(6053888K), 0.0087473 secs]
> > > 2018-08-20T00:29:05.069-0500: 424.789: [GC (Allocation Failure) 3301176K->84342K(6061056K), 0.0127339 secs]
> > > 2018-08-20T00:29:14.234-0500: 433.954: [GC (Allocation Failure) 3301238K->84624K(6143488K), 0.0140844 secs]
> > > 2018-08-20T00:29:24.523-0500: 444.243: [GC (Allocation Failure) 3386000K->89949K(6144000K), 0.0108118 secs]
> > >
> > > kafka a1
> > >
> > > 2018-08-20T00:29:14.306-0500: 7982.657: [GC pause (G1 Evacuation Pause) (young), 0.0214200 secs]
> > >    [Parallel Time: 17.2 ms, GC Workers: 8]
> > >       [GC Worker Start (ms): Min: 7982657.5, Avg: 7982666.9, Max: 7982673.8, Diff: 16.3]
> > >       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.2, Max: 1.5, Diff: 1.5, Sum: 1.5]
> > >       [Update RS (ms): Min: 0.0, Avg: 1.0, Max: 6.5, Diff: 6.5, Sum: 8.4]
> > >          [Processed Buffers: Min: 0, Avg: 4.6, Max: 13, Diff: 13, Sum: 37]
> > >       [Scan RS (ms): Min: 0.0, Avg: 0.9, Max: 2.0, Diff: 2.0, Sum: 7.1]
> > >       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
> > >       [Object Copy (ms): Min: 0.0, Avg: 4.6, Max: 6.5, Diff: 6.5, Sum: 36.5]
> > >       [Termination (ms): Min: 0.0, Avg: 0.4, Max: 0.9, Diff: 0.9, Sum: 2.9]
> > >          [Termination Attempts: Min: 1, Avg: 10.4, Max: 25, Diff: 24, Sum: 83]
> > >       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
> > >       [GC Worker Total (ms): Min: 0.2, Avg: 7.1, Max: 16.4, Diff: 16.2, Sum: 56.5]
> > >       [GC Worker End (ms): Min: 7982673.9, Avg: 7982674.0, Max: 7982674.5, Diff: 0.6]
> > >    [Code Root Fixup: 0.0 ms]
> > >    [Code Root Purge: 0.0 ms]
> > >    [Clear CT: 1.0 ms]
> > >    [Other: 3.2 ms]
> > >       [Choose CSet: 0.0 ms]
> > >       [Ref Proc: 1.9 ms]
> > >       [Ref Enq: 0.0 ms]
> > >       [Redirty Cards: 0.8 ms]
> > >       [Humongous Register: 0.1 ms]
> > >       [Humongous Reclaim: 0.0 ms]
> > >       [Free CSet: 0.2 ms]
> > >    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.5M(1024.0M)->217.9M(1024.0M)]
> > >    [Times: user=0.05 sys=0.00, real=0.03 secs]
> > >
> > > 2018-08-20T00:29:16.074-0500: 7984.426: [GC pause (G1 Evacuation Pause) (young), 0.0310004 secs]
> > >    [Parallel Time: 24.4 ms, GC Workers: 8]
> > >       [GC Worker Start (ms): Min: 7984426.1, Avg: 7984436.8, Max: 7984444.7, Diff: 18.6]
> > >       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.3, Max: 1.9, Diff: 1.9, Sum: 2.0]
> > >       [Update RS (ms): Min: 0.0, Avg: 4.1, Max: 11.8, Diff: 11.8, Sum: 32.9]
> > >          [Processed Buffers: Min: 0, Avg: 5.4, Max: 25, Diff: 25, Sum: 43]
> > >       [Scan RS (ms): Min: 0.1, Avg: 3.2, Max: 11.3, Diff: 11.2, Sum: 25.5]
> > >       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
> > >       [Object Copy (ms): Min: 0.0, Avg: 4.1, Max: 6.9, Diff: 6.9, Sum: 32.7]
> > >       [Termination (ms): Min: 0.0, Avg: 0.8, Max: 1.6, Diff: 1.6, Sum: 6.8]
> > >          [Termination Attempts: Min: 1, Avg: 5.4, Max: 11, Diff: 10, Sum: 43]
> > >       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
> > >       [GC Worker Total (ms): Min: 4.4, Avg: 12.5, Max: 23.6, Diff: 19.1, Sum: 100.1]
> > >       [GC Worker End (ms): Min: 7984449.1, Avg: 7984449.3, Max: 7984449.9, Diff: 0.8]
> > >    [Code Root Fixup: 0.0 ms]
> > >    [Code Root Purge: 0.0 ms]
> > >    [Clear CT: 1.1 ms]
> > >    [Other: 5.5 ms]
> > >       [Choose CSet: 0.0 ms]
> > >       [Ref Proc: 2.2 ms]
> > >       [Ref Enq: 0.0 ms]
> > >       [Redirty Cards: 2.8 ms]
> > >       [Humongous Register: 0.1 ms]
> > >       [Humongous Reclaim: 0.0 ms]
> > >       [Free CSet: 0.1 ms]
> > >    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.9M(1024.0M)->218.4M(1024.0M)]
> > >    [Times: user=0.05 sys=0.00, real=0.03 secs]
> > >
> > > So when the Kafka stream is running, is there any attempt to
> > > rebalance, either a broker rebalance or a client rebalance? Is there
> > > any kind of test to see what causes the trouble?
> > >
> > > Thanks,
> > > Nan
> > >
> > > On Sun, Aug 19, 2018 at 10:34 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Okay, so you're measuring end-to-end time from producer -> broker ->
> > > > streams' consumer client. There are multiple phases that can
> > > > contribute to the 100ms latency, and I cannot tell if streams'
> > > > consumer phase is the major contributor. For example, if the topic
> > > > was not created before, then when the broker first receives a
> > > > produce request it may need to create the topic, which involves
> > > > multiple steps including writes to ZK, which could take time.
> > > >
> > > > There are some confusions from your description: you mentioned
> > > > "Kafka cluster is already up and running", but I think you are
> > > > referring to "Kafka Streams application instances are already up and
> > > > running", right? Since only the latter has a rebalance process,
> > > > while the Kafka brokers do not really have "rebalances" except
> > > > balancing load by migrating partitions.
> > > >
> > > > Guozhang
> > > >
> > > > On Sun, Aug 19, 2018 at 7:47 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> > > >
> > > > > right, so my Kafka cluster has already been up and running for a
> > > > > while, and I can see from the log that all broker instances have
> > > > > already changed from rebalance to running.
> > > > >
> > > > > I did another test. In the producer, right before the message gets
> > > > > sent to the broker, I put a timestamp in the message, and on the
> > > > > consumer side, which is after stream processing, I compare this
> > > > > timestamp with the current time. I can see that some messages'
> > > > > processing time is above 100ms on some really powerful hardware,
> > > > > and from my application GC log, all the GC times are below 1ms;
> > > > > Kafka GC only happened once and was below 1ms too.
> > > > >
> > > > > Very puzzled. Is there any communication to ZooKeeper that, if it
> > > > > does not get a response, will cause the broker to pause? I don't
> > > > > think that's the case, but at this time I don't know what else can
> > > > > be suspected.
> > > > >
> > > > > Nan
> > > > >
> > > > > On Sun, Aug 19, 2018 at 1:08 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > > Hello Nan,
> > > > > >
> > > > > > Note that Streams may need some time to rebalance and assign
> > > > > > tasks even if you only start with one instance.
> > > > > >
> > > > > > I'd suggest you register your state listener in Kafka Streams
> > > > > > via KafkaStreams#setStateListener, and your customized
> > > > > > StateListener should record when the state transitions from
> > > > > > REBALANCING to RUNNING, since only after that will the streams
> > > > > > client start to process the first record.
> > > > > >
> > > > > > Guozhang
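A minimal sketch of such a listener, assuming the streams instance from
Nan's code further down and a Java 8 lambda; the println and the use of
System.currentTimeMillis() are illustrative only.

----------------------------
import org.apache.kafka.streams.KafkaStreams;

// Register before streams.start(); record the moment the client finishes
// rebalancing so that latency measured before that point can be discounted.
streams.setStateListener((newState, oldState) -> {
    if (oldState == KafkaStreams.State.REBALANCING
            && newState == KafkaStreams.State.RUNNING) {
        System.out.println("REBALANCING -> RUNNING at " + System.currentTimeMillis());
    }
});
----------------------------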
> > > > > > On Sat, Aug 18, 2018 at 8:52 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> > > > > >
> > > > > > > thanks, which JMX properties indicate "processing latency
> > > > > > > spikes" / "throughput"?
> > > > > > >
> > > > > > > On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <matth...@confluent.io> wrote:
> > > > > > >
> > > > > > > > I cannot spot any obvious reasons.
> > > > > > > >
> > > > > > > > As you consume from the result topic for verification, we
> > > > > > > > should verify that the latency spikes originate on write and
> > > > > > > > not on read: you might want to have a look at the Kafka
> > > > > > > > Streams JMX metrics to see if processing latency spikes or
> > > > > > > > throughput drops.
> > > > > > > >
> > > > > > > > Also watch for GC pauses in the JVM.
> > > > > > > >
> > > > > > > > Hope this helps.
> > > > > > > >
> > > > > > > > -Matthias
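As a rough sketch of where to look, assuming the streams instance from the
code below; the exact metric groups and names (e.g. "process-latency-avg",
"process-latency-max", "process-rate") vary by Kafka Streams version, and
these are the same values the JMX beans expose.

----------------------------
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

// Dump the processing-latency and throughput metrics; on 0.10.2,
// Metric#value() returns the measured double (newer versions add metricValue()).
for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
    MetricName m = entry.getKey();
    if (m.name().contains("process-latency") || m.name().contains("process-rate")) {
        System.out.println(m.group() + " / " + m.name() + " " + m.tags()
                + " = " + entry.getValue().value());
    }
}
----------------------------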
> > > > > > > > On 8/17/18 12:13 PM, Nan Xu wrote:
> > > > > > > > > btw, I am using version 0.10.2.0
> > > > > > > > >
> > > > > > > > > On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <nanxu1...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > >> I am working on a Kafka Streams app and see huge latency
> > > > > > > > >> variance, wondering what can cause this?
> > > > > > > > >>
> > > > > > > > >> The processing is very simple and doesn't have state;
> > > > > > > > >> linger.ms is already changed to 5ms. The message size is
> > > > > > > > >> around 10K bytes and messages are published at 2000
> > > > > > > > >> messages/s; the network is 10G. Using a regular consumer
> > > > > > > > >> to watch the localHistTopic topic and print out a counter
> > > > > > > > >> every 2000 messages, I usually get a count of 2000 every
> > > > > > > > >> second, matching the publish speed, but sometimes I see it
> > > > > > > > >> stall for 3 or more seconds and then print out a few
> > > > > > > > >> counts, as if the CPU were paused during that time or
> > > > > > > > >> messages were cached/batched and then processed. Any
> > > > > > > > >> suggestion?
> > > > > > > > >>
> > > > > > > > >> final Properties streamsConfiguration = new Properties();
> > > > > > > > >>
> > > > > > > > >> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> > > > > > > > >>     applicationId);
> > > > > > > > >>
> > > > > > > > >> streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
> > > > > > > > >>
> > > > > > > > >> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> > > > > > > > >>     bootstrapServers);
> > > > > > > > >>
> > > > > > > > >> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> > > > > > > > >>     Serdes.String().getClass().getName());
> > > > > > > > >>
> > > > > > > > >> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> > > > > > > > >>     10 * 1000);
> > > > > > > > >>
> > > > > > > > >> //
> > > > > > > > >> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> > > > > > > > >>
> > > > > > > > >> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > > > > > > > >>     + ProducerConfig.BATCH_SIZE_CONFIG, 163840);
> > > > > > > > >>
> > > > > > > > >> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > > > > > > > >>     + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
> > > > > > > > >>
> > > > > > > > >> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > > > > > > > >>     + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
> > > > > > > > >>
> > > > > > > > >> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > > > > > > > >>     + ProducerConfig.LINGER_MS_CONFIG, "5");
> > > > > > > > >>
> > > > > > > > >> streamsConfiguration.put(StreamsConfig.consumerPrefix(
> > > > > > > > >>     ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 20 * 1024 * 1024);
> > > > > > > > >>
> > > > > > > > >> final StreamsBuilder builder = new StreamsBuilder();
> > > > > > > > >>
> > > > > > > > >> final KStream<String, NodeMutation> localDeltaStream = builder.stream(
> > > > > > > > >>     localDeltaTopic,
> > > > > > > > >>     Consumed.with(
> > > > > > > > >>         new Serdes.StringSerde(),
> > > > > > > > >>         new NodeMutationSerde<>()
> > > > > > > > >>     )
> > > > > > > > >> );
> > > > > > > > >>
> > > > > > > > >> KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
> > > > > > > > >>     (mutation) -> NodeState
> > > > > > > > >>         .newBuilder()
> > > > > > > > >>         .setMeta(
> > > > > > > > >>             mutation.getMetaMutation().getMeta()
> > > > > > > > >>         )
> > > > > > > > >>         .setValue(
> > > > > > > > >>             mutation.getValueMutation().getValue()
> > > > > > > > >>         )
> > > > > > > > >>         .build()
> > > > > > > > >> );
> > > > > > > > >>
> > > > > > > > >> localHistStream.to(
> > > > > > > > >>     localHistTopic,
> > > > > > > > >>     Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
> > > > > > > > >> );
> > > > > > > > >>
> > > > > > > > >> streams = new KafkaStreams(builder.build(), streamsConfiguration);
> > > > > > > > >>
> > > > > > > > >> streams.cleanUp();
> > > > > > > > >>
> > > > > > > > >> streams.start();
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > >
> > > > --
> > > > -- Guozhang
> >
> > --
> > -- Guozhang

--
-- Guozhang