Btw, I am using version 0.10.2.0.

On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <nanxu1...@gmail.com> wrote:

> I am working on a Kafka Streams app and am seeing huge latency variance;
> what could cause this?
>
> The processing is very simple and has no state, and linger.ms is already
> set to 5 ms. Messages are around 10 KB and are published at 2000
> messages/s; the network is 10G. Using a regular consumer to watch the
> localHistTopic topic and print a counter every 2000 messages, I usually
> get a count of 2000 each second, matching the publish rate, but sometimes
> it stalls for 3 or more seconds and then prints several counts at once,
> as if the CPU were paused during that time or the messages were
> cached/batched and then processed. Any suggestions?
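The counting check described above can be sketched roughly as follows. This is a hypothetical helper (class and method names are illustrative, not from the original post) that operates on recorded arrival timestamps instead of a live consumer loop, so the stall pattern can be seen without a broker:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the check described in the post:
// record a "print" every `every` messages and measure the gap since
// the previous print. A steady 2000 msg/s feed with every = 2000
// yields ~1000 ms gaps; a stall shows up as one large gap followed
// by several near-zero gaps when the backlog drains.
public class ThroughputWatcher {
    public static List<Long> printGapsMs(long[] arrivalMs, int every) {
        List<Long> gaps = new ArrayList<>();
        long lastPrint = arrivalMs.length > 0 ? arrivalMs[0] : 0L;
        int count = 0;
        for (long t : arrivalMs) {
            count++;
            if (count % every == 0) {
                gaps.add(t - lastPrint); // ms since the previous print
                lastPrint = t;
            }
        }
        return gaps;
    }
}
```

In a real run the timestamps would come from `System.currentTimeMillis()` inside the consumer poll loop; large gaps then distinguish a genuinely stalled pipeline from messages merely arriving in bursts.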
>
>   final Properties streamsConfiguration = new Properties();
>
>   streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
>   streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
>   streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>   streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>       Serdes.String().getClass().getName());
>   streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
>   // streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>   streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>       + ProducerConfig.BATCH_SIZE_CONFIG, 163840);
>   streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>       + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
>   streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>       + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
>   streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>       + ProducerConfig.LINGER_MS_CONFIG, "5");
>   streamsConfiguration.put(StreamsConfig.consumerPrefix(
>       ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 20 * 1024 * 1024);
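As a back-of-the-envelope check on the producer settings quoted above (my arithmetic, not from the original post): with batch.size = 163840 bytes and ~10 KB messages, a batch holds about 16 messages, which at 2000 messages/s takes roughly 8 ms to fill, so the 5 ms linger normally expires first:

```java
// Sanity-check arithmetic on the quoted settings; the input numbers
// (batch.size, message size, publish rate) come from the post, the
// helper class itself is hypothetical.
public class BatchMath {
    static int msgsPerBatch(int batchSizeBytes, int msgBytes) {
        return batchSizeBytes / msgBytes;
    }
    static double fillTimeMs(int msgsPerBatch, int msgsPerSec) {
        return 1000.0 * msgsPerBatch / msgsPerSec;
    }
    public static void main(String[] args) {
        int perBatch = msgsPerBatch(163840, 10 * 1024); // ~16 messages per batch
        double fillMs = fillTimeMs(perBatch, 2000);     // ~8 ms to fill a batch
        // With linger.ms = 5, the linger timer usually expires before the
        // batch fills, so producer batching alone adds only a few ms.
        System.out.println(perBatch + " msgs/batch, ~" + fillMs + " ms to fill");
    }
}
```

If this arithmetic holds, producer batching by itself would not account for multi-second stalls, which points the investigation elsewhere (e.g. commit interval, caching, GC pauses).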
>
>
>   final StreamsBuilder builder = new StreamsBuilder();
>
>   final KStream<String, NodeMutation> localDeltaStream = builder.stream(
>       localDeltaTopic,
>       Consumed.with(
>           new Serdes.StringSerde(),
>           new NodeMutationSerde<>()
>       )
>   );
>
>   KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
>       (mutation) -> NodeState
>           .newBuilder()
>           .setMeta(mutation.getMetaMutation().getMeta())
>           .setValue(mutation.getValueMutation().getValue())
>           .build()
>   );
>
>   localHistStream.to(
>       localHistTopic,
>       Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
>   );
>
>   streams = new KafkaStreams(builder.build(), streamsConfiguration);
>   streams.cleanUp();
>   streams.start();
>
