Hello Nan,

Kafka does not tie up the processing thread to do disk flushing. However, since you are on an older version of Kafka, I suspect you're running into some old issues that have been resolved in later versions, e.g. https://issues.apache.org/jira/browse/KAFKA-4614

I'd suggest upgrading to the latest version (2.0.0) and trying again to see if you observe the same pattern.

Guozhang

On Thu, Aug 23, 2018 at 3:22 PM, Sudhir Babu Pothineni <sbpothin...@gmail.com> wrote:

> I will wait for the expert's opinion:
>
> Is Transparent Huge Pages (THP) disabled on the broker machine? It's a Linux kernel parameter.
>
> -Sudhir
>
> On Aug 23, 2018, at 4:46 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> >
> > I think I found where the problem is; how to solve it, and why it happens, I am still not sure.
> >
> > It is related to disk (maybe flushing?). I did a single-machine, single-node, single-topic, single-partition setup. The producer publishes 2000 messages/s, 10K message size, with a single key.
> >
> > When I save the Kafka log to a memory-based partition, I don't see latency over 100ms; it tops out around 70ms.
> > When I save to an SSD, I do see latency spikes, sometimes over 1s.
> >
> > Adjusting log.flush.interval.messages / log.flush.interval.ms has an impact, but only makes things worse... need suggestions.
> >
> > I think log flushing is totally async and done by the OS in the default setting. Does Kafka have to wait when flushing data to disk?
> >
> > Thanks,
> > Nan
> >
> >> On Wed, Aug 22, 2018 at 11:37 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >> Given your application code:
> >>
> >> ----------------------------
> >>
> >> final KStream<String, NodeMutation> localDeltaStream = builder.stream(
> >>     localDeltaTopic,
> >>     Consumed.with(
> >>         new Serdes.StringSerde(),
> >>         new NodeMutationSerde<>()
> >>     )
> >> );
> >>
> >> KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
> >>     (mutation) -> NodeState
> >>         .newBuilder()
> >>         .setMeta(
> >>             mutation.getMetaMutation().getMeta()
> >>         )
> >>         .setValue(
> >>             mutation.getValueMutation().getValue()
> >>         )
> >>         .build()
> >> );
> >>
> >> localHistStream.to(
> >>     localHistTopic,
> >>     Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
> >> );
> >>
> >> ----------------------------
> >>
> >> which is purely stateless, committing will not touch any state directory at all. Hence committing only involves committing offsets to Kafka.
> >>
> >>
> >> Guozhang
> >>
> >>> On Wed, Aug 22, 2018 at 8:11 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> >>>
> >>> I was suspecting that too, but I also noticed the spikes are not spaced around 10s apart. To further prove it, I put the Kafka data directory in a memory-based directory. It still has such latency spikes. I am going to test it on a single-broker, single-partition env. Will report back soon.
> >>>
> >>> On Wed, Aug 22, 2018 at 5:14 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>>> Hello Nan,
> >>>>
> >>>> Thanks for the detailed information you shared. When Kafka Streams is running normally, no rebalances should be triggered unless some of the instances (in your case, docker containers) have soft failures.
> >>>>
> >>>> I suspect the latency spike is due to the commit interval: Streams will try to commit its offsets at a regular pace, which may increase latency. It is controlled by the "commit.interval.ms" config value. I saw that in your original email you've set it to 10 * 1000 (10 seconds). Is that aligned with the frequency at which you observe latency spikes?
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>> On Sun, Aug 19, 2018 at 10:41 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> >>>>>
> >>>>> Did more tests and made the test case simpler.
> >>>>> All the setup is now on a single physical machine, running 3 docker instances: a1, a2, a3.
> >>>>>
> >>>>> Kafka + ZooKeeper are running on all of those docker containers.
> >>>>> The producer runs on a1 and sends a single key, update speed 2000 messages/s, each message 10K in size.
> >>>>> 3 consumers (different groups) are running, one on each docker.
> >>>>> All topics are pre-created.
> >>>>> At startup, I do see some latency greater than 100ms, which is fine. Then everything is good: latency is low and the consumers don't see anything over 100ms for a while.
> >>>>> Then I see a few messages with latency over 100ms, then back to normal, then it happens again..... It does seem like a GC problem, but I checked the GC logs and I don't think GC can cause over 100ms. (Both are G1 collector.)
> >>>>>
> >>>>> After the stream is running stably (excluding startup), the first message over 100ms took 179ms, and the GC (it has a 30ms pause) should not cause 179ms end to end.
> >>>>>
> >>>>> FROM APP
> >>>>>
> >>>>> 2018-08-20T00:28:47.118-0500: 406.838: [GC (Allocation Failure) 3184739K->84018K(5947904K), 0.0093730 secs]
> >>>>> 2018-08-20T00:28:56.094-0500: 415.814: [GC (Allocation Failure) 3184690K->84280K(6053888K), 0.0087473 secs]
> >>>>> 2018-08-20T00:29:05.069-0500: 424.789: [GC (Allocation Failure) 3301176K->84342K(6061056K), 0.0127339 secs]
> >>>>> 2018-08-20T00:29:14.234-0500: 433.954: [GC (Allocation Failure) 3301238K->84624K(6143488K), 0.0140844 secs]
> >>>>> 2018-08-20T00:29:24.523-0500: 444.243: [GC (Allocation Failure) 3386000K->89949K(6144000K), 0.0108118 secs]
> >>>>>
> >>>>> kafka a1
> >>>>>
> >>>>> 2018-08-20T00:29:14.306-0500: 7982.657: [GC pause (G1 Evacuation Pause) (young), 0.0214200 secs]
> >>>>>    [Parallel Time: 17.2 ms, GC Workers: 8]
> >>>>>       [GC Worker Start (ms): Min: 7982657.5, Avg: 7982666.9, Max: 7982673.8, Diff: 16.3]
> >>>>>       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.2, Max: 1.5, Diff: 1.5, Sum: 1.5]
> >>>>>       [Update RS (ms): Min: 0.0, Avg: 1.0, Max: 6.5, Diff: 6.5, Sum: 8.4]
> >>>>>          [Processed Buffers: Min: 0, Avg: 4.6, Max: 13, Diff: 13, Sum: 37]
> >>>>>       [Scan RS (ms): Min: 0.0, Avg: 0.9, Max: 2.0, Diff: 2.0, Sum: 7.1]
> >>>>>       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
> >>>>>       [Object Copy (ms): Min: 0.0, Avg: 4.6, Max: 6.5, Diff: 6.5, Sum: 36.5]
> >>>>>       [Termination (ms): Min: 0.0, Avg: 0.4, Max: 0.9, Diff: 0.9, Sum: 2.9]
> >>>>>          [Termination Attempts: Min: 1, Avg: 10.4, Max: 25, Diff: 24, Sum: 83]
> >>>>>       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
> >>>>>       [GC Worker Total (ms): Min: 0.2, Avg: 7.1, Max: 16.4, Diff: 16.2, Sum: 56.5]
> >>>>>       [GC Worker End (ms): Min: 7982673.9, Avg: 7982674.0, Max: 7982674.5, Diff: 0.6]
> >>>>>    [Code Root Fixup: 0.0 ms]
> >>>>>    [Code Root Purge: 0.0 ms]
> >>>>>    [Clear CT: 1.0 ms]
> >>>>>    [Other: 3.2 ms]
> >>>>>       [Choose CSet: 0.0 ms]
> >>>>>       [Ref Proc: 1.9 ms]
> >>>>>       [Ref Enq: 0.0 ms]
> >>>>>       [Redirty Cards: 0.8 ms]
> >>>>>       [Humongous Register: 0.1 ms]
> >>>>>       [Humongous Reclaim: 0.0 ms]
> >>>>>       [Free CSet: 0.2 ms]
> >>>>>    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.5M(1024.0M)->217.9M(1024.0M)]
> >>>>>  [Times: user=0.05 sys=0.00, real=0.03 secs]
> >>>>>
> >>>>> 2018-08-20T00:29:16.074-0500: 7984.426: [GC pause (G1 Evacuation Pause) (young), 0.0310004 secs]
> >>>>>    [Parallel Time: 24.4 ms, GC Workers: 8]
> >>>>>       [GC Worker Start (ms): Min: 7984426.1, Avg: 7984436.8, Max: 7984444.7, Diff: 18.6]
> >>>>>       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.3, Max: 1.9, Diff: 1.9, Sum: 2.0]
> >>>>>       [Update RS (ms): Min: 0.0, Avg: 4.1, Max: 11.8, Diff: 11.8, Sum: 32.9]
> >>>>>          [Processed Buffers: Min: 0, Avg: 5.4, Max: 25, Diff: 25, Sum: 43]
> >>>>>       [Scan RS (ms): Min: 0.1, Avg: 3.2, Max: 11.3, Diff: 11.2, Sum: 25.5]
> >>>>>       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
> >>>>>       [Object Copy (ms): Min: 0.0, Avg: 4.1, Max: 6.9, Diff: 6.9, Sum: 32.7]
> >>>>>       [Termination (ms): Min: 0.0, Avg: 0.8, Max: 1.6, Diff: 1.6, Sum: 6.8]
> >>>>>          [Termination Attempts: Min: 1, Avg: 5.4, Max: 11, Diff: 10, Sum: 43]
> >>>>>       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
> >>>>>       [GC Worker Total (ms): Min: 4.4, Avg: 12.5, Max: 23.6, Diff: 19.1, Sum: 100.1]
> >>>>>       [GC Worker End (ms): Min: 7984449.1, Avg: 7984449.3, Max: 7984449.9, Diff: 0.8]
> >>>>>    [Code Root Fixup: 0.0 ms]
> >>>>>    [Code Root Purge: 0.0 ms]
> >>>>>    [Clear CT: 1.1 ms]
> >>>>>    [Other: 5.5 ms]
> >>>>>       [Choose CSet: 0.0 ms]
> >>>>>       [Ref Proc: 2.2 ms]
> >>>>>       [Ref Enq: 0.0 ms]
> >>>>>       [Redirty Cards: 2.8 ms]
> >>>>>       [Humongous Register: 0.1 ms]
> >>>>>       [Humongous Reclaim: 0.0 ms]
> >>>>>       [Free CSet: 0.1 ms]
> >>>>>    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.9M(1024.0M)->218.4M(1024.0M)]
> >>>>>  [Times: user=0.05 sys=0.00, real=0.03 secs]
> >>>>>
> >>>>> So when Kafka Streams is running, is anything trying to rebalance? Either a broker rebalance or a client rebalance?
> >>>>> Is there any kind of test to see what causes the trouble?
> >>>>>
> >>>>> Thanks,
> >>>>> Nan
> >>>>>
> >>>>> On Sun, Aug 19, 2018 at 10:34 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>>> Okay, so you're measuring end-to-end time from producer -> broker -> Streams' consumer client. There are multiple phases that can contribute to the 100ms latency, and I cannot tell if the Streams consumer phase is the major contributor. For example, if the topic was not created before, then when the broker first receives a produce request it may need to create the topic, which involves multiple steps including writes to ZK, which could take time.
> >>>>>>
> >>>>>> There is some confusion in your description: you mentioned "Kafka cluster is already up and running", but I think you are referring to "Kafka Streams application instances are already up and running", right?
> >>>>>> Since only the latter has a rebalance process, while the Kafka brokers do not really have "rebalances" except balancing load by migrating partitions.
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Sun, Aug 19, 2018 at 7:47 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Right, so my Kafka cluster has already been up and running for a while, and I can see from the log that all broker instances have already changed from rebalance to running.
> >>>>>>>
> >>>>>>> I did another test.
> >>>>>>> On the producer, right before the message gets sent to the broker, I put a timestamp in the message. On the consumer side, which is after stream processing, I compare this timestamp with the current time. I can see that some message processing times are above 100ms on some really powerful hardware. From my application GC log, all GC times are below 1ms; Kafka GC only happened once and was below 1ms too.
> >>>>>>>
> >>>>>>> Very puzzled. Is there any communication to ZooKeeper that, if it gets no response, will cause the broker to pause? I don't think that's the case, but at this time I don't know what else to suspect.
> >>>>>>>
> >>>>>>> Nan
> >>>>>>>
> >>>>>>> On Sun, Aug 19, 2018 at 1:08 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Hello Nan,
> >>>>>>>>
> >>>>>>>> Note that Streams may need some time to rebalance and assign tasks even if you only start with one instance.
> >>>>>>>>
> >>>>>>>> I'd suggest you register your state listener in Kafka Streams via KafkaStreams#setStateListener, and your customized StateListener should record when the state transits from REBALANCING to RUNNING, since only after that will the Streams client start to process the first record.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>> On Sat, Aug 18, 2018 at 8:52 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Thanks, which JMX properties indicate "processing latency spikes" / "throughput"?
> >>>>>>>>>
> >>>>>>>>> On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>>>>
> >>>>>>>>>> I cannot spot any obvious reasons.
> >>>>>>>>>>
> >>>>>>>>>> As you consume from the result topic for verification, we should verify that the latency spikes originate on write and not on read: you might want to have a look at the Kafka Streams JMX metrics to see if processing latency spikes or throughput drops.
> >>>>>>>>>>
> >>>>>>>>>> Also watch for GC pauses in the JVM.
> >>>>>>>>>>
> >>>>>>>>>> Hope this helps.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>>> On 8/17/18 12:13 PM, Nan Xu wrote:
> >>>>>>>>>>> btw, I am using version 0.10.2.0
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <nanxu1...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> I am working on a Kafka Streams app and see huge latency variance; wondering what can cause this?
> >>>>>>>>>>>>
> >>>>>>>>>>>> The processing is very simple and doesn't have state; linger.ms is already changed to 5ms.
> >>>>>>>>>>>> The message size is around 10K bytes, published at 2000 messages/s; the network is 10G. Using a regular consumer to watch the localHistTopic topic and print a counter every 2000 messages, I usually get a count of 2000 every second, matching the publish speed, but sometimes I see it stall for 3 or more seconds and then print out a few counts, as if the CPU is paused during that time or messages are being cached/batched and then processed.
> >>>>>>>>>>>> Any suggestions?
> >>>>>>>>>>>>
> >>>>>>>>>>>> final Properties streamsConfiguration = new Properties();
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> >>>>>>>>>>>>     Serdes.String().getClass().getName());
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
> >>>>>>>>>>>> // streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> >>>>>>>>>>>>     + ProducerConfig.BATCH_SIZE_CONFIG, 163840);
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> >>>>>>>>>>>>     + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> >>>>>>>>>>>>     + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> >>>>>>>>>>>>     + ProducerConfig.LINGER_MS_CONFIG, "5");
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.consumerPrefix(
> >>>>>>>>>>>>     ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 20 * 1024 * 1024);
> >>>>>>>>>>>>
> >>>>>>>>>>>> final StreamsBuilder builder = new StreamsBuilder();
> >>>>>>>>>>>>
> >>>>>>>>>>>> final KStream<String, NodeMutation> localDeltaStream = builder.stream(
> >>>>>>>>>>>>     localDeltaTopic,
> >>>>>>>>>>>>     Consumed.with(
> >>>>>>>>>>>>         new Serdes.StringSerde(),
> >>>>>>>>>>>>         new NodeMutationSerde<>()
> >>>>>>>>>>>>     )
> >>>>>>>>>>>> );
> >>>>>>>>>>>>
> >>>>>>>>>>>> KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
> >>>>>>>>>>>>     (mutation) -> NodeState
> >>>>>>>>>>>>         .newBuilder()
> >>>>>>>>>>>>         .setMeta(
> >>>>>>>>>>>>             mutation.getMetaMutation().getMeta()
> >>>>>>>>>>>>         )
> >>>>>>>>>>>>         .setValue(
> >>>>>>>>>>>>             mutation.getValueMutation().getValue()
> >>>>>>>>>>>>         )
> >>>>>>>>>>>>         .build()
> >>>>>>>>>>>> );
> >>>>>>>>>>>>
> >>>>>>>>>>>> localHistStream.to(
> >>>>>>>>>>>>     localHistTopic,
> >>>>>>>>>>>>     Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
> >>>>>>>>>>>> );
> >>>>>>>>>>>>
> >>>>>>>>>>>> streams = new KafkaStreams(builder.build(), streamsConfiguration);
> >>>>>>>>>>>> streams.cleanUp();
> >>>>>>>>>>>> streams.start();
> >>>>>>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>
> >> --
> >> -- Guozhang

--
-- Guozhang
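
For anyone reproducing the measurement described in this thread (a timestamp embedded by the producer, compared with the clock on the consumer side), here is a minimal sketch of how the samples could be collected so that spike times can be lined up against GC logs or the 10-second commit interval. It is only an illustration: `LatencySpikeTracker` and its methods are hypothetical names, not part of Kafka, and it assumes the producer and consumer clocks are comparable (as in the single-machine setup above).

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper, not part of Kafka: collects end-to-end latency samples
 * and remembers when each spike above a threshold was observed.
 */
public class LatencySpikeTracker {
    private final long thresholdMs;
    // Consumer-side receive times (ms) of the samples that exceeded the threshold.
    private final List<Long> spikeReceiveTimesMs = new ArrayList<>();
    private long count;
    private long maxLatencyMs;

    public LatencySpikeTracker(long thresholdMs) {
        this.thresholdMs = thresholdMs;
    }

    /**
     * @param sentAtMs     timestamp the producer embedded in the message
     * @param receivedAtMs consumer-side clock when the message arrived
     */
    public void record(long sentAtMs, long receivedAtMs) {
        long latencyMs = receivedAtMs - sentAtMs;
        count++;
        maxLatencyMs = Math.max(maxLatencyMs, latencyMs);
        if (latencyMs > thresholdMs) {
            spikeReceiveTimesMs.add(receivedAtMs);
        }
    }

    public long count() { return count; }

    public long maxLatencyMs() { return maxLatencyMs; }

    public int spikeCount() { return spikeReceiveTimesMs.size(); }

    /** Gaps in ms between consecutive spikes, in arrival order. */
    public List<Long> spikeGapsMs() {
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < spikeReceiveTimesMs.size(); i++) {
            gaps.add(spikeReceiveTimesMs.get(i) - spikeReceiveTimesMs.get(i - 1));
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Made-up sample values, only to show the call pattern.
        LatencySpikeTracker tracker = new LatencySpikeTracker(100);
        tracker.record(1_000, 1_020);
        tracker.record(2_000, 2_179);
        tracker.record(12_000, 12_150);
        System.out.println("samples=" + tracker.count()
                + " max=" + tracker.maxLatencyMs() + "ms"
                + " spikes=" + tracker.spikeCount()
                + " gaps=" + tracker.spikeGapsMs());
    }
}
```

If the gaps between spikes cluster near the configured commit.interval.ms, that would support the commit-interval explanation; irregular gaps would point elsewhere (disk flushing, GC, and so on).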