Michael Smith has posted comments on this change. ( http://gerrit.cloudera.org:8080/22094 )
Change subject: IMPALA-13533: Calcite CTE backend
......................................................................


Patch Set 26:

(9 comments)

http://gerrit.cloudera.org:8080/#/c/22094/25/be/src/exec/cte-consumer-node.cc
File be/src/exec/cte-consumer-node.cc:

http://gerrit.cloudera.org:8080/#/c/22094/25/be/src/exec/cte-consumer-node.cc@147
PS25, Line 147: VLOG_QUERY << "No CTE exchanger present for CTE consumer: " << name_;
> When is this possible? When no producer was scheduled to this frag instance
Yes. See Prepare above and Scheduler::ComputeFragmentExecParam. The usual case is a Union whose Scans are spread across a disjoint set of nodes, where each Scan has been identified as a CTE. https://gerrit.cloudera.org/c/22094/26/testdata/workloads/functional-query/queries/QueryTest/cte-distributed.test#139 was added specifically to cover that case.


http://gerrit.cloudera.org:8080/#/c/22094/25/be/src/exec/cte-producer-node.cc
File be/src/exec/cte-producer-node.cc:

http://gerrit.cloudera.org:8080/#/c/22094/25/be/src/exec/cte-producer-node.cc@36
PS25, Line 36: // Register the exchanger here while plan setup is single-threaded.
> Consumers look for exchanges in Prepare - can you mention that this guarant
Added a note to CTEConsumerNode::Prepare.

http://gerrit.cloudera.org:8080/#/c/22094/25/be/src/exec/cte-producer-node.cc@97
PS25, Line 97: SleepForMs(10);
             : *eos = exchanger_->ReadFinished();
> Why doesn't CTEProducerNode wait for the exchanger to finish (or for cancel
Adjusted it to handle the loop here. It's not much different; GetNext here is only called by FragmentInstanceState::ExecInternal. A condition variable might be a nice improvement.


http://gerrit.cloudera.org:8080/#/c/22094/25/be/src/exec/sequence-node.h
File be/src/exec/sequence-node.h:

http://gerrit.cloudera.org:8080/#/c/22094/25/be/src/exec/sequence-node.h@35
PS25, Line 35: class SequenceNode : public ExecNode {
             : public:
> Should I see this node in some plan?
> I didn't spot it in any plans
You should see it in logs and in single-node plans: https://gerrit.cloudera.org/c/22091/15/testdata/workloads/functional-query/queries/QueryTest/cte.test. It's omitted from the distributed plan because each child becomes an independent fragment: https://gerrit.cloudera.org/c/22094/26/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#193.


http://gerrit.cloudera.org:8080/#/c/22094/25/be/src/runtime/local-exchanger.h
File be/src/runtime/local-exchanger.h:

http://gerrit.cloudera.org:8080/#/c/22094/25/be/src/runtime/local-exchanger.h@32
PS25, Line 32: Each consumer gets its own copy of each RowBatch and tracks its own memory
             : /// ownership
> It could be noted that the copy is the responsibility of the caller.
This description was wrong; I've updated it.


http://gerrit.cloudera.org:8080/#/c/22094/25/be/src/runtime/local-exchanger.cc
File be/src/runtime/local-exchanger.cc:

http://gerrit.cloudera.org:8080/#/c/22094/25/be/src/runtime/local-exchanger.cc@26
PS25, Line 26: consumer_count_
> Is it possible for a consumer to close while the producer is still pushing,
A consumer that closes without fully consuming the results (because of query cancellation or a limit) still calls CTEConsumerNode::Close, which calls Close(consumer_index) and decrements consumers_left. So I think it should always reach 0.

http://gerrit.cloudera.org:8080/#/c/22094/25/be/src/runtime/local-exchanger.cc@37
PS25, Line 37: dummy
> It doesn't look completely dummy to me, as in line 45 the currently process
I should explain the whole handling of cells here. I'll update this description after seeing whether I can rework it.

http://gerrit.cloudera.org:8080/#/c/22094/25/be/src/runtime/local-exchanger.cc@44
PS25, Line 44: cell->consumers_left--;
> I see that this works as a cell is cleaned up not when it's consumers_left
This code is based on https://github.com/StarRocks/starrocks/blob/3.3.4/be/src/exec/pipeline/exchange/multi_cast_local_exchange.cpp#L127-L135.
It's a little tricky: we want to return a result but keep holding it until the same consumer's next Pull, which signals that the previous RowBatch is no longer needed by that consumer. I'll look at reworking it. If I do, I think more conditions need handling: the dummy Cell would no longer work, so we'd need to check whether a cell is head_ or not.

http://gerrit.cloudera.org:8080/#/c/22094/25/be/src/runtime/local-exchanger.cc@70
PS25, Line 70: head_->consumers_left == 0
> Can consumers_left go below 0? If not, then there could be a DCHECK for it.
Added DCHECKs wherever it's modified. I'm pretty sure it never goes below 0 in this method, because it should only reach 0 when we Pull (in which case the cell becomes the head of that consumer's list) or when we Close (which skips the head of that consumer's list).



--
To view, visit http://gerrit.cloudera.org:8080/22094

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I48f16d495d4b37be97e6a913f0eb5b94d70e199a
Gerrit-Change-Number: 22094
Gerrit-PatchSet: 26
Gerrit-Owner: Michael Smith <[email protected]>
Gerrit-Reviewer: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Joe McDonnell <[email protected]>
Gerrit-Reviewer: Michael Smith <[email protected]>
Gerrit-Reviewer: Steve Carlin <[email protected]>
Gerrit-Comment-Date: Wed, 17 Dec 2025 21:15:52 +0000
Gerrit-HasComments: Yes
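The replies on local-exchanger.cc lines 37, 44 and 70 describe a cell list where each consumer keeps holding the batch it was just given until its next Pull, cells are freed once consumers_left reaches 0, and Close releases whatever a consumer will never read. Below is a minimal sketch of that scheme, loosely following the StarRocks multicast exchanger the line-44 reply cites. `MulticastExchanger`, `Batch` (a stand-in for RowBatch), `LiveCells()` and the exact counting rule (a cell's count covers open consumers that still hold it or have yet to read it) are assumptions for illustration, not the patch's LocalExchanger code.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for RowBatch so the sketch compiles on its own.
using Batch = std::vector<int>;

// Sketch (assumed design): a singly linked list of cells. Each consumer points
// at the cell it returned last and keeps holding it until its next Pull() or
// Close(). A cell's consumers_left counts open consumers that still hold it or
// have yet to read it; cells are freed from the front once it reaches 0.
class MulticastExchanger {
 public:
  explicit MulticastExchanger(int num_consumers)
      : head_(std::make_unique<Cell>()),
        tail_(head_.get()),
        pos_(static_cast<std::size_t>(num_consumers), head_.get()),
        active_(num_consumers) {
    head_->consumers_left = num_consumers;  // dummy cell, held by everyone
  }

  ~MulticastExchanger() {
    while (head_) PopHead();  // iterative, so a long list does not recurse
  }

  // Producer: append a batch that every still-open consumer must read.
  void Push(Batch batch) {
    std::lock_guard<std::mutex> l(lock_);
    if (active_ == 0) return;  // nobody is left to read it
    tail_->next = std::make_unique<Cell>();
    tail_ = tail_->next.get();
    tail_->batch = std::move(batch);
    tail_->consumers_left = active_;
  }

  // Consumer: copies the next batch into *out and releases the previously
  // returned cell. Returns false if nothing new has been pushed yet.
  bool Pull(int consumer, Batch* out) {
    std::lock_guard<std::mutex> l(lock_);
    Cell* cur = pos_[consumer];
    assert(cur != nullptr && "Pull() after Close()");
    Cell* next = cur->next.get();
    if (next == nullptr) return false;
    Release(cur);           // the batch handed out last time is done
    pos_[consumer] = next;  // hold `next` until the next Pull()/Close()
    *out = next->batch;
    Trim();
    return true;
  }

  // Consumer is done (eos, limit or cancellation): release the held cell and
  // every later cell it will now never read.
  void Close(int consumer) {
    std::lock_guard<std::mutex> l(lock_);
    if (pos_[consumer] == nullptr) return;  // already closed
    for (Cell* c = pos_[consumer]; c != nullptr; c = c->next.get()) Release(c);
    pos_[consumer] = nullptr;
    --active_;
    Trim();
  }

  // True once every consumer has closed.
  bool ReadFinished() {
    std::lock_guard<std::mutex> l(lock_);
    return active_ == 0;
  }

  // Number of cells still allocated (for inspection in tests).
  std::size_t LiveCells() {
    std::lock_guard<std::mutex> l(lock_);
    std::size_t n = 0;
    for (Cell* c = head_.get(); c != nullptr; c = c->next.get()) ++n;
    return n;
  }

 private:
  struct Cell {
    Batch batch;
    int consumers_left = 0;
    std::unique_ptr<Cell> next;
  };

  static void Release(Cell* c) {
    --c->consumers_left;
    assert(c->consumers_left >= 0);  // stands in for a DCHECK
  }

  // Detach `next` before deleting the old head, so no recursive destruction.
  void PopHead() { head_.reset(head_->next.release()); }

  // Free fully released cells from the front, always keeping the tail so
  // Push() has somewhere to append.
  void Trim() {
    while (head_->next != nullptr && head_->consumers_left == 0) PopHead();
  }

  std::mutex lock_;
  std::unique_ptr<Cell> head_;
  Cell* tail_;
  std::vector<Cell*> pos_;
  int active_;
};
```

In this sketch every open consumer releases each cell exactly once (either by moving past it in Pull or in Close), and a cell's starting count equals the number of open consumers when it was pushed, so the `consumers_left >= 0` check cannot fire. A single mutex guards everything, trading some contention for simplicity.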

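The reply on cte-producer-node.cc line 97 suggests a condition variable as a possible improvement over sleeping 10 ms and re-checking ReadFinished(). A minimal sketch of that idea follows; `ConsumerTracker`, `WaitResult`, and the hook points (each consumer calling `ConsumerClosed()` from its Close(), the cancellation path calling `Cancel()`) are hypothetical and not Impala APIs.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

enum class WaitResult { kFinished, kCancelled, kTimedOut };

// Hypothetical helper: lets a producer block until all consumers have closed
// or the query is cancelled, instead of sleeping between checks.
class ConsumerTracker {
 public:
  explicit ConsumerTracker(int num_consumers) : open_(num_consumers) {}

  // Each consumer calls this once, e.g. from its Close().
  void ConsumerClosed() {
    {
      std::lock_guard<std::mutex> l(lock_);
      --open_;
    }
    cv_.notify_all();
  }

  // Called from the cancellation path.
  void Cancel() {
    {
      std::lock_guard<std::mutex> l(lock_);
      cancelled_ = true;
    }
    cv_.notify_all();
  }

  // Blocks for at most `timeout`; the predicate guards against spurious
  // wakeups. Cancellation takes precedence if both conditions hold.
  WaitResult WaitForConsumers(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> l(lock_);
    bool done =
        cv_.wait_for(l, timeout, [this] { return open_ == 0 || cancelled_; });
    if (!done) return WaitResult::kTimedOut;
    return cancelled_ ? WaitResult::kCancelled : WaitResult::kFinished;
  }

 private:
  std::mutex lock_;
  std::condition_variable cv_;
  int open_;
  bool cancelled_ = false;
};
```

Using a bounded `wait_for` rather than an unbounded `wait` keeps a periodic wake-up similar to the existing loop, which helps if the caller also needs to notice conditions the tracker is never told about.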