This is an automated email from the ASF dual-hosted git repository. wzhou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 49d599c7f737388db71f531091d285f03ae1f2f4 Author: Gergely Fürnstáhl <[email protected]> AuthorDate: Thu Jul 13 16:56:35 2023 +0200 IMPALA-12233: Fixed PHJ hanging caused by cyclic barrier Partitioned Hash Join with a limit could hang when using mt_dop>0, because the cyclic barrier in PhjBuilder was not cancelled properly. Added the ability to unregister threads from the barrier's synchronization, and call it from PartitionedHashJoinNode::Close(), so threads that close early no longer block threads that are still processing. Testing: - Added new unit tests covering the new feature - Added e2e test to make sure the join does not hang Change-Id: I8be75c7ce99c015964c8bbb547539e6619ba4f9b Reviewed-on: http://gerrit.cloudera.org:8080/20179 Reviewed-by: Michael Smith <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/partitioned-hash-join-builder.cc | 4 + be/src/exec/partitioned-hash-join-builder.h | 3 + be/src/exec/partitioned-hash-join-node.cc | 5 +- be/src/util/cyclic-barrier-test.cc | 100 +++++++++++++++++++++ be/src/util/cyclic-barrier.cc | 12 +++ be/src/util/cyclic-barrier.h | 17 ++-- .../queries/QueryTest/joins_mt_dop.test | 10 +++ 7 files changed, 145 insertions(+), 6 deletions(-) diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc index 1204e3ed2..067214653 100644 --- a/be/src/exec/partitioned-hash-join-builder.cc +++ b/be/src/exec/partitioned-hash-join-builder.cc @@ -1383,6 +1383,10 @@ string PhjBuilder::DebugString() const { ss << " buffer_pool_client=" << buffer_pool_client_->DebugString(); return ss.str(); } +void PhjBuilder::UnregisterThreadFromBarrier() const { + DCHECK(probe_barrier_ != nullptr); + probe_barrier_->Unregister(); +} Status PhjBuilderConfig::CodegenProcessBuildBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn, llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn, diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h index 70fa4491f..dd578e318 
100644 --- a/be/src/exec/partitioned-hash-join-builder.h +++ b/be/src/exec/partitioned-hash-join-builder.h @@ -541,6 +541,9 @@ class PhjBuilder : public JoinBuilder { std::string DebugString() const; + /// Unregisters one probe thread from the barrier + void UnregisterThreadFromBarrier() const; + /// Computes the minimum reservation required to execute the spilling partitioned /// hash algorithm successfully for any input size (assuming enough disk space is /// available for spilled rows). This includes buffers used by the build side, diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc index c62dfd3f0..b3e594f1d 100644 --- a/be/src/exec/partitioned-hash-join-node.cc +++ b/be/src/exec/partitioned-hash-join-node.cc @@ -24,8 +24,8 @@ #include "codegen/llvm-codegen.h" #include "exec/blocking-join-node.inline.h" -#include "exec/exec-node.inline.h" #include "exec/exec-node-util.h" +#include "exec/exec-node.inline.h" #include "exec/hash-table.inline.h" #include "exprs/scalar-expr-evaluator.h" #include "exprs/scalar-expr.h" @@ -35,6 +35,7 @@ #include "runtime/mem-tracker.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" +#include "util/cyclic-barrier.h" #include "util/debug-util.h" #include "util/runtime-profile-counters.h" @@ -309,6 +310,8 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) { builder_->CloseFromProbe(state); waited_for_build_ = false; } + + if (builder_->num_probe_threads() > 1) builder_->UnregisterThreadFromBarrier(); } ScalarExprEvaluator::Close(other_join_conjunct_evals_, state); if (probe_expr_results_pool_ != nullptr) probe_expr_results_pool_->FreeAll(); diff --git a/be/src/util/cyclic-barrier-test.cc b/be/src/util/cyclic-barrier-test.cc index cafacd4d0..033a4553f 100644 --- a/be/src/util/cyclic-barrier-test.cc +++ b/be/src/util/cyclic-barrier-test.cc @@ -155,6 +155,106 @@ TEST(CyclicBarrierTest, Cancellation) { IMPALA_ASSERT_DEBUG_DEATH(barrier.Cancel(Status::OK()), ""); } +// 
Test that unregister functions as expected when the last call is an Unregister +TEST(CyclicBarrierTest, UnregisterLast) { + const int NUM_THREADS = 8; + const int NUM_OF_UNREGISTERING_THREADS = 6; + int counter = 0; + AtomicInt32 waits_complete{0}; + AtomicInt32 unregisters_complete{0}; + CyclicBarrier barrier(NUM_THREADS + NUM_OF_UNREGISTERING_THREADS); + thread_group threads; + thread_group unregistering_threads; + // All threads should join the barrier, waiting. + for (int i = 0; i < NUM_THREADS; ++i) { + threads.add_thread(new thread([&barrier, &waits_complete, &counter]() { + Status status = barrier.Wait([&counter]() { + ++counter; + return Status::OK(); + }); + EXPECT_TRUE(status.ok()); + waits_complete.Add(1); + })); + } + SleepForMs(10); // Give other threads a chance to start before unregistering + EXPECT_EQ(0, counter) << "The callback should not have run."; + + // All unregistering threads but one unregister; the waiting threads stay blocked. + for (int i = 0; i < NUM_OF_UNREGISTERING_THREADS - 1; ++i) { + unregistering_threads.add_thread(new thread([&barrier, &unregisters_complete]() { + barrier.Unregister(); + unregisters_complete.Add(1); + })); + } + unregistering_threads.join_all(); + + EXPECT_EQ(0, counter) << "The callback should not have run."; + EXPECT_EQ(0, waits_complete.Load()) << "Threads should not have returned."; + EXPECT_EQ(NUM_OF_UNREGISTERING_THREADS - 1, unregisters_complete.Load()) + << "Unregisters should have returned."; + + barrier.Unregister(); + + threads.join_all(); + + EXPECT_EQ(1, counter) << "Counter should have been incremented by the woken up thread " + "after the last Unregister."; + EXPECT_EQ(NUM_THREADS, waits_complete.Load()) << "Threads should have returned."; +} + +// Test that unregister functions as expected when the last call is a Wait. 
+TEST(CyclicBarrierTest, UnregisterNotLast) { + const int NUM_THREADS = 8; + const int NUM_OF_UNREGISTERING_THREADS = 6; + int counter = 0; + AtomicInt32 waits_complete{0}; + AtomicInt32 unregisters_complete{0}; + CyclicBarrier barrier(NUM_THREADS + NUM_OF_UNREGISTERING_THREADS); + thread_group threads; + thread_group unregistering_threads; + // All threads but one should join the barrier, waiting. + for (int i = 0; i < NUM_THREADS - 1; ++i) { + threads.add_thread(new thread([&barrier, &waits_complete, &counter]() { + Status status = barrier.Wait([&counter]() { + ++counter; + return Status::OK(); + }); + EXPECT_TRUE(status.ok()); + waits_complete.Add(1); + })); + } + SleepForMs(10); // Give other threads a chance to start before unregistering + EXPECT_EQ(0, counter) << "The callback should not have run."; + + // All unregistering threads unregister; the waiters stay blocked until the last Wait(). + for (int i = 0; i < NUM_OF_UNREGISTERING_THREADS; ++i) { + unregistering_threads.add_thread(new thread([&barrier, &unregisters_complete]() { + barrier.Unregister(); + unregisters_complete.Add(1); + })); + } + unregistering_threads.join_all(); + + EXPECT_EQ(0, counter) << "The callback should not have run."; + EXPECT_EQ(0, waits_complete.Load()) << "Threads should not have returned."; + EXPECT_EQ(NUM_OF_UNREGISTERING_THREADS, unregisters_complete.Load()) + << "Unregisters should have returned."; + + threads.add_thread(new thread([&barrier, &waits_complete, &counter]() { + Status status = barrier.Wait([&counter]() { + ++counter; + return Status::OK(); + }); + EXPECT_TRUE(status.ok()); + waits_complete.Add(1); + })); + + threads.join_all(); + + EXPECT_EQ(1, counter) << "Counter should have been incremented by the last thread."; + EXPECT_EQ(NUM_THREADS, waits_complete.Load()) << "Threads should have returned."; +} + // Passing an empty/null function to Wait() is not supported. 
TEST(CyclicBarrierTest, NullFunction) { CyclicBarrier barrier(1); diff --git a/be/src/util/cyclic-barrier.cc b/be/src/util/cyclic-barrier.cc index aed0efda8..af681a774 100644 --- a/be/src/util/cyclic-barrier.cc +++ b/be/src/util/cyclic-barrier.cc @@ -34,4 +34,16 @@ void CyclicBarrier::Cancel(const Status& err) { } barrier_cv_.NotifyAll(); } + +void CyclicBarrier::Unregister() { + bool notify = false; + { + unique_lock<mutex> l(lock_); + if (!cancel_status_.ok()) return; // Already cancelled. + --num_threads_; + DCHECK_GE(num_threads_, 0); + if (num_waiting_threads_ == num_threads_) notify = true; + } + if (notify) barrier_cv_.NotifyOne(); +} } // namespace impala diff --git a/be/src/util/cyclic-barrier.h b/be/src/util/cyclic-barrier.h index 065c1ae7b..9c845db2c 100644 --- a/be/src/util/cyclic-barrier.h +++ b/be/src/util/cyclic-barrier.h @@ -54,13 +54,16 @@ class CyclicBarrier { if (num_waiting_threads_ < num_threads_) { // Wait for the last thread to wake us up. int64_t start_cycle = cycle_num_; - while (cancel_status_.ok() && cycle_num_ == start_cycle) { + while (cancel_status_.ok() && cycle_num_ == start_cycle + && num_waiting_threads_ < num_threads_) { barrier_cv_.Wait(l); } - return cancel_status_; + if (!cancel_status_.ok() || cycle_num_ > start_cycle) { + return cancel_status_; + } } - // This is the last thread and barrier isn't cancelled. We can proceed by - // resetting state for the next cycle. + // This is the last thread, or a thread woken by the final Unregister(), and the + // barrier isn't cancelled. We can proceed by resetting state for the next cycle. fn_status = fn(); if (fn_status.ok()) { num_waiting_threads_ = 0; @@ -79,9 +82,13 @@ class CyclicBarrier { // 'err' must be a non-OK status. void Cancel(const Status& err); + // Unregisters one thread from the synchronization, wakes up one waiting thread to
+ void Unregister(); + private: // The number of threads participating in synchronization. - const int num_threads_; + int num_threads_; // Protects below members. std::mutex lock_; diff --git a/testdata/workloads/functional-query/queries/QueryTest/joins_mt_dop.test b/testdata/workloads/functional-query/queries/QueryTest/joins_mt_dop.test index 02c56139e..a3a2bfcc3 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/joins_mt_dop.test +++ b/testdata/workloads/functional-query/queries/QueryTest/joins_mt_dop.test @@ -49,3 +49,13 @@ int,int 6,6 7,7 ==== +---- QUERY +# IMPALA-12233: make sure PHJ does not hang with limit +select ss_cdemo_sk from tpcds.store_sales where ss_sold_date_sk = (select max(ss_sold_date_sk) from tpcds.store_sales) group by ss_cdemo_sk limit 3; +---- RESULTS +row_regex: [0-9]* +row_regex: [0-9]* +row_regex: [0-9]* +---- TYPES +int +==== \ No newline at end of file
