Hello,

Whenever I submit a job to Flink that reads data from Kafka, memory consumption increases continuously. I have raised the maximum heap size from 2 GB to 4 GB and then to 6 GB, but memory consumption still reaches the limit every time.
A simple job that shows this behavior is given below.

    /*
     * Execution environment setup
     */
    final StreamExecutionEnvironment environment = getGlobalJobConfiguration(configDir, configurations);

    /*
     * Collect event data from Kafka
     */
    DataStreamSource<String> s = environment.addSource(new FlinkKafkaConsumer010<String>(
            configurations.get(ConfigKeys.KAFKA_INPUT_TOPIC),
            new SimpleStringSchema(),
            getKafkaConfiguration(configurations)));

    // Drop every record, so nothing from the stream is retained downstream
    s.filter(new FilterFunction<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return false;
        }
    }).print();

    // Helper method defined in the same class
    private static Properties getKafkaConfiguration(ParameterTool configurations) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", configurations.get(ConfigKeys.KAFKA_HOSTS));
        properties.put("group.id", "flink-consumer-" + UUID.randomUUID().toString());
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("security.protocol", configurations.get(ConfigKeys.KAFKA_SECURITY_PROTOCOL));
        properties.put("ssl.truststore.location", configurations.get(ConfigKeys.KAFKA_SSL_TRUSTSTORE_LOCATION));
        properties.put("ssl.truststore.password", configurations.get(ConfigKeys.KAFKA_SSL_TRUSTSTORE_PASSWORD));
        properties.put("ssl.keystore.location", configurations.get(ConfigKeys.KAFKA_SSL_KEYSTORE_LOCATION));
        properties.put("ssl.keystore.password", configurations.get(ConfigKeys.KAFKA_SSL_KEYSTORE_PASSWORD));
        return properties;
    }

Moreover, when I stop the job, the task manager does not close the Kafka connection and the memory stays allocated. To release it, I have to kill the task manager process.

Flink version: 1.2.1
Kafka consumer: 010
Kafka version: 2_11_0.10.1.0-2

I have enabled the taskmanager.debug.memory.startLogThread property, logging every 5 seconds, and attached the resulting log.
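As a rough cross-check of the numbers in that log, the following is a minimal sketch that prints heap, non-heap, and NIO buffer pool usage of the JVM it runs in, using only the standard java.lang.management API. It is not Flink's memory logger; the class name MemorySnapshot and its methods are hypothetical names chosen for illustration. Comparing such a snapshot with the machine-wide figures from free -m may help tell whether the growth is inside the Java heap or outside it.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

// Hypothetical helper for illustration; not part of Flink or Kafka.
class MemorySnapshot {

    /** Converts a byte count to whole mebibytes (an undefined value of -1 becomes 0). */
    static long toMiB(long bytes) {
        return bytes / (1024L * 1024L);
    }

    /** Builds a one-line summary of the current JVM memory usage. */
    static String describe() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        MemoryUsage heap = memory.getHeapMemoryUsage();
        MemoryUsage nonHeap = memory.getNonHeapMemoryUsage();

        StringBuilder sb = new StringBuilder();
        sb.append(String.format("heap used=%d MiB committed=%d MiB max=%d MiB",
                toMiB(heap.getUsed()), toMiB(heap.getCommitted()), toMiB(heap.getMax())));
        sb.append(String.format(" | non-heap used=%d MiB committed=%d MiB",
                toMiB(nonHeap.getUsed()), toMiB(nonHeap.getCommitted())));

        // NIO buffer pools (for example direct buffers) are tracked separately
        // from the heap and non-heap figures above.
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            sb.append(String.format(" | %s buffers used=%d MiB",
                    pool.getName(), toMiB(pool.getMemoryUsed())));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Memory allocated by native code outside these pools would not appear in this summary, so a gap between these figures and the process's resident size would point in that direction.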
The output of free -m before submitting the job:

                  total        used        free      shared  buff/cache   available
    Mem:          15817         245       14755          24         816       15121
    Swap:             0           0           0

The output of free -m after the job had been running for about 5 minutes:

                  total        used        free      shared  buff/cache   available
    Mem:          15817        9819        5150          24         847        5547
    Swap:             0           0           0

taskmanager.log <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14342/taskmanager.log>

Best Regards,
Pedro Chaves