Hello Joe McDonnell, Steve Carlin, Csaba Ringhofer, Impala Public Jenkins,
I'd like you to reexamine a change. Please visit
http://gerrit.cloudera.org:8080/22094
to look at the new patch set (#26).
Change subject: IMPALA-13533: Calcite CTE backend
......................................................................
IMPALA-13533: Calcite CTE backend
Adds support for CTEs in distributed planning. CTEs are structured like
an exchange, where one CTE fragment can feed multiple CTE exchanges.
Creates a LocalMultiSink as the sink of a CTE fragment, and Sequence
nodes are discarded in the distributed plan.
The multi-cast nature of CTEs creates a directed acyclic graph in our
fragment structure that Impala has not previously dealt with. A guard is
added to avoid scheduling the same fragment multiple times.
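
As an illustration of that guard (not the actual scheduler code), the sketch below walks a fragment DAG and uses a visited set so that a CTE fragment shared by several consumers is scheduled once. The fragment names, the `inputs` mapping, and the `schedule_fragments` helper are all hypothetical.

```python
def schedule_fragments(root, inputs):
    """Return fragments in scheduling order, visiting each one only once.

    `inputs` maps a fragment name to the fragments that feed it. This
    structure is an assumption made for illustration only.
    """
    scheduled = []
    seen = set()

    def visit(fragment):
        if fragment in seen:  # guard: already reached through another consumer
            return
        seen.add(fragment)
        for child in inputs.get(fragment, []):
            visit(child)  # producers are scheduled before their consumers
        scheduled.append(fragment)

    visit(root)
    return scheduled


# F2 plays the role of a CTE fragment feeding two consumers (F1 and F3).
plan = {"F0": ["F1", "F3"], "F1": ["F2"], "F3": ["F2"]}
order = schedule_fragments("F0", plan)
```

Without the `seen` check, F2 would be appended twice, once per consumer path.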
Unions require special handling, as they're the only instance where
multiple CTEs and/or exchanges can appear in a fragment. Each consumer
is associated with a producer; the fragment must be scheduled on all
nodes which have producers scheduled to them, which is handled similarly
to how exchanges are scheduled. Each consumer PlanNode then has one (or
more with MT_DOP>1) fragment instances scheduled on each node in the
fragment.
Note that not all nodes may have a producer, and there may not be as
many producer fragment instances on a node as there are consumer
fragment instances (when insufficient scan ranges exist to support
multiple readers on a ScanNode under the producer). Consumers therefore
need to tolerate a missing producer (returning 0 rows in that case), and
we need to map each producer to one or more consumers (to better
distribute results among threads).
This implies that the number of consumers for a producer is specific to
each fragment instance, and can only be determined at runtime. We must
also map each PlanNode in a fragment instance to the producer
per_fragment_instance_idx, which is used to distinguish between multiple
instances of the producer when creating LocalExchangers.
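
A minimal sketch of the per-host mapping described above, assuming a simple round-robin assignment. The patch does not state which policy it uses, so the policy and the `map_consumers_to_producers` helper are hypothetical. The values in `producer_idxs` stand in for the producer per_fragment_instance_idx values on one host.

```python
def map_consumers_to_producers(num_consumers, producer_idxs):
    """Assign a producer instance index (or None) to each consumer on a host.

    Round-robin is an assumed policy, chosen only to show that every
    producer ends up with at least one consumer when there are at least
    as many consumers as producers.
    """
    if not producer_idxs:
        # No producer was scheduled on this host: consumers return 0 rows.
        return [None] * num_consumers
    return [producer_idxs[i % len(producer_idxs)] for i in range(num_consumers)]


# Four consumer instances on a host where only two producer instances exist.
shared = map_consumers_to_producers(4, [0, 1])
# A host with consumers but no producer at all.
empty = map_consumers_to_producers(2, [])
```

Because the producer list differs per host, the number of consumers per producer is only known once instances have been placed, which matches the runtime-only determination noted above.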
Implements the backend for CTEs in the Calcite planner. CTE output is added
to a LocalExchanger, then pulled concurrently. LocalExchangers are
registered with QueryState so all fragments can access them;
registration is done during plan fragment construction so all instances
can find the LocalExchanger or identify its absence.
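
The register-then-lookup pattern can be sketched as follows. This is a simplified model, not the C++ implementation: `LocalExchangerRegistry`, `consume`, the `(cte_id, producer_idx)` key, and the `None` end-of-stream sentinel are all assumptions, and the example uses a single consumer per exchanger to stay short.

```python
import queue
import threading


class LocalExchangerRegistry:
    """Hypothetical stand-in for per-query exchanger registration."""

    def __init__(self):
        self._lock = threading.Lock()
        self._exchangers = {}

    def register(self, cte_id, producer_idx):
        # Called while fragments are being constructed, before any rows flow.
        with self._lock:
            return self._exchangers.setdefault((cte_id, producer_idx), queue.Queue())

    def find(self, cte_id, producer_idx):
        # Returns None when no producer was registered under this key.
        with self._lock:
            return self._exchangers.get((cte_id, producer_idx))


def consume(registry, cte_id, producer_idx):
    exchanger = registry.find(cte_id, producer_idx)
    if exchanger is None:
        return []  # no producer on this host: return 0 rows
    rows = []
    while True:
        batch = exchanger.get()
        if batch is None:  # assumed sentinel: producer has finished
            break
        rows.extend(batch)
    return rows


registry = LocalExchangerRegistry()
exchanger = registry.register(cte_id=1, producer_idx=0)
for batch in ([1, 2], [3], None):
    exchanger.put(batch)

present = consume(registry, cte_id=1, producer_idx=0)
absent = consume(registry, cte_id=1, producer_idx=1)
```

Registering before execution starts is what lets a consumer treat a failed lookup as "no producer here" rather than "producer not ready yet".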
Mimics UnionNode's MaterializeBatch for translating the CTE tuple to the
expected output tuple, with passthrough for cases where input and output
row layouts match.
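
The passthrough decision can be pictured with the sketch below. It is only an analogy: the real code works on tuples and expressions, whereas this hypothetical `materialize_batch` treats a layout as a list of column names and a row as a Python tuple.

```python
def materialize_batch(rows, input_layout, output_layout):
    """Translate rows to the output layout, or pass them through unchanged.

    Layouts are modeled as lists of column names, which is an assumption
    made for illustration only.
    """
    if input_layout == output_layout:
        return rows  # passthrough: no copy when layouts match
    index = {name: i for i, name in enumerate(input_layout)}
    return [tuple(row[index[name]] for name in output_layout) for row in rows]


batch = [(1, "a"), (2, "b")]
same = materialize_batch(batch, ["id", "name"], ["id", "name"])
swapped = materialize_batch(batch, ["id", "name"], ["name", "id"])
```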
Adds cte-distributed.test, which shows distributed planning and basic
execution.
Tested with TPC-DS queries (DecimalV2 version). Setup:
  start-impala-cluster.py --env_vars=USE_CALCITE_PLANNER=true \
    --impalad_args=--default_query_options=use_calcite_planner=true,cte_threshold=1
  impala-py.test tests/query_test/test_tpcds_queries.py::TestTpcdsDecimalV2Query
Also tested with mt_dop=2.
Change-Id: I48f16d495d4b37be97e6a913f0eb5b94d70e199a
---
M be/src/codegen/gen_ir_descriptions.py
M be/src/codegen/impala-ir.cc
M be/src/exec/CMakeLists.txt
A be/src/exec/cte-consumer-node-ir.cc
A be/src/exec/cte-consumer-node.cc
A be/src/exec/cte-consumer-node.h
A be/src/exec/cte-producer-node.cc
A be/src/exec/cte-producer-node.h
M be/src/exec/data-sink.cc
M be/src/exec/exec-node.cc
A be/src/exec/local-multi-sink.cc
A be/src/exec/local-multi-sink.h
A be/src/exec/sequence-node.cc
A be/src/exec/sequence-node.h
M be/src/runtime/CMakeLists.txt
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator.cc
M be/src/runtime/descriptors.cc
A be/src/runtime/local-exchanger.cc
A be/src/runtime/local-exchanger.h
M be/src/runtime/query-state.h
M be/src/scheduling/schedule-state.cc
M be/src/scheduling/schedule-state.h
M be/src/scheduling/scheduler.cc
M common/protobuf/admission_control_service.proto
M common/thrift/DataSinks.thrift
M common/thrift/ImpalaInternalService.thrift
M fe/src/main/java/org/apache/impala/planner/CTEConsumerNode.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
A fe/src/main/java/org/apache/impala/planner/LocalMultiSink.java
M fe/src/main/java/org/apache/impala/planner/ParallelPlanner.java
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteOptimizer.java
A testdata/workloads/functional-query/queries/QueryTest/cte-distributed.test
M tests/common/test_result_verifier.py
M tests/custom_cluster/test_calcite_planner.py
37 files changed, 1,530 insertions(+), 30 deletions(-)
git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/94/22094/26
--
To view, visit http://gerrit.cloudera.org:8080/22094
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I48f16d495d4b37be97e6a913f0eb5b94d70e199a
Gerrit-Change-Number: 22094
Gerrit-PatchSet: 26
Gerrit-Owner: Michael Smith <[email protected]>
Gerrit-Reviewer: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Joe McDonnell <[email protected]>
Gerrit-Reviewer: Michael Smith <[email protected]>
Gerrit-Reviewer: Steve Carlin <[email protected]>