This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 23582881b IMPALA-13770 (Addendum): Close expressions in
IcebergMergeCasePlan
23582881b is described below
commit 23582881befd4d3cb36b8d4766ecaad61a30d271
Author: Daniel Becker <[email protected]>
AuthorDate: Sat Feb 22 19:58:54 2025 +0100
IMPALA-13770 (Addendum): Close expressions in IcebergMergeCasePlan
IMPALA-13770 added code to call Close() on
IcebergMergeCase::{output_exprs_,filter_conjuncts_}. However, these
expressions are created by IcebergMergeCasePlan, with pointers to the
expressions copied to possibly multiple IcebergMergeCase objects.
Therefore, although it does not cause errors in practice, it is better
to close the expressions in IcebergMergeCasePlan.
This change adds a Close() method to IcebergMergeCasePlan that closes
these expressions.
This patch also calls Close() on IcebergMergeSinkConfig::merge_action_
and IcebergMergeSink::merge_action_evaluator_, which were not closed
previously.
Change-Id: Iefa998dea173051702ef08c03b489178a17a653f
Reviewed-on: http://gerrit.cloudera.org:8080/22522
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/exec/iceberg-merge-node.cc | 14 ++++++++++++--
be/src/exec/iceberg-merge-node.h | 2 ++
be/src/exec/iceberg-merge-sink.cc | 2 ++
3 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/be/src/exec/iceberg-merge-node.cc
b/be/src/exec/iceberg-merge-node.cc
index 0c3409bc9..bdb1b45a4 100644
--- a/be/src/exec/iceberg-merge-node.cc
+++ b/be/src/exec/iceberg-merge-node.cc
@@ -74,12 +74,24 @@ Status IcebergMergeCasePlan::Init(const TIcebergMergeCase&
tmerge_case,
return Status::OK();
}
+void IcebergMergeCasePlan::Close() {
+ ScalarExpr::Close(output_exprs_);
+ ScalarExpr::Close(filter_conjuncts_);
+}
+
Status IcebergMergePlanNode::CreateExecNode(RuntimeState* state, ExecNode**
node) const {
ObjectPool* pool = state->obj_pool();
*node = pool->Add(new IcebergMergeNode(pool, *this, state->desc_tbl()));
return Status::OK();
}
+void IcebergMergePlanNode::Close() {
+ for (IcebergMergeCasePlan* merge_case_plan : merge_case_plans_) {
+ merge_case_plan->Close();
+ }
+ PlanNode::Close();
+}
+
IcebergMergeNode::IcebergMergeNode(
ObjectPool* pool, const IcebergMergePlanNode& pnode, const DescriptorTbl&
descs)
: ExecNode(pool, pnode, descs),
@@ -331,8 +343,6 @@ Status IcebergMergeCase::Open(RuntimeState* state) {
void IcebergMergeCase::Close(RuntimeState* state) {
ScalarExprEvaluator::Close(filter_evaluators_, state);
ScalarExprEvaluator::Close(output_evaluators_, state);
- ScalarExpr::Close(output_exprs_);
- ScalarExpr::Close(filter_conjuncts_);
}
} // namespace impala
diff --git a/be/src/exec/iceberg-merge-node.h b/be/src/exec/iceberg-merge-node.h
index 59c3ffbdd..be08ac698 100644
--- a/be/src/exec/iceberg-merge-node.h
+++ b/be/src/exec/iceberg-merge-node.h
@@ -44,6 +44,7 @@ class IcebergMergePlanNode : public PlanNode {
public:
Status Init(const TPlanNode& tnode, FragmentState* state) override;
Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+ virtual void Close() override;
IcebergMergePlanNode() = default;
~IcebergMergePlanNode() override = default;
@@ -143,6 +144,7 @@ class IcebergMergeCasePlan {
~IcebergMergeCasePlan() = default;
Status Init(const TIcebergMergeCase& tmerge_case, FragmentState* state,
const RowDescriptor* row_desc);
+ void Close();
IcebergMergeCasePlan(const IcebergMergeCasePlan& other) = delete;
IcebergMergeCasePlan(IcebergMergeCasePlan&& other) = delete;
diff --git a/be/src/exec/iceberg-merge-sink.cc
b/be/src/exec/iceberg-merge-sink.cc
index 0cf6f224c..38275f01c 100644
--- a/be/src/exec/iceberg-merge-sink.cc
+++ b/be/src/exec/iceberg-merge-sink.cc
@@ -68,6 +68,7 @@ DataSink* IcebergMergeSinkConfig::CreateSink(RuntimeState*
state) const {
void IcebergMergeSinkConfig::Close() {
delete_sink_config_->Close();
insert_sink_config_->Close();
+ merge_action_->Close();
DataSinkConfig::Close();
}
@@ -139,6 +140,7 @@ Status IcebergMergeSink::FlushFinal(RuntimeState* state) {
void IcebergMergeSink::Close(RuntimeState* state) {
insert_sink_->Close(state);
delete_sink_->Close(state);
+ merge_action_evaluator_->Close(state);
DataSink::Close(state);
DCHECK(closed_);
}