Aljoscha Krettek created FLINK-6413: ---------------------------------------
Summary: Add stream operator callback to notify about consumed network buffer Key: FLINK-6413 URL: https://issues.apache.org/jira/browse/FLINK-6413 Project: Flink Issue Type: Improvement Components: DataStream API Reporter: Aljoscha Krettek This is originally motivated by BEAM-1612. Beam has the notion of bundles and allows users to do work at the start/end of each bundle. This could be used for setting up some expensive connection or for batching accesses to some external system. There is also internal optimisation potential because accesses/updates to state could be kept in-memory per bundle/buffer and only afterwards be written to fault-tolerant state. The bundling induced by the Flink network stack (which depends on the network buffer size and the buffer timeout) seems like a natural fit for this. I propose to add an _experimental_ interface {{BufferConsumedListener}} (or some such name): {code} interface BufferConsumedListener { void notifyBufferConsumed(): } {code} that is invoked in the input processor whenever a network buffer is exhausted: https://github.com/apache/flink/blob/922352ac35f3753334e834632e3e361fbd36336e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java#L178-L178 The change is very simple, three lines of code would be added: {code} if (result.isBufferConsumed()) { currentRecordDeserializer.getCurrentBuffer().recycle(); currentRecordDeserializer = null; if (streamOperator instanceof BufferConsumedListener) { ((BufferConsumedListener) streamOperator).notifyBufferConsumed(): } } {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)