GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/5748
[FLINK-9053][runtime] only release outputs under the checkpoint lock

## What is the purpose of the change

Releasing an operator chain's outputs will call `RecordWriter#clearBuffers()`, and this must not run in parallel with `RecordWriter#broadcastEvent()`, which the asynchronous checkpoint barrier trigger inside `Task` may run via `StreamTask#triggerCheckpoint()`.

Now, during the cleanup of `StreamTask#invoke()`, `StreamTask`'s asynchronous services are shut down but not those of the `Task`, and `operatorChain.releaseOutputs()` is not put under the checkpoint lock. Therefore, the following may run in parallel:

- `Task`'s checkpoint trigger execution
- `operatorChain.releaseOutputs()`

We can guard `operatorChain.releaseOutputs()` with the checkpoint lock, which should be safe since we have already closed all of `StreamTask`'s asynchronous executors and disposed the operators. Hence nothing should be blocking the cleanup by holding the checkpoint lock.

@StephanEwen can you please have a look to verify the safety of this?

## Brief change log

- add the checkpoint lock in the cleanup of `StreamTask#invoke()` around `operatorChain.releaseOutputs()`

## Verifying this change

This is a very rare race condition that was uncovered by the `RescalingITCase`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes**
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **not applicable**

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-9053

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5748.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5748

----
commit 9f0295aa3c02e4870b248241cb9094d14863a686
Author: Stefan Richter <s.richter@...>
Date:   2018-02-26T17:03:14Z

    [hotfix] Improved logging for task local recovery

    (cherry picked from commit 56c7560)

commit e9e13dec10f1a1ee57c46719d758885c4f33dcf3
Author: Stephan Ewen <sewen@...>
Date:   2018-02-27T15:53:03Z

    [hotfix] [core] Suppress unused warning config options only used in shell scripts and doc generation.

commit a269f8519305faff153e84d729873b6f9497bd36
Author: Stephan Ewen <sewen@...>
Date:   2018-02-27T16:04:29Z

    [FLINK-8798] [core] Make force 'commons-logging' to be parent-first loaded.

commit 1d26062de130c05fdbe7701b55766b4a8d433418
Author: Xingcan Cui <xingcanc@...>
Date:   2018-02-12T10:11:36Z

    [FLINK-8538][table]Add a Kafka table source factory with JSON format support

commit db2c510fb4f171c9e9940759e5fbaf466ec74474
Author: Timo Walther <twalthr@...>
Date:   2018-02-19T12:35:45Z

    [FLINK-8538] [table] Improve unified table sources

    This closes #5564.

commit 23358ff87003fd6603c0ca19bc37f31944d2c494
Author: Stephan Ewen <sewen@...>
Date:   2018-02-26T15:41:24Z

    [FLINK-8791] [docs] Fix documentation about configuring dependencies

commit acf114793c708f0ab207008c25195f6f65796e5f
Author: gyao <gary@...>
Date:   2018-02-21T15:02:01Z

    [FLINK-8730][REST] JSON serialize entire SerializedThrowable

    Do not only serialize the serialized exception but the entire
    SerializedThrowable object. This makes it possible to throw the
    SerializedThrowable itself without deserializing it.

    This closes #5546.

commit 2f6cb37c775106bb684ef9c608585e7a72056460
Author: gyao <gary@...>
Date:   2018-02-27T15:58:53Z

    [FLINK-8787][flip6] Do not copy flinkConfiguration in AbstractYarnClusterDescriptor

    This closes #5591.

commit 51d5bc6c5151c2aed3f932f84c35da43689501ec
Author: vinoyang <vinoyang@...>
Date:   2018-02-27T06:43:52Z

    [FLINK-8792] [rest] Change MessageQueryParameter.convertStringToValue to convertValueToString

    This closes #5587.

commit 08e615027acd426537dc580139a61bd4082b7c3f
Author: Till Rohrmann <trohrmann@...>
Date:   2018-02-28T09:11:44Z

    [FLINK-8792] [rest] Change MessageQueryParameter#convertValueFromString to convertStringToValue

commit 302aaeb021bacf3f37cb9a3ee236304c94adbf30
Author: Timo Walther <twalthr@...>
Date:   2018-02-22T16:22:54Z

    [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant

    This closes #5567.

commit 59b607b0c411b7d01b97db302d0f124b28ef0d0e
Author: sihuazhou <summerleafs@...>
Date:   2018-02-26T02:54:53Z

    [FLINK-8777][checkpointing] Cleanup local state more eagerly in recovery

    (cherry picked from commit 296f9ff)

commit cf854ccbc6fdbf112095c471705c8799aee64a45
Author: Aljoscha Krettek <aljoscha.krettek@...>
Date:   2018-02-28T14:35:13Z

    [hotfix] Enable FILESYTEM_DEFAULT_OVERRIDE in FLIP-6 MiniClusterResource

commit 623e94459795a191703b880fcfa4f162c92ae458
Author: Stefan Richter <s.richter@...>
Date:   2018-02-28T13:25:55Z

    [FLINK-8557][checkpointing] Remove illegal characters from operator description text before using it to construct the instance directory in RocksDB

    (cherry picked from commit 66474da)

commit bb459cc68f8dc4bd042b61e365e583d4e96b3e0e
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-23T10:37:37Z

    [hotfix][tests] Deduplicate code in SingleInputGateTest

    (cherry picked from commit 67a547a)

commit 6b7a4480ef8610df3ff21eb2811b9a0a3c58c912
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-23T11:11:14Z

    [hotfix][runtime] Remove duplicated check

    (cherry picked from commit 42f71f6)

commit 651462e6b22c51ce14bd9ea6db389ef6a1f38e55
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-23T10:20:21Z

    [FLINK-8760][runtime] Correctly propagate moreAvailable flag through SingleInputGate

    Previously, if SingleInputGate was re-enqueuing an input channel,
    isMoreAvailable might incorrectly return false. This might have
    caused some deadlocks.

    (cherry picked from commit 6c9e267)

commit 8e62f90739e2319491df983917dc7ab484de2550
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-23T10:27:54Z

    [hotfix][tests] Do not hide original exception in SuccessAfterNetworkBuffersFailureITCase

    (cherry picked from commit 2c2e189)

commit 8eb6a30798c09d171e3eb8019b53e677252bd5ba
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-23T10:28:20Z

    [FLINK-8694][runtime] Fix notifyDataAvailable race condition

    Before, there was a race condition that might have resulted in
    ignoring some notifyDataAvailable calls. This fixes the problem by
    moving buffersAvailable handling to Subpartitions and adds a stress
    test for flushAlways (without this fix the test deadlocks).

    (cherry picked from commit ebd39f3)

commit 61a34a691e7d5233f18ac72a1ab8fb09b53c4753
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-26T15:13:06Z

    [FLINK-8805][runtime] Optimize EvenSerializer.isEvent method

    For example, previously if the method was used to check for
    EndOfPartitionEvent and the Buffer contained a huge custom event,
    the event had to be deserialized before performing the actual check.
    Now we quickly enter the correct if/else branch and do the full,
    costly deserialization only if we have to.

    Other calls to isEvent() than checking against EndOfPartitionEvent
    were not used.

    (cherry picked from commit 767027f)

commit d5338c4154e5de029b3b30e3ef0a0732bf7f68e7
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-27T09:39:00Z

    [FLINK-8750][runtime] Improve detection of no remaining data after EndOfPartitionEvent

    Because of a race condition between:
      1. releasing the inputChannelsWithData lock in this method and reaching this place
      2. an empty data notification that re-enqueues a channel
    we can end up with the moreAvailable flag set to true while we
    expect no more data.

    This commit detects such a situation, makes a correct assertion and
    turns off the moreAvailable flag.

    (cherry picked from commit b9b7416)

commit 32384ed9b00cf0e1961d355dc4080f25a2156e58
Author: Zhijiang <wangzhijiang999@...>
Date:   2018-02-22T14:41:38Z

    [FLINK-8747][bugfix] The tag of waiting for floating buffers in RemoteInputChannel should be updated properly

    (cherry picked from commit 6e9e0dd)

commit f1453276095c55264f7b4097d16e2987a44b3f33
Author: Zhijiang <wangzhijiang999@...>
Date:   2018-02-23T01:55:57Z

    [hotfix] Fix package private and comments

    (cherry picked from commit 6165b3d)

commit 18ff2ce15bdb1e7bd246e438e47527a24559c86d
Author: Nico Kruber <nico@...>
Date:   2018-02-26T16:50:10Z

    [hotfix][network] minor improvements in UnionInputGate

    (cherry picked from commit 4203557)

commit 9265666517830350a4a7037029e347f33df1bea2
Author: Nico Kruber <nico@...>
Date:   2018-02-26T16:52:37Z

    [FLINK-8737][network] disallow creating a union of UnionInputGate instances

    Recently, pollNextBufferOrEvent() was added but not implemented.
    Since it is used in getNextBufferOrEvent(), any UnionInputGate
    containing a UnionInputGate would have failed already. There should
    be no use case for wiring up inputs this way. Therefore, fail early
    when trying to construct this.

    (cherry picked from commit e8de538)

commit 26c8f6c2a3ff75ffb954c816a57908318a2d8099
Author: Stephan Ewen <sewen@...>
Date:   2018-02-28T11:15:30Z

    [hotfix] [tests] Fix SelfConnectionITCase

    The test previously did not fail on failed execution, and thus
    evaluated incomplete results from a failed execution with the
    expected results.

    This cleans up serialization warnings and uses lambdas where
    possible, to make the code more readable.

commit f60e46dafa8950d5e40cd8a3286c172ecaea6b73
Author: gyao <gary@...>
Date:   2018-02-28T12:04:19Z

    [hotfix] Add missing space to log message in ZooKeeperLeaderElectionService

commit b7247929d0745b3b83306d0c93d97faf4ece4107
Author: gyao <gary@...>
Date:   2018-02-28T12:06:00Z

    [hotfix][Javadoc] Fix typo in YARN Utils: teh -> the

commit adb3750226971f7c67a0d3069103b56e4fee1c27
Author: gyao <gary@...>
Date:   2018-02-28T12:07:04Z

    [hotfix][Javadoc] Fix typo in YarnTestBase: teh -> the

commit 94bbd564ce5214b3366cc6d299fcb99ae62a2bd8
Author: gyao <gary@...>
Date:   2018-02-28T12:08:25Z

    [hotfix][tests] Fix wrong assertEquals in YARNSessionCapacitySchedulerITCase

    Test swapped actual and expected arguments.
    Remove catching Throwable in test; instead propagate all exceptions.

----
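
For readers following the race described in the pull request text, here is a minimal, self-contained sketch of the locking pattern. It is not the Flink code. `SketchWriter`, `SketchTask`, and `CheckpointLockSketch` are hypothetical stand-ins for `RecordWriter`, `StreamTask`, and a test driver. The `isReleased()` check inside the trigger path is an assumption made so that the sketch can reject late triggers. The only point illustrated is that the barrier broadcast and the output release take the same lock and therefore cannot interleave.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a record writer; not thread-safe on its own. */
final class SketchWriter {
    private final List<String> buffers = new ArrayList<>();
    private boolean released;

    // Callers must hold the shared lock; fails if the writer was already released.
    void broadcastEvent(String event) {
        if (released) {
            throw new IllegalStateException("writer already released");
        }
        buffers.add(event);
    }

    // Callers must hold the shared lock.
    void clearBuffers() {
        buffers.clear();
        released = true;
    }

    boolean isReleased() {
        return released;
    }
}

/** Hypothetical stand-in for a task that owns the checkpoint lock. */
final class SketchTask {
    private final Object checkpointLock = new Object();
    private final SketchWriter writer = new SketchWriter();

    /** Trigger path (asynchronous thread); returns false once outputs are released. */
    boolean triggerCheckpoint(long checkpointId) {
        synchronized (checkpointLock) {
            if (writer.isReleased()) { // assumption: late triggers are simply declined
                return false;
            }
            writer.broadcastEvent("barrier-" + checkpointId);
            return true;
        }
    }

    /** Cleanup path: outputs are released only while holding the checkpoint lock. */
    void releaseOutputs() {
        synchronized (checkpointLock) {
            writer.clearBuffers();
        }
    }
}

final class CheckpointLockSketch {
    /** Races a trigger thread against cleanup; returns how many triggers hit a released writer. */
    static int raceOnce() {
        SketchTask task = new SketchTask();
        AtomicInteger failures = new AtomicInteger();
        Thread trigger = new Thread(() -> {
            for (long id = 0; id < 10_000; id++) {
                try {
                    task.triggerCheckpoint(id);
                } catch (IllegalStateException e) {
                    failures.incrementAndGet();
                }
            }
        });
        trigger.start();
        task.releaseOutputs(); // runs concurrently with the trigger loop
        try {
            trigger.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("failed triggers: " + raceOnce());
    }
}
```

Because both paths synchronize on `checkpointLock`, a trigger either completes before the release or observes the released state and declines. If the `synchronized` block in `releaseOutputs()` were removed, the two threads could touch the writer at the same time, which is the kind of interleaving the pull request aims to rule out.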