[ https://issues.apache.org/jira/browse/KAFKA-8497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bruno Cadonna resolved KAFKA-8497.
----------------------------------
    Resolution: Invalid

> kafka streams application takes up a lot of memory
> --------------------------------------------------
>
>                 Key: KAFKA-8497
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8497
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: sunqing
>            Priority: Major
>
> A simple Kafka Streams application uses a KStream to consume data, as
> shown below.
> Memory usage is very high when the consumed topic holds a large amount of
> data. Sometimes it goes up to 20 GB.
> This is very strange. The program doesn't do anything; it just reads the
> data and prints it to the screen. Why is the memory usage so high when
> there is a lot of data in the topic?
>
> The program code:
>
> import java.util.Arrays;
> import java.util.Properties;
>
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.Topology;
> import org.apache.kafka.streams.kstream.ForeachAction;
> import org.apache.kafka.streams.kstream.KStream;
>
> public class TestMain {
>     public static StreamsBuilder builder = new StreamsBuilder();
>
>     public static void kafkaStreamStart() {
>         // Consume the input topic as a stream of String keys and values.
>         KStream<String, String> stream =
>             builder.stream(Arrays.asList("wk_wangxin_po"));
>
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testwang_xin");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>             "zktj-kafka-broker-out-1:29092,zktj-kafka-broker-out-2:29092,zktj-kafka-broker-out-3:29092");
>         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>             Serdes.String().getClass());
>         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
>             Serdes.String().getClass());
>         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);
>         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
>
>         // SASL/PLAIN authentication settings.
>         props.setProperty("security.protocol", "SASL_PLAINTEXT");
>         props.setProperty("sasl.mechanism", "PLAIN");
>         props.setProperty("sasl.kerberos.service.name", "kafka");
>         System.setProperty("java.security.auth.login.config",
>             "./conf/kafka_client_jaas.conf");
>
>         // Print a separator and the key of every record.
>         stream.foreach(new ForeachAction<String, String>() {
>             @Override
>             public void apply(String key, String value) {
>                 System.out.println("====");
>                 System.out.println(key);
>             }
>         });
>
>         Topology topo = builder.build();
>         KafkaStreams streams = new KafkaStreams(topo, props);
>         streams.start();
>     }
>
>     public static void main(String[] args) {
>         kafkaStreamStart();
>     }
> }

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)