GitHub user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4485

Reviewed 9 of 13 files at r2.
Review status: all files reviewed at latest revision, 7 unresolved discussions, some commit checks failed.

---

*[a discussion](https://reviewable.io:443/reviews/apache/flink/4485#-KsOs0jqeqTsAUTwWuFa:-KsOs0jqeqTsAUTwWuFb:b-kg45p7) (no related file):*

Depending on how you build on this in the other PRs, what do you think about using a fixed-size `LocalBufferPool` (or a customized sub-class) per `RemoteInputChannel` instead? This would solve potential issues with recycling and would also be a lot less code. Additionally, you would gain the buffer availability listener feature, so you will be notified when a buffer is released (which may happen deep inside other code that no longer has access to the `RemoteInputChannel`).

FYI: This change of commits in the PR would actually qualify for a separate PR.

---

*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java, line 216 at r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOlN5wNvmcco4z2tGj:-KsOlN5xcf-lW0z1FpJo:b-ppkkjd) ([raw file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java#L216)):*

> ```Java
> if (gate.getConsumedPartitionType().isCreditBased()) {
>     // Create a fix size buffer pool for floating buffers and assign exclusive buffers to input channels directly
>     bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate);
> ```

We still need to call `gate.setBufferPool(bufferPool)` so that the gate is aware of the pool (this call is common to both paths of the `if`).

---

*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java, line 164 at r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOrPb1_aKuUAa_uyC6:-KsOrPb1_aKuUAa_uyC7:b3045fp) ([raw file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java#L164)):*

> ```Java
> }
>
> redistributeBuffers();
> ```

Now here, you may need to add a try-catch that releases any already added segments back to the pool (see my comments in `SingleInputGate`).

---

*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java, line 38 at r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOhXwT2FF306uRbf5l:-KsOhXwU2y_hAFTh2tTM:b-mb3jxr) ([raw file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java#L38)):*

> ```Java
>  * no checkpoint barriers.
>  */
> PIPELINED_BOUNDED(true, true, true, false);
> ```

Does it make sense to already add a `PIPELINE_CREDIT_BASED(true, true, true, true)`? I guess credit-based can be considered bounded as well.

---

*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java, line 82 at r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOi2-hM5IFzcjrGzin:-KsOi2-iXW_-MtjTxono:b-3woyzq) ([raw file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java#L82)):*

> ```Java
>     return isBounded;
> }
> ```

Please add a (simple) javadoc similar to the one on the `isBounded()` method.

---

*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java, line 315 at r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOoBbKr_kVVDxNIZhm:-KsOoBbKr_kVVDxNIZhn:b85dio4) ([raw file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java#L315)):*

> ```Java
>     return segments;
> } catch (Throwable t) {
>     if (segments != null && segments.size() > 0) {
> ```

Unfortunately, the cleanup will not work as documented: if `networkBufferPool.requestMemorySegments(networkBuffersPerChannel);` throws an exception, `segments` will be `null`. To handle all cases, e.g. some segments were requested successfully and an exception was thrown afterwards, you need to handle this inside `NetworkBufferPool#requestMemorySegments()`. I guess that after changing this, this method will not be required anymore.

---

*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java, line 319 at r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOowKk76qDgaVZ6KQK:-KsOowKk76qDgaVZ6KQL:b-n13dga) ([raw file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java#L319)):*

> ```Java
> }
>
> if (t instanceof IOException) {
> ```

Please use `ExceptionUtils#rethrowIOException` for this pattern.

---

*Comments from [Reviewable](https://reviewable.io:443/reviews/apache/flink/4485)*
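
To illustrate the cleanup concern raised for `NetworkBufferPool#requestMemorySegments()`, here is a minimal sketch of an all-or-nothing request that returns partially acquired segments before rethrowing. It assumes a heavily simplified pool: `AllOrNothingPoolSketch` is a hypothetical stand-in that hands out `byte[]` instead of `MemorySegment`, never blocks, and does no redistribution. The local `rethrowIOException` helper is likewise an assumption that mirrors the behaviour of `ExceptionUtils#rethrowIOException` so the example compiles without Flink on the classpath. It is not the code in this PR.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical, simplified stand-in for NetworkBufferPool (not Flink's class). */
final class AllOrNothingPoolSketch {

    private final Queue<byte[]> availableSegments = new ArrayDeque<>();

    AllOrNothingPoolSketch(int numSegments, int segmentSize) {
        for (int i = 0; i < numSegments; i++) {
            availableSegments.add(new byte[segmentSize]);
        }
    }

    synchronized int getNumberOfAvailableSegments() {
        return availableSegments.size();
    }

    /**
     * Returns exactly numRequired segments, or throws and leaves the pool
     * with the same number of available segments as before the call.
     */
    synchronized List<byte[]> requestMemorySegments(int numRequired) throws IOException {
        // Allocated before the try block, so it is never null in the catch block.
        List<byte[]> segments = new ArrayList<>(numRequired);
        try {
            while (segments.size() < numRequired) {
                byte[] segment = availableSegments.poll();
                if (segment == null) {
                    throw new IOException("Insufficient number of network buffers: required "
                            + numRequired + ", but only " + segments.size() + " available.");
                }
                segments.add(segment);
            }
            return segments;
        } catch (Throwable t) {
            // Give back whatever was acquired before the failure, then rethrow.
            availableSegments.addAll(segments);
            rethrowIOException(t);
            return null; // not reached: rethrowIOException always throws
        }
    }

    /** Local stand-in mirroring the behaviour of ExceptionUtils#rethrowIOException. */
    static void rethrowIOException(Throwable t) throws IOException {
        if (t instanceof IOException) {
            throw (IOException) t;
        } else if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        } else if (t instanceof Error) {
            throw (Error) t;
        } else {
            throw new IOException(t.getMessage(), t);
        }
    }
}
```

With the cleanup living inside the pool, a caller such as `SingleInputGate` would no longer need its own `catch` block just to recycle segments, which is why the separate helper method there could probably go away.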