[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897795#comment-15897795 ]
ASF GitHub Bot commented on FLINK-4545:
---------------------------------------

GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/3480

[FLINK-4545] use size-restricted LocalBufferPool instances for network communication

Note: this PR is based on #3467 and is PR 2 of 3 in a series to get rid of the network buffer parameter.

With this PR, the number of buffers a `LocalBufferPool` has to offer is limited to `2 * <number of channels> + 8` for both input and output connections. This reduces buffer bloat in our network stack without restricting specific jobs and their connections too much: the total number of network buffers can now be arbitrarily large again without consequences for the delays that, for example, checkpoint barriers incur while travelling through all TMs.

Eventually, this will lead to the network buffer parameter being removed (which was the initial goal). Already, in a simple scenario like the following, with a parallelism of 2 and thus running on 6 TMs, we were able to reduce the 75th percentile of checkpoint delays by 60%, from 38ms to 16ms (median at 7ms for both).

```java
// LongSource, IdentityMapFunction and getStreamExecutionEnvironment(params)
// are helpers from the benchmark job
final StreamExecutionEnvironment env = getStreamExecutionEnvironment(params);
env.disableOperatorChaining();
env.enableCheckpointing(1_000L);

DataStreamSource<Tuple2<Long, Long>> source1 = env.addSource(new LongSource());
source1.slotSharingGroup("source")
    .keyBy(1)
    .map(new IdentityMapFunction<Tuple2<Long, Long>>())
    .slotSharingGroup("map")
    .keyBy(1)
    .addSink(new DiscardingSink<Tuple2<Long, Long>>())
    .slotSharingGroup("sink");
```

By adding random delays (0-1ms for every 1000 keys) to the `IdentityMapFunction`, the median even improves from 5026ms to 293ms. Neither scenario influences the throughput of the program, but for real programs the reductions in delay may differ since actual state may need to be stored and other components take part as well ;)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-4545

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3480.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3480

----
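For illustration, here is a minimal sketch of the bound described in the PR text above; the class and constant names are hypothetical and chosen for this example, not part of Flink's actual API:

```java
/**
 * Minimal sketch of the "2 * <number of channels> + 8" bound -- hypothetical
 * names, not Flink's implementation. Per channel: 1 buffer for in-flight data
 * in the subpartition/input channel plus 1 buffer for parallel serialization;
 * on top of that, a few extra buffers as headroom.
 */
public final class BufferPoolBound {

    private static final int BUFFERS_PER_CHANNEL = 2; // in-flight + serialization
    private static final int EXTRA_BUFFERS = 8;       // headroom

    /** Maximum number of buffers a size-restricted LocalBufferPool may use. */
    public static int maxBuffers(int numberOfChannels) {
        return BUFFERS_PER_CHANNEL * numberOfChannels + EXTRA_BUFFERS;
    }

    public static void main(String[] args) {
        System.out.println(maxBuffers(100)); // a 100-channel gate is capped at 208
    }
}
```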
commit dfea1bac97dbbf30a2e049618cc41fdca53ea6d3
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-02-10T13:36:37Z

    [FLINK-4545] remove (unused) persistent partition type

commit 11557c004450bcbbe680f1575f0e41d164424eae
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-02-10T15:11:08Z

    [docs] improve some documentation around network buffers

commit cd999061d04ae803c79473241ac1f9b39c1f2731
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-02-10T15:12:19Z

    [hotfix][network] add some assertions documenting which locks we rely on

commit 8f529bb3f42916c816c5091228569952917ad9b5
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-03-01T13:33:44Z

    [FLINK-4545] remove fixed-size BufferPool instances

    These were unused except for unit tests and will be replaced with
    bounded BufferPool instances.

commit 91cea2917e9453f9de5c02472d99d4fc0d090dda
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-03-06T11:36:02Z

    [FLINK-4545] remove JobVertex#connectNewDataSetAsInput variant without partition type

    This removes JobVertex#connectNewDataSetAsInput(JobVertex input,
    DistributionPattern distPattern) and requires the developer to call
    JobVertex#connectNewDataSetAsInput(JobVertex input, DistributionPattern
    distPattern, ResultPartitionType partitionType) instead, thinking about
    which partition type to add.

commit 83d1404b106b558679e4c9ef16123fbc6b5eac72
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-03-06T11:37:56Z

    [FLINK-4545] remove unused IntermediateDataSet constructors

    These implied a default result partition type which we want the
    developer to actively decide upon.

commit e9d41b6b613a7bac5c489102977e16e4c6c4bb86
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-02-10T13:53:09Z

    [FLINK-4545] add a bounded result partition type

    This can be used to limit the number of network buffers used for this
    partition. (This borrows the appropriate parts of a commit previously
    sketched for FLINK-5088 to implement bounded network queue lengths.)

commit b57f0652a768645a5712d376d0e4b438f35cfa6c
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-02-10T17:22:55Z

    [FLINK-4545] allow LocalBufferPool to use a limited number of buffers

commit d1d8b18bba967c6fb8f3934aa4cf1cfc8a2c1106
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-02-20T16:12:54Z

    [FLINK-4545] also make the ResultPartitionType available at the InputGate

    This way, we know what kind of result partition is consumed by the
    input gate.

commit d23fdf9d80dea5d46bfe2f7597f4d5e1295cae7b
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-02-13T18:24:45Z

    [FLINK-4545] try to set an upper bound on the LocalBufferPool if restricted

    Use "2 * <number of channels> + 8" based on the following considerations:
    * 1 buffer for in-flight data in the subpartition/input channel
    * 1 buffer for parallel serialization
    * + some extra buffers
    Also re-introduce some tests for bounded buffer pools, similar to the
    fixed-size buffer pool tests before.

commit 37eed7bc59b6899e3d7bdd4b1a3dac87e5f04406
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-02-24T12:41:11Z

    [FLINK-4545] re-implement NetworkBufferPool#redistributeBuffers

    This version also takes the bounded network buffers into account. The
    distribution is not strictly uniform anymore, though:
    * for every buffer pool, we determine the maximum number of buffers it
      can take from the available number - let's call this its 'capacity'
    * then, each of them will get roughly available * capacity / totalCapacity
      buffers on top of the required number of buffers

----
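The redistribution scheme from the last commit above can be sketched as a simplified model; the class and parameter names below are hypothetical, not the actual NetworkBufferPool#redistributeBuffers code:

```java
import java.util.Arrays;

/**
 * Simplified model of the redistribution scheme: each pool keeps its
 * required buffers and receives a share of the leftover ("available")
 * buffers proportional to its capacity, i.e. roughly
 * available * capacity / totalCapacity.
 */
public final class RedistributeSketch {

    /**
     * @param required  buffers each pool needs in any case
     * @param maxTotal  each pool's upper bound (required + capacity)
     * @param available leftover buffers to distribute on top of the required ones
     * @return the resulting pool sizes
     */
    static int[] redistribute(int[] required, int[] maxTotal, long available) {
        int n = required.length;
        long totalCapacity = 0;
        for (int i = 0; i < n; i++) {
            totalCapacity += maxTotal[i] - required[i]; // this pool's 'capacity'
        }
        if (totalCapacity == 0) {
            return required.clone(); // no pool can take extra buffers
        }
        int[] sizes = new int[n];
        for (int i = 0; i < n; i++) {
            long capacity = maxTotal[i] - required[i];
            // proportional share on top of the required number of buffers
            sizes[i] = required[i] + (int) (available * capacity / totalCapacity);
        }
        return sizes;
    }

    public static void main(String[] args) {
        // two pools with capacities 10 and 30, and 20 leftover buffers
        int[] sizes = redistribute(new int[] {4, 4}, new int[] {14, 34}, 20);
        System.out.println(Arrays.toString(sizes)); // [9, 19]
    }
}
```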
> Flink automatically manages TM network buffer
> ---------------------------------------------
>
>                 Key: FLINK-4545
>                 URL: https://issues.apache.org/jira/browse/FLINK-4545
>             Project: Flink
>          Issue Type: Wish
>          Components: Network
>            Reporter: Zhenzhong Xu
>
> Currently, the number of network buffers per task manager is preconfigured
> and the memory is pre-allocated through the
> taskmanager.network.numberOfBuffers config. In a job DAG with a shuffle
> phase, this number can grow very high depending on the TM cluster size. The
> formula for calculating the buffer count is documented here:
> https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers
>
> #slots-per-TM^2 * #TMs * 4
>
> In a standalone deployment, we may need to control the task manager cluster
> size dynamically and then leverage the upcoming Flink feature to support
> scaling job parallelism/rescaling at runtime. If the buffer count config is
> static at runtime and cannot be changed without restarting the task manager
> process, this may add latency and complexity to the scaling process. I am
> wondering whether there is already any discussion around whether the
> network buffers should be automatically managed by Flink, or at least
> whether some API should be exposed to allow them to be reconfigured. Let me
> know if there is any existing JIRA that I should follow.
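As a worked example of the formula quoted above; the cluster dimensions are assumptions chosen purely for illustration:

```java
/**
 * Worked example of the documented formula:
 *   #slots-per-TM^2 * #TMs * 4
 * The cluster dimensions here are illustrative assumptions only.
 */
public final class NetworkBufferEstimate {
    public static void main(String[] args) {
        int slotsPerTm = 8;   // assumed slots per TaskManager
        int numberOfTms = 10; // assumed number of TaskManagers
        int buffers = slotsPerTm * slotsPerTm * numberOfTms * 4;
        System.out.println(buffers); // 8^2 * 10 * 4 = 2560 network buffers
    }
}
```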