Hello Nan,

Kafka does not tie up the processing thread to do disk flushing. However,
since you are on an older version of Kafka, I suspect you're bumping into
some old issues that have been resolved in later versions, e.g.

https://issues.apache.org/jira/browse/KAFKA-4614

I'd suggest upgrading to the latest version (2.0.0) and trying again to see
if you observe the same pattern.
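
For reference, the broker defaults already defer flushing entirely to the OS
page cache. A minimal server.properties sketch (the values shown are, if I
recall correctly, the defaults; leaving them unset is usually the right
choice):

----------------------------

# no count-based forced flush (Long.MAX_VALUE); flushing is left to the OS
log.flush.interval.messages=9223372036854775807
# log.flush.interval.ms is unset by default, i.e. no time-based forced flush

----------------------------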

Guozhang

On Thu, Aug 23, 2018 at 3:22 PM, Sudhir Babu Pothineni <sbpothin...@gmail.com> wrote:

> I will wait for the experts' opinion:
>
> Is Transparent Huge Pages (THP) disabled on the broker machine? It's a
> Linux kernel setting (check /sys/kernel/mm/transparent_hugepage/enabled).
>
> -Sudhir
>
> > On Aug 23, 2018, at 4:46 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> >
> > I think I found where the problem is; how to solve it, and why, I'm
> > still not sure.
> >
> > It is related to disk (maybe flushing?). I did a single-machine,
> > single-node, single-topic, single-partition setup: the producer
> > publishes 2000 messages/s, 10K message size, and a single key.
> >
> > When I save the Kafka log to a memory-based partition, I don't see
> > latency over 100ms (it tops out around 70ms). When I save to an SSD,
> > I do see latency spikes, sometimes over 1s.
> >
> > Adjusting log.flush.interval.messages / log.flush.interval.ms has an
> > impact, but only makes things worse... I need suggestions.
> >
> > I think log flushing is completely async and done by the OS in the
> > default setting. Does Kafka have to wait when flushing data to disk?
> >
> > Thanks,
> > Nan
> >
> >
> >
> >> On Wed, Aug 22, 2018 at 11:37 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >> Given your application code:
> >>
> >> ----------------------------
> >>
> >> final KStream<String, NodeMutation> localDeltaStream = builder.stream(
> >>     localDeltaTopic,
> >>     Consumed.with(
> >>         new Serdes.StringSerde(),
> >>         new NodeMutationSerde<>()
> >>     )
> >> );
> >>
> >> KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
> >>     (mutation) -> NodeState
> >>         .newBuilder()
> >>         .setMeta(mutation.getMetaMutation().getMeta())
> >>         .setValue(mutation.getValueMutation().getValue())
> >>         .build()
> >> );
> >>
> >> localHistStream.to(
> >>     localHistTopic,
> >>     Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
> >> );
> >>
> >> ----------------------------
> >>
> >> which is purely stateless, committing will not touch any state
> >> directory at all. Hence committing only involves committing offsets
> >> to Kafka.
> >>
> >>
> >> Guozhang
> >>
> >>
> >>> On Wed, Aug 22, 2018 at 8:11 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> >>>
> >>> I was suspecting that too, but I also noticed the spikes are not
> >>> spaced around 10s. To further prove it, I put the Kafka data
> >>> directory in a memory-based directory; it still has such latency
> >>> spikes. I am going to test it on a single-broker, single-partition
> >>> env. Will report back soon.
> >>>
> >>> On Wed, Aug 22, 2018 at 5:14 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>>> Hello Nan,
> >>>>
> >>>> Thanks for the detailed information you shared. When Kafka Streams is
> >>>> normally running, no rebalances should be triggered unless some of the
> >>>> instances (in your case, docker containers) have soft failures.
> >>>>
> >>>> I suspect the latency spike is due to the commit interval: Streams
> >>>> will try to commit its offsets at a regular pace, which may increase
> >>>> latency. It is controlled by the "commit.interval.ms" config value.
> >>>> I saw that in your original email you've set it to 10 * 1000 (10
> >>>> seconds). Is that aligned with the frequency at which you observe
> >>>> the latency spikes?
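> >>>>
> >>>> A quick way to test the correlation (a sketch; the 30-second value
> >>>> is just an arbitrary choice for the experiment):
> >>>>
> >>>> // if the spikes currently arrive every ~10s, move the commit interval
> >>>> // and see whether the spikes move with it
> >>>> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);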
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>>> On Sun, Aug 19, 2018 at 10:41 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> >>>>>
> >>>>> I did more tests and made the test case simpler.
> >>>>> The whole setup is now a single physical machine running 3 docker
> >>>>> instances: a1, a2, a3.
> >>>>>
> >>>>> Kafka + ZooKeeper run in all of those docker containers.
> >>>>> The producer runs on a1 and sends a single key at 2000 messages/s;
> >>>>> each message is 10K in size.
> >>>>> 3 consumers (in different groups) are running, one in each container.
> >>>>> All topics are pre-created.
> >>>>> At startup I do see some latency greater than 100ms, which is fine,
> >>>>> and then everything is good: latency is low and the consumer doesn't
> >>>>> see anything over 100ms for a while. Then I see a few messages with
> >>>>> latency over 100ms, then back to normal, then it happens again.....
> >>>>> It does seem like a GC problem, but I checked the GC logs and I
> >>>>> don't think they can account for over 100ms (both use the G1
> >>>>> collector).
> >>>>>
> >>>>> After the stream was running stably (i.e. excluding startup), the
> >>>>> first message over 100ms took 179ms, and the GC had a 30ms pause,
> >>>>> which should not cause a 179ms end-to-end latency.
> >>>>>
> >>>>> FROM APP
> >>>>>
> >>>>> 2018-08-20T00:28:47.118-0500: 406.838: [GC (Allocation Failure) 3184739K->84018K(5947904K), 0.0093730 secs]
> >>>>> 2018-08-20T00:28:56.094-0500: 415.814: [GC (Allocation Failure) 3184690K->84280K(6053888K), 0.0087473 secs]
> >>>>> 2018-08-20T00:29:05.069-0500: 424.789: [GC (Allocation Failure) 3301176K->84342K(6061056K), 0.0127339 secs]
> >>>>> 2018-08-20T00:29:14.234-0500: 433.954: [GC (Allocation Failure) 3301238K->84624K(6143488K), 0.0140844 secs]
> >>>>> 2018-08-20T00:29:24.523-0500: 444.243: [GC (Allocation Failure) 3386000K->89949K(6144000K), 0.0108118 secs]
> >>>>>
> >>>>> kafka a1
> >>>>>
> >>>>> 2018-08-20T00:29:14.306-0500: 7982.657: [GC pause (G1 Evacuation Pause) (young), 0.0214200 secs]
> >>>>>    [Parallel Time: 17.2 ms, GC Workers: 8]
> >>>>>       [GC Worker Start (ms): Min: 7982657.5, Avg: 7982666.9, Max: 7982673.8, Diff: 16.3]
> >>>>>       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.2, Max: 1.5, Diff: 1.5, Sum: 1.5]
> >>>>>       [Update RS (ms): Min: 0.0, Avg: 1.0, Max: 6.5, Diff: 6.5, Sum: 8.4]
> >>>>>          [Processed Buffers: Min: 0, Avg: 4.6, Max: 13, Diff: 13, Sum: 37]
> >>>>>       [Scan RS (ms): Min: 0.0, Avg: 0.9, Max: 2.0, Diff: 2.0, Sum: 7.1]
> >>>>>       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
> >>>>>       [Object Copy (ms): Min: 0.0, Avg: 4.6, Max: 6.5, Diff: 6.5, Sum: 36.5]
> >>>>>       [Termination (ms): Min: 0.0, Avg: 0.4, Max: 0.9, Diff: 0.9, Sum: 2.9]
> >>>>>          [Termination Attempts: Min: 1, Avg: 10.4, Max: 25, Diff: 24, Sum: 83]
> >>>>>       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
> >>>>>       [GC Worker Total (ms): Min: 0.2, Avg: 7.1, Max: 16.4, Diff: 16.2, Sum: 56.5]
> >>>>>       [GC Worker End (ms): Min: 7982673.9, Avg: 7982674.0, Max: 7982674.5, Diff: 0.6]
> >>>>>    [Code Root Fixup: 0.0 ms]
> >>>>>    [Code Root Purge: 0.0 ms]
> >>>>>    [Clear CT: 1.0 ms]
> >>>>>    [Other: 3.2 ms]
> >>>>>       [Choose CSet: 0.0 ms]
> >>>>>       [Ref Proc: 1.9 ms]
> >>>>>       [Ref Enq: 0.0 ms]
> >>>>>       [Redirty Cards: 0.8 ms]
> >>>>>       [Humongous Register: 0.1 ms]
> >>>>>       [Humongous Reclaim: 0.0 ms]
> >>>>>       [Free CSet: 0.2 ms]
> >>>>>    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.5M(1024.0M)->217.9M(1024.0M)]
> >>>>> [Times: user=0.05 sys=0.00, real=0.03 secs]
> >>>>>
> >>>>> 2018-08-20T00:29:16.074-0500: 7984.426: [GC pause (G1 Evacuation Pause) (young), 0.0310004 secs]
> >>>>>    [Parallel Time: 24.4 ms, GC Workers: 8]
> >>>>>       [GC Worker Start (ms): Min: 7984426.1, Avg: 7984436.8, Max: 7984444.7, Diff: 18.6]
> >>>>>       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.3, Max: 1.9, Diff: 1.9, Sum: 2.0]
> >>>>>       [Update RS (ms): Min: 0.0, Avg: 4.1, Max: 11.8, Diff: 11.8, Sum: 32.9]
> >>>>>          [Processed Buffers: Min: 0, Avg: 5.4, Max: 25, Diff: 25, Sum: 43]
> >>>>>       [Scan RS (ms): Min: 0.1, Avg: 3.2, Max: 11.3, Diff: 11.2, Sum: 25.5]
> >>>>>       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
> >>>>>       [Object Copy (ms): Min: 0.0, Avg: 4.1, Max: 6.9, Diff: 6.9, Sum: 32.7]
> >>>>>       [Termination (ms): Min: 0.0, Avg: 0.8, Max: 1.6, Diff: 1.6, Sum: 6.8]
> >>>>>          [Termination Attempts: Min: 1, Avg: 5.4, Max: 11, Diff: 10, Sum: 43]
> >>>>>       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
> >>>>>       [GC Worker Total (ms): Min: 4.4, Avg: 12.5, Max: 23.6, Diff: 19.1, Sum: 100.1]
> >>>>>       [GC Worker End (ms): Min: 7984449.1, Avg: 7984449.3, Max: 7984449.9, Diff: 0.8]
> >>>>>    [Code Root Fixup: 0.0 ms]
> >>>>>    [Code Root Purge: 0.0 ms]
> >>>>>    [Clear CT: 1.1 ms]
> >>>>>    [Other: 5.5 ms]
> >>>>>       [Choose CSet: 0.0 ms]
> >>>>>       [Ref Proc: 2.2 ms]
> >>>>>       [Ref Enq: 0.0 ms]
> >>>>>       [Redirty Cards: 2.8 ms]
> >>>>>       [Humongous Register: 0.1 ms]
> >>>>>       [Humongous Reclaim: 0.0 ms]
> >>>>>       [Free CSet: 0.1 ms]
> >>>>>    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.9M(1024.0M)->218.4M(1024.0M)]
> >>>>> [Times: user=0.05 sys=0.00, real=0.03 secs]
> >>>>>
> >>>>>
> >>>>> So when the Kafka Streams app is running, is there any attempt to
> >>>>> rebalance, either broker-side or client-side?
> >>>>> Is there any kind of test to see what causes the trouble?
> >>>>>
> >>>>> Thanks,
> >>>>> Nan
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Sun, Aug 19, 2018 at 10:34 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>>> Okay, so you're measuring end-to-end time from producer -> broker
> >>>>>> -> streams' consumer client. There are multiple phases that can
> >>>>>> contribute to the 100ms latency, and I cannot tell if streams'
> >>>>>> consumer phase is the major contributor. For example, if the topic
> >>>>>> was not created before, then when the broker first receives a
> >>>>>> produce request it may need to create the topic, which involves
> >>>>>> multiple steps, including writes to ZK, that could take time.
> >>>>>>
> >>>>>> There is some confusion in your description: you mentioned "Kafka
> >>>>>> cluster is already up and running", but I think you are referring
> >>>>>> to "Kafka Streams application instances are already up and
> >>>>>> running", right? Since only the latter has a rebalance process,
> >>>>>> while the Kafka brokers do not really have "rebalances" except
> >>>>>> balancing load by migrating partitions.
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Sun, Aug 19, 2018 at 7:47 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Right, so my Kafka cluster has already been up and running for a
> >>>>>>> while, and I can see from the logs that all broker instances have
> >>>>>>> already changed from rebalancing to running.
> >>>>>>>
> >>>>>>> I did another test: from the producer, right before the message
> >>>>>>> gets sent to the broker, I put a timestamp in the message, and on
> >>>>>>> the consumer side, which is after stream processing, I compare
> >>>>>>> this timestamp with the current time (a sketch of the measurement
> >>>>>>> is below). I can see that some messages' processing time is above
> >>>>>>> 100ms on some really powerful hardware. And from my application's
> >>>>>>> GC log, all the GC times are below 1ms; Kafka GC only happened
> >>>>>>> once and was below 1ms too.
> >>>>>>>
> >>>>>>> Very puzzled. Is there any communication with ZooKeeper that, if
> >>>>>>> it doesn't get a response, will cause the broker to pause? I
> >>>>>>> don't think that's the case, but at this point I don't know what
> >>>>>>> else to suspect.
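> >>>>>>>
> >>>>>>> The measurement, roughly (a sketch with string payloads for
> >>>>>>> simplicity; the real messages are the NodeMutation objects, and
> >>>>>>> topic/key/payload here are placeholders):
> >>>>>>>
> >>>>>>> // producer side: embed the send time in the payload
> >>>>>>> long sendTs = System.currentTimeMillis();
> >>>>>>> producer.send(new ProducerRecord<>(topic, key, sendTs + "|" + payload));
> >>>>>>>
> >>>>>>> // consumer side (reading the streams app's output): compare with now
> >>>>>>> for (ConsumerRecord<String, String> rec : consumer.poll(100)) {
> >>>>>>>     long sentAt = Long.parseLong(rec.value().split("\\|", 2)[0]);
> >>>>>>>     long latencyMs = System.currentTimeMillis() - sentAt;
> >>>>>>>     if (latencyMs > 100) {
> >>>>>>>         System.out.println("spike: " + latencyMs + " ms");
> >>>>>>>     }
> >>>>>>> }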
> >>>>>>>
> >>>>>>> Nan
> >>>>>>>
> >>>>>>> On Sun, Aug 19, 2018 at 1:08 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Hello Nan,
> >>>>>>>>
> >>>>>>>> Note that Streams may need some time to rebalance and assign
> >>>>>>>> tasks even if you only start with one instance.
> >>>>>>>>
> >>>>>>>> I'd suggest you register a state listener in Kafka Streams via
> >>>>>>>> KafkaStreams#setStateListener; your customized StateListener
> >>>>>>>> should record when the state transits from REBALANCING to
> >>>>>>>> RUNNING, since only after that will the streams client start to
> >>>>>>>> process the first record.
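> >>>>>>>>
> >>>>>>>> Something like this (a sketch; the listener must be registered
> >>>>>>>> before start()):
> >>>>>>>>
> >>>>>>>> streams.setStateListener((newState, oldState) -> {
> >>>>>>>>     if (oldState == KafkaStreams.State.REBALANCING
> >>>>>>>>             && newState == KafkaStreams.State.RUNNING) {
> >>>>>>>>         System.out.println("processing can start: " + System.currentTimeMillis());
> >>>>>>>>     }
> >>>>>>>> });
> >>>>>>>> streams.start();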
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Sat, Aug 18, 2018 at 8:52 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Thanks. Which JMX properties indicate "processing latency
> >>>>>>>>> spikes" / "throughput"?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>>>>
> >>>>>>>>>> I cannot spot any obvious reasons.
> >>>>>>>>>>
> >>>>>>>>>> As you consume from the result topic for verification, we
> >>>>>>>>>> should verify that the latency spikes originate on write and
> >>>>>>>>>> not on read: you might want to have a look at the Kafka
> >>>>>>>>>> Streams JMX metrics to see if processing latency spikes or
> >>>>>>>>>> throughput drops. A sketch for dumping them follows.
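> >>>>>>>>>>
> >>>>>>>>>> // a sketch: dump latency/throughput sensors from the running
> >>>>>>>>>> // app; exact metric names differ across versions, so filter
> >>>>>>>>>> // broadly rather than hard-coding names
> >>>>>>>>>> streams.metrics().forEach((name, metric) -> {
> >>>>>>>>>>     if (name.name().contains("latency") || name.name().contains("rate")) {
> >>>>>>>>>>         System.out.println(name.group() + "/" + name.name() + " = " + metric.value());
> >>>>>>>>>>     }
> >>>>>>>>>> });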
> >>>>>>>>>>
> >>>>>>>>>> Also watch for GC pauses in the JVM.
> >>>>>>>>>>
> >>>>>>>>>> Hope this helps.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>>> On 8/17/18 12:13 PM, Nan Xu wrote:
> >>>>>>>>>>> btw, I am using version 0.10.2.0
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <nanxu1...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> I am working on a Kafka Streams app and see huge latency
> >>>>>>>>>>>> variance; wondering what can cause this?
> >>>>>>>>>>>>
> >>>>>>>>>>>> The processing is very simple and has no state; linger.ms is
> >>>>>>>>>>>> already changed to 5ms. The message size is around 10K
> >>>>>>>>>>>> bytes, published at 2000 messages/s, and the network is 10G.
> >>>>>>>>>>>> Using a regular consumer to watch the localHistTopic topic
> >>>>>>>>>>>> and print a counter every 2000 messages, I usually get a
> >>>>>>>>>>>> count of 2000 every second, matching the publish speed, but
> >>>>>>>>>>>> sometimes I see it stall for 3 or more seconds and then
> >>>>>>>>>>>> print out a few counts, as if the CPU were paused during
> >>>>>>>>>>>> that time or the messages were cached/batched and then
> >>>>>>>>>>>> processed. Any suggestion? (A sketch of the watcher is
> >>>>>>>>>>>> below.)
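> >>>>>>>>>>>>
> >>>>>>>>>>>> The watching consumer, roughly (a sketch, counting only):
> >>>>>>>>>>>>
> >>>>>>>>>>>> long count = 0, lastPrinted = 0;
> >>>>>>>>>>>> while (true) {
> >>>>>>>>>>>>     for (ConsumerRecord<String, NodeState> rec : consumer.poll(100)) {
> >>>>>>>>>>>>         count++;
> >>>>>>>>>>>>         if (count - lastPrinted >= 2000) {  // ~once per second at 2000 msg/s
> >>>>>>>>>>>>             System.out.println(System.currentTimeMillis() + " count=" + count);
> >>>>>>>>>>>>             lastPrinted = count;
> >>>>>>>>>>>>         }
> >>>>>>>>>>>>     }
> >>>>>>>>>>>> }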
> >>>>>>>>>>>>
> >>>>>>>>>>>> final Properties streamsConfiguration = new Properties();
> >>>>>>>>>>>>
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> >>>>>>>>>>>>     Serdes.String().getClass().getName());
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
> >>>>>>>>>>>> // streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> >>>>>>>>>>>>     + ProducerConfig.BATCH_SIZE_CONFIG, 163840);
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> >>>>>>>>>>>>     + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> >>>>>>>>>>>>     + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> >>>>>>>>>>>>     + ProducerConfig.LINGER_MS_CONFIG, "5");
> >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.consumerPrefix(
> >>>>>>>>>>>>     ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 20 * 1024 * 1024);
> >>>>>>>>>>>>
> >>>>>>>>>>>> final StreamsBuilder builder = new StreamsBuilder();
> >>>>>>>>>>>>
> >>>>>>>>>>>> final KStream<String, NodeMutation> localDeltaStream = builder.stream(
> >>>>>>>>>>>>     localDeltaTopic,
> >>>>>>>>>>>>     Consumed.with(
> >>>>>>>>>>>>         new Serdes.StringSerde(),
> >>>>>>>>>>>>         new NodeMutationSerde<>()
> >>>>>>>>>>>>     )
> >>>>>>>>>>>> );
> >>>>>>>>>>>>
> >>>>>>>>>>>> KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
> >>>>>>>>>>>>     (mutation) -> NodeState
> >>>>>>>>>>>>         .newBuilder()
> >>>>>>>>>>>>         .setMeta(mutation.getMetaMutation().getMeta())
> >>>>>>>>>>>>         .setValue(mutation.getValueMutation().getValue())
> >>>>>>>>>>>>         .build()
> >>>>>>>>>>>> );
> >>>>>>>>>>>>
> >>>>>>>>>>>> localHistStream.to(
> >>>>>>>>>>>>     localHistTopic,
> >>>>>>>>>>>>     Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
> >>>>>>>>>>>> );
> >>>>>>>>>>>>
> >>>>>>>>>>>> streams = new KafkaStreams(builder.build(), streamsConfiguration);
> >>>>>>>>>>>> streams.cleanUp();
> >>>>>>>>>>>> streams.start();
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
>



-- 
-- Guozhang
