GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/4613
[FLINK-7520][network] let our Buffer class extend from Netty's buffer class

## What is the purpose of the change

With this PR, we let our own `Buffer` class extend from Netty's `ByteBuf` class so that we can avoid one buffer copy while transferring data through Netty but keep our `MemorySegment` logic, performance, and configuration.

Note that this PR is based on several previous smaller PRs which are all needed: #4445, #4447, #4506, #4481, #4517, #4518, #4528, #4581, #4590, #4591, #4592, #4593, and #4594.

## Brief change log

- extract the `Buffer` interface (common functions used by non-Netty code inside Flink) and a `NetworkBuffer` implementation (extending from `ByteBuf`, implementing `Buffer`)
- change the `Buffer` interface to follow the separate reader and writer index logic that Netty has and replace the `#getSize()`, `#setSize()` logic, i.e.
  ```
  +-------------------+----------------+----------------+
  | discardable bytes | readable bytes | writable bytes |
  +-------------------+----------------+----------------+
  |                   |                |                |
  0        <=         readerIndex  <=  writerIndex  <=  size
  ```
  (currently, only the writer index is used, and both reading and writing in Flink code is performed exclusively either on NIO buffer wrappers or directly on the underlying `MemorySegment`s)
- add `getNioBuffer()` and `getNioBufferReadable()` for properly accessing the underlying buffer regions
- since we inherit from `AbstractByteBuf`, only one thread should work with a `Buffer`'s (meta)data, as modifications to the indices are not thread-safe; this is the usual case, though
- add `NetworkBuffer#setAllocator()`, which must be called before handing a `NetworkBuffer` to Netty code (we do not rely on this allocator in our own code!)
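The reader/writer index scheme from the change log can be illustrated with a small, self-contained Java sketch. `IndexedBufferSketch` is hypothetical and deliberately simplified (no reference counting, no `MemorySegment`, no Netty); it only shows how the invariant `0 <= readerIndex <= writerIndex <= size` might be enforced and how a readable-region NIO view in the spirit of `getNioBufferReadable()` can be derived without moving the indices.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical, simplified buffer with Netty-style separate reader and writer
 * indices. Invariant: 0 <= readerIndex <= writerIndex <= size.
 * Not Flink's NetworkBuffer: no reference counting, no MemorySegment, no Netty.
 */
final class IndexedBufferSketch {
    private final ByteBuffer memory; // fixed-size backing memory (off-heap here)
    private int readerIndex;
    private int writerIndex;

    IndexedBufferSketch(int size) {
        this.memory = ByteBuffer.allocateDirect(size);
    }

    int size() { return memory.capacity(); }
    int getReaderIndex() { return readerIndex; }
    int getWriterIndex() { return writerIndex; }
    int readableBytes() { return writerIndex - readerIndex; }
    int writableBytes() { return size() - writerIndex; }

    /** Appends one byte at writerIndex (absolute put, so memory's own position is untouched). */
    void writeByte(byte value) {
        if (writableBytes() < 1) {
            throw new IndexOutOfBoundsException("no writable bytes left");
        }
        memory.put(writerIndex++, value);
    }

    /** Consumes one byte at readerIndex. */
    byte readByte() {
        if (readableBytes() < 1) {
            throw new IndexOutOfBoundsException("no readable bytes left");
        }
        return memory.get(readerIndex++);
    }

    void setReaderIndex(int index) {
        if (index < 0 || index > writerIndex) {
            throw new IndexOutOfBoundsException("readerIndex " + index);
        }
        readerIndex = index;
    }

    void setWriterIndex(int index) {
        if (index < readerIndex || index > size()) {
            throw new IndexOutOfBoundsException("writerIndex " + index);
        }
        writerIndex = index;
    }

    /**
     * NIO view of the readable bytes [readerIndex, writerIndex). The view shares
     * memory with this buffer but has its own position/limit, so reading from it
     * does not move readerIndex.
     */
    ByteBuffer getNioBufferReadable() {
        ByteBuffer view = memory.duplicate();
        view.limit(writerIndex);    // set limit first so that position <= limit holds
        view.position(readerIndex);
        return view.slice();
    }
}
```

For example, after writing three bytes and reading one, `readableBytes()` is 2 and the returned view has `remaining() == 2`; calling `setWriterIndex(0)` at that point is rejected because it would move the writer index behind the reader index.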
## Verifying this change

This change added tests and can be verified as follows:

- extended `BufferTest` by inheriting from Netty's `AbstractByteBufTest` (copied into our sources because it is not available in a separate test jar) to verify that our buffer implementation follows Netty's invariants
- existing (integration) tests such as `NettyMessageSerializationTest` and `PartitionRequestClientHandlerTest` for the changes in the use of the new APIs
- any other (integration) test that uses the network stack

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (yes)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (JavaDocs)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-7520

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4613.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4613

----
commit 2ae08d79712235a965db45ee739076cd6a3601fa
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-07-31T10:06:14Z

    [hotfix] fix some typos

commit cda26a0d8e6d07c48ac03ee4aab74c8699a04428
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-02T09:35:16Z

    [hotfix][tests] add missing test descriptions

commit 3b921d60c1ff969874363c75916a1d40fcc99847
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-02T09:34:54Z

    [FLINK-7310][core] always use the HybridMemorySegment

    Since we'd like to use our own off-heap buffers for network communication, we
    cannot use HeapMemorySegment anymore and need to rely on HybridMemorySegment.
    We thus drop any code that loads the HeapMemorySegment (it is still available
    if needed) in favour of the HybridMemorySegment, which is able to work on both
    heap and off-heap memory.

    For the performance penalty of this change compared to using HeapMemorySegment
    alone, see this interesting blog article (from 2015):
    https://flink.apache.org/news/2015/09/16/off-heap-memory.html

commit 3bdd01454dae9eafbd220a5d5554d402e12b8d9f
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-02T09:27:49Z

    [hotfix][core] add additional final methods in final classes

    This applies the scheme of HeapMemorySegment to HybridMemorySegment where core
    methods are also marked "final" to be more future-proof.
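FLINK-7310 above relies on `HybridMemorySegment` serving both heap and off-heap memory through one API. As an illustration only, the following hypothetical `HybridSegmentSketch` uses `java.nio.ByteBuffer` (which can itself be heap-backed or direct) rather than the low-level memory access of Flink's actual class; its accessors are marked `final` inside a `final` class, mirroring the scheme described in the last commit above.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical illustration: one final segment class that works on heap or
 * off-heap memory behind the same accessors. Not Flink's HybridMemorySegment.
 */
final class HybridSegmentSketch {
    private final ByteBuffer memory; // heap-backed or direct, chosen at allocation time

    private HybridSegmentSketch(ByteBuffer memory) {
        this.memory = memory;
    }

    static HybridSegmentSketch allocateOnHeap(int size) {
        return new HybridSegmentSketch(ByteBuffer.allocate(size));
    }

    static HybridSegmentSketch allocateOffHeap(int size) {
        return new HybridSegmentSketch(ByteBuffer.allocateDirect(size));
    }

    // "final" is redundant in a final class; kept to mirror the commit above.
    final boolean isOffHeap() {
        return memory.isDirect();
    }

    final int size() {
        return memory.capacity();
    }

    // Absolute accessors: callers address bytes by index, independent of the memory type.
    final int getInt(int index) {
        return memory.getInt(index);
    }

    final void putInt(int index, int value) {
        memory.putInt(index, value);
    }
}
```

Code working with such segments can treat every instance uniformly; the heap vs. off-heap decision is made once, at allocation time.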
commit 1f33ec0df5b83135256538132b0de58c3bd86402
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-04T13:15:32Z

    [FLINK-7312][checkstyle] remove trailing whitespace

commit 679793f478a3f79c61dec9d5c424c748e2a5d6ed
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-04T13:20:28Z

    [FLINK-7312][checkstyle] organise imports

commit 6fe487a2e929fe3aaf1d6a1d5ef3070d6263caad
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-04T13:24:16Z

    [FLINK-7312][checkstyle] add, adapt and improve comments

commit d4b77dc006f833b08ebf5e6324cfc53ca754c254
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-04T13:26:40Z

    [FLINK-7312][checkstyle] remove redundant "public" keyword in interfaces

commit 2ce3703c41161a00c7e749f45f11f654e3183e52
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-04T13:27:36Z

    [FLINK-7312][checkstyle] ignore some spurious warnings

commit 987f8a41c034b39d14b5c00d6ecc91ef3c157c62
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-04T13:35:15Z

    [FLINK-7312][checkstyle] enable checkstyle for `flink/core/memory/*`

    We deliberately ignore redundant modifiers for now since we want `final`
    modifiers on `final` classes for increased future-proofness.
commit 6ce7b17f6c645a1a1ec136a307ce83f02b21eb7f
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-04T13:35:15Z

    [FLINK-7400][cluster] fix cut-off memory not used for off-heap reserve as intended

    + fix description of `containerized.heap-cutoff-ratio`

commit cda9f0b9aab154d12315c09f22f5dbd8da791f72
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-09T14:16:31Z

    [FLINK-7400][yarn] add an integration test for yarn container memory restrictions using off-heap memory

commit c3648e1c7486cb40a7948b7e2449b4fb82524a60
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-01T11:24:00Z

    [FLINK-7316][network] always use off-heap network buffers

    This is another step towards using our own (off-heap) buffers for network
    communication that we pass through netty in order to avoid unnecessary buffer
    copies.

commit f68d42dcd6920a7439d9861c40604e5c8755eeba
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-04T13:59:48Z

    [FLINK-7316][docs] add a note about network buffers always being off-heap

commit 228ca0a857b310db4e89e41e73fee57702394524
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-08T15:39:31Z

    [FLINK-7316][network] remove a dead code path and adapt a unit test still relying on it

    - remove the code path for `offHeapSize == -1` which does not exist anymore
    - just in case, make sure by adding some additional checks
    - fix a unit test not accommodating for this (introduced by the new off-heap
      network buffers)

commit 38eec713b1e474f52031751d8423eca2971949cb
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-10T09:49:11Z

    [FLINK-7316][tests] fix `ContaineredTaskManagerParametersTest` to include the cutoff

commit d015293e11b1fc894e7bc665c2e9aa544f0aaaad
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-10T10:42:09Z

    [FLINK-7316][network] partly revert a wrong change to TaskManagerServices#createMemoryManager()

    We still need the distinction between on-heap and off-heap there if fractions
    are used because the memory manager's size is based on different values in the
    two cases.

commit 52560d94e3df42499d62b41919b6102a9f57b58b
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-10T12:16:04Z

    [FLINK-7316][tests] further adapt YARNSessionCapacitySchedulerITCase to the changed memory settings

commit 2a43a05f67f987fbc47ca5c73941ff6107033a23
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-10T12:22:15Z

    [hotfix] adapt YARNSessionCapacitySchedulerITCase#testDetachedPerJobYarnCluster***() tests

    Judging from their comments, these tests should also verify that a changed
    `yarn.heap-cutoff-ratio` is passed correctly, but the value used (50%) is too
    low: for a 1024MB task manager memory limit, the resulting cutoff (512MB) is
    not larger than the `containerized.heap-cutoff-min` of 600MB. This increases
    the ratio to 70% and adapts the expected memory settings.

commit 341acfba84515640d92dc0aa5e6ad1fe3ec4ffc8
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-07T15:38:36Z

    [FLINK-7411][network] minor (performance) improvements in NettyMessage

    * use a switch rather than multiple if conditions
    * use static `readFrom` methods to create instances of the message sub types

commit 5b16d3c36f6b6073af5b90d221089ddf29e17f5a
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-07T16:12:28Z

    [FLINK-7412][network] optimise NettyMessage.TaskEventRequest#readFrom() to read from netty buffers directly

commit dc1d29871040828108bc793ee90f7a867d9f40de
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-10T14:58:19Z

    [FLINK-7427][network] integrate PartitionRequestProtocol into NettyProtocol

    - removes one level of (unneeded) abstraction for clarity

commit 2ce22de5e288dab534a6cb5e0eb8edd3cb163619
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-24T10:17:08Z

    [FLINK-7499][io] also let AsynchronousBufferFileWriter#writeBlock() recycle the buffer in case of failures

    This fixes a double-recycle in SpillableSubpartitionView and also makes sure
    that even if adding the (asynchronous) write operation fails, the buffer is
    properly freed in code that did not perform this cleanup. It avoids code
    duplication of this cleanup, and it is also more consistent to take over
    responsibility for the given buffer even if an exception is thrown.

commit 703c11bf5e8a5cfd53b2b878f5b66768ba718902
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-24T14:49:46Z

    [FLINK-7513][tests] remove TestBufferFactory#MOCK_BUFFER

    This static buffer did not allow proper reference counting, and we should
    rather create test buffers in the tests which may also be released afterwards.

commit a9e85f4c1c4a2e64d7209c42d6aba73d78026de4
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-24T15:14:38Z

    [FLINK-7515][network] allow actual 0-length content in NettyMessage#allocateBuffer()

    Previously, length "0" meant "unknown content length", but there are cases
    where the actual length is 0, so we now use -1 to tag the special case.

commit b82cee16ccb36c29e0673a2b3c00dbc412ed08e8
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-25T07:49:44Z

    [FLINK-7514][tests] fix BackPressureStatsTrackerITCase releasing buffers twice

commit 7a970978aa3f02ca7b557fb4536dfa29374f0a09
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-22T16:33:55Z

    [FLINK-7516][memory] do not allow copies into a read-only ByteBuffer

commit add24bcab9bfde31a6d4e03483188a48196b321b
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-23T10:04:28Z

    [FLINK-7517][network] let NettyBufferPool extend PooledByteBufAllocator

    Previously, NettyBufferPool only wrapped PooledByteBufAllocator, but then any
    allocated buffer's alloc() method returned the wrapped PooledByteBufAllocator,
    which allowed heap buffers again. By extending PooledByteBufAllocator, we close
    this loophole. This also fixes the invariant that a copy of a buffer should
    have the same allocator.
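The ownership rule from FLINK-7499 above (the callee takes over responsibility for the buffer even if it throws) can be sketched as follows. `RecyclableBuffer` and `BufferWriterSketch` are hypothetical stand-ins, not Flink's `Buffer` or `AsynchronousBufferFileWriter`; a failure to add the write operation is simulated by a closed writer.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical buffer that detects being recycled more than once. */
final class RecyclableBuffer {
    private boolean recycled;

    void recycle() {
        if (recycled) {
            throw new IllegalStateException("buffer recycled twice");
        }
        recycled = true;
    }

    boolean isRecycled() {
        return recycled;
    }
}

/** Hypothetical writer that owns every buffer passed to writeBlock(), on success or failure. */
final class BufferWriterSketch {
    private final Queue<RecyclableBuffer> pending = new ArrayDeque<>();
    private boolean closed;

    void writeBlock(RecyclableBuffer buffer) throws IOException {
        try {
            enqueue(buffer);
        } catch (IOException | RuntimeException e) {
            // Ownership was transferred by the call, so the writer frees the buffer;
            // callers must not recycle it again in their own error handling.
            buffer.recycle();
            throw e;
        }
    }

    /** Simulated completion of all pending asynchronous writes. */
    void completeAll() {
        RecyclableBuffer buffer;
        while ((buffer = pending.poll()) != null) {
            buffer.recycle();
        }
    }

    void close() {
        closed = true;
    }

    private void enqueue(RecyclableBuffer buffer) throws IOException {
        if (closed) {
            throw new IOException("writer already closed");
        }
        pending.add(buffer);
    }
}
```

Because the writer recycles on failure, a caller that also recycled in its `catch` block would trip the double-recycle check, which is the kind of bug the commit fixes in SpillableSubpartitionView.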
commit 1e3511f2a921c70749c04b54a591a435cce5c476
Author: Nico Kruber <n...@data-artisans.com>
Date:   2017-08-22T15:45:45Z

    [FLINK-7520][network] let our Buffer class extend from netty's buffer class

    For this, use a common (flink) Buffer interface and an implementation
    (NetworkBuffer) that implements netty's buffer methods as well. In the future,
    with this, we are able to avoid unnecessary buffer copies when handing buffers
    over to netty while keeping our MemorySegment logic and configuration.

    For the netty-specific part, the NetworkBuffer also requires a ByteBuf
    allocator, which is otherwise not needed in our use cases; so before the
    buffer is handed over to netty, a byte buffer allocator must be set.