Aljoscha Krettek created FLINK-6413:
---------------------------------------

             Summary: Add stream operator callback to notify about consumed network buffer
                 Key: FLINK-6413
                 URL: https://issues.apache.org/jira/browse/FLINK-6413
             Project: Flink
          Issue Type: Improvement
          Components: DataStream API
            Reporter: Aljoscha Krettek


This was originally motivated by BEAM-1612. Beam has the notion of bundles and 
allows users to do work at the start/end of each bundle. This could be used for 
setting up some expensive connection or for batching accesses to some external 
system. There is also internal optimisation potential because accesses/updates 
to state could be kept in-memory per bundle/buffer and only afterwards be 
written to fault-tolerant state.

The bundling induced by the Flink network stack (which depends on the network 
buffer size and the buffer timeout) seems like a natural fit for this. I 
propose to add an _experimental_ interface {{BufferConsumedListener}} (or some 
such name):

{code}
interface BufferConsumedListener {
  void notifyBufferConsumed();
}
{code}

that is invoked in the input processor whenever a network buffer is exhausted: 
https://github.com/apache/flink/blob/922352ac35f3753334e834632e3e361fbd36336e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java#L178-L178

The change is very simple; three lines of code would be added:

{code}
if (result.isBufferConsumed()) {
  currentRecordDeserializer.getCurrentBuffer().recycle();
  currentRecordDeserializer = null;
  if (streamOperator instanceof BufferConsumedListener) {
    ((BufferConsumedListener) streamOperator).notifyBufferConsumed();
  }
}
{code}
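
To illustrate the batching use case mentioned above, here is a minimal sketch of how an operator might use the callback. It does not depend on Flink: {{ExternalClient}} and {{BatchingWriter}} are hypothetical names invented for this example, {{processElement}} is simplified to take a plain String, and {{BufferConsumedListener}} is the interface proposed in this issue, not an existing API.

```java
import java.util.ArrayList;
import java.util.List;

// The callback proposed in this issue (name not final).
interface BufferConsumedListener {
    void notifyBufferConsumed();
}

// Hypothetical stand-in for some external system; not a Flink API.
interface ExternalClient {
    void writeBatch(List<String> records);
}

// Hypothetical operator-like class: collects records in memory and
// writes them out once per consumed network buffer.
class BatchingWriter implements BufferConsumedListener {
    private final ExternalClient client;
    private final List<String> pending = new ArrayList<>();

    BatchingWriter(ExternalClient client) {
        this.client = client;
    }

    // Simplified element hook: only buffers the record.
    void processElement(String record) {
        pending.add(record);
    }

    // Invoked by the input processor when a network buffer is exhausted.
    @Override
    public void notifyBufferConsumed() {
        if (pending.isEmpty()) {
            return; // nothing arrived for this operator in the buffer
        }
        // Hand over a copy so the client never sees later mutations.
        client.writeBatch(new ArrayList<>(pending));
        pending.clear();
    }
}
```

A real operator would presumably also have to flush pending records before a checkpoint and on close, since records held only in memory are not yet part of fault-tolerant state.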





--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
