sunqing created KAFKA-8497: ------------------------------ Summary: kafka streams application占用内存很高 Key: KAFKA-8497 URL: https://issues.apache.org/jira/browse/KAFKA-8497 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.0.0 Reporter: sunqing
一个简单的kafka streams测试应用,使用KStream来消费数据,当所消费的kafka Topic中的数据暴涨时,或者要消费的Topic中待消费数据量很大时,消费程序占用的内存会非常高,能达到20多G, 疑问:kafka streams不是逐条消费吗,为啥topic中的数据量很大时会导致程序内存飙升 测试程序代码如下: 代码如下: public class RunMain { public static StreamsBuilder builder = new StreamsBuilder(); public static void kafkaStreamStart() { KStream<String, String> stream = builder.stream(Arrays.asList("wk_wangxin_po")); Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testwang_xin"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "zktj-kafka-broker-out-1:29092,zktj-kafka-broker-out-2:29092,zktj-kafka-broker-out-3:29092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); props.setProperty("security.protocol", "SASL_PLAINTEXT"); props.setProperty("sasl.mechanism", "PLAIN"); props.setProperty("sasl.kerberos.service.name", "kafka"); System.setProperty("java.security.auth.login.config", "./conf/kafka_client_jaas.conf"); stream.foreach(new ForeachAction<String, String>() { @Override public void apply(String key, String value) { System.out.println("===="); System.out.println(key); } }); Topology topo = builder.build(); KafkaStreams streams = new KafkaStreams(topo, props); streams.start(); } public static void main(String[] args) { kafkaStreamStart(); } } -- This message was sent by Atlassian JIRA (v7.6.3#76005)