btw, I am using version 0.10.2.0

On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <nanxu1...@gmail.com> wrote:
> I am working on a Kafka Streams app and see huge latency variance.
> What could cause this?
>
> The processing is very simple and stateless, and linger.ms is already
> set to 5 ms. The messages are around 10 KB each and are published at
> 2000 messages/s; the network is 10G. I use a regular consumer to watch
> the localHistTopic topic and print a counter every 2000 messages.
> Usually I get a count every second, matching the publish rate, but
> sometimes it stalls for 3 or more seconds and then prints several
> counts at once, as if the CPU were paused during that time or the
> messages were being cached/batched and then processed. Any suggestions?
>
> final Properties streamsConfiguration = new Properties();
> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
> streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>     Serdes.String().getClass().getName());
> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
> // streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>     + ProducerConfig.BATCH_SIZE_CONFIG, 163840);
> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>     + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>     + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>     + ProducerConfig.LINGER_MS_CONFIG, "5");
> streamsConfiguration.put(
>     StreamsConfig.consumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
>     20 * 1024 * 1024);
>
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream<String, NodeMutation> localDeltaStream = builder.stream(
>     localDeltaTopic,
>     Consumed.with(
>         new Serdes.StringSerde(),
>         new NodeMutationSerde<>()
>     )
> );
>
> KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
>     (mutation) -> NodeState
>         .newBuilder()
>         .setMeta(mutation.getMetaMutation().getMeta())
>         .setValue(mutation.getValueMutation().getValue())
>         .build()
> );
>
> localHistStream.to(
>     localHistTopic,
>     Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
> );
>
> streams = new KafkaStreams(builder.build(), streamsConfiguration);
> streams.cleanUp();
> streams.start();
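
To put numbers on the stalls described above, the "print a counter every 2000 messages" check could record the gap between consecutive batches instead of just printing. This is only a rough sketch in plain Java, not the code from the app: the class name `StallDetector`, its methods, and the 1500 ms threshold are all made up for illustration, and the caller is assumed to pass in the current time for each consumed record.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Kafka): tracks the wall-clock gap
 * between consecutive batches of N messages and remembers the gaps
 * that exceed a threshold.
 */
public class StallDetector {
    private final int batchSize;
    private final long stallThresholdMs;
    private final List<Long> stallGapsMs = new ArrayList<>();
    private long seen = 0;
    private long lastBatchAtMs = -1; // -1 means no full batch seen yet

    public StallDetector(int batchSize, long stallThresholdMs) {
        this.batchSize = batchSize;
        this.stallThresholdMs = stallThresholdMs;
    }

    /** Call once per consumed record, passing the current time in ms. */
    public void onMessage(long nowMs) {
        seen++;
        if (seen % batchSize != 0) {
            return; // batch not complete yet
        }
        if (lastBatchAtMs >= 0) {
            long gap = nowMs - lastBatchAtMs;
            if (gap > stallThresholdMs) {
                stallGapsMs.add(gap); // this batch took suspiciously long
            }
        }
        lastBatchAtMs = nowMs;
    }

    /** Gaps (in ms) between batches that exceeded the threshold. */
    public List<Long> stallGapsMs() {
        return stallGapsMs;
    }
}
```

In the consumer's poll loop this would be called as `detector.onMessage(System.currentTimeMillis())` for every record. Logging the recorded gaps next to GC logs or broker metrics might help tell a paused JVM apart from batching somewhere in the pipeline.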