Dmitry Golubets created FLINK-5652:
--------------------------------------

             Summary: Memory leak in AsyncDataStream
                 Key: FLINK-5652
                 URL: https://issues.apache.org/jira/browse/FLINK-5652
             Project: Flink
          Issue Type: Bug
          Components: DataStream API
    Affects Versions: 1.3.0
            Reporter: Dmitry Golubets

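When the async operation timeout is > 0, the number of StreamRecordQueueEntry instances keeps growing.

It can be easily reproduced with the following code:
{code:scala}
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncCollector, AsyncFunction}

val env = StreamExecutionEnvironment.getExecutionEnvironment

val src: DataStream[Int] = env.fromCollection((1 to Int.MaxValue).iterator)

val asyncFunction = new AsyncFunction[Int, Int] with Serializable {
  // Completes immediately, so no element should ever reach its timeout.
  override def asyncInvoke(input: Int, collector: AsyncCollector[Int]): Unit = {
    collector.collect(List(input))
  }
}

// Timeout of 1 minute (> 0, as required to trigger the leak), queue capacity of 1.
AsyncDataStream.unorderedWait(src, asyncFunction, 1, TimeUnit.MINUTES, 1).print()

env.execute()
{code}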