924060929 commented on code in PR #45926: URL: https://github.com/apache/doris/pull/45926#discussion_r1916262382
########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java: ########## @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.distribute.worker.job; + +import org.apache.doris.catalog.Env; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.trees.plans.distribute.DistributeContext; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager; +import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; + +import java.util.List; + +/** UnassignedAllBEJob */ +public class UnassignedAllBEJob extends AbstractUnassignedJob { + public UnassignedAllBEJob(StatementContext statementContext, PlanFragment fragment, + ListMultimap<ExchangeNode, UnassignedJob> exchangeToUpstreamJob) { + 
super(statementContext, fragment, ImmutableList.of(), exchangeToUpstreamJob); + } + + // ExchangeNode -> upstreamFragment -> AssignedJob(instances of upstreamFragment) + @Override + public List<AssignedJob> computeAssignedJobs(DistributeContext distributeContext, + ListMultimap<ExchangeNode, AssignedJob> inputJobs) { + ConnectContext connectContext = statementContext.getConnectContext(); + // input jobs from upstream fragment - may have many instances. + ExchangeNode exchange = inputJobs.keySet().iterator().next(); // random one - should be same for any exchange. + int expectInstanceNum = exchange.getNumInstances(); + List<Long> beIds = Env.getCurrentSystemInfo().getAllBackendByCurrentCluster(true); Review Comment: I suggest adding a `getAllBackendByCurrentCluster` method to workerManager. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java: ########## @@ -363,9 +395,16 @@ private ExecutorFactory selectInsertExecutorFactory( () -> new JdbcInsertExecutor(ctx, jdbcExternalTable, label, planner, Optional.of(insertCtx.orElse((new JdbcInsertCommandContext()))), emptyInsert) ); + } else if (physicalSink instanceof PhysicalDictionarySink) { + boolean emptyInsert = childIsEmptyRelation(physicalSink); + Dictionary dictionary = (Dictionary) targetTableIf; + // insertCtx is now useful for dictionary. so keep it empty is ok. Review Comment: typo: `now` -> `not` ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDictionarySink.java: ########## @@ -0,0 +1,164 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// PhysicalDictionarySink.java + +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.dictionary.Dictionary; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.algebra.Sink; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.statistics.Statistics; + +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * PhysicalDictionarySink + */ +public class PhysicalDictionarySink<CHILD_TYPE extends Plan> extends PhysicalTableSink<CHILD_TYPE> implements Sink { + + private final Database database; + + private final Dictionary dictionary; + + private final List<Column> cols; + + private final List<Slot> targetDictionarySlots; + + /** + * constructor + */ + public PhysicalDictionarySink(Database database, Dictionary dictionary, List<Column> cols, + List<NamedExpression> outputExprs, List<Slot> targetDictionarySlots, + 
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, Statistics statistics, + CHILD_TYPE child) { + super(PlanType.PHYSICAL_DICTIONARY_SINK, outputExprs, groupExpression, logicalProperties, + PhysicalProperties.ALL_SINGLETON, statistics, child); + this.database = Objects.requireNonNull(database, "database cannot be null"); + this.dictionary = Objects.requireNonNull(dictionary, "dictionary cannot be null"); + this.cols = ImmutableList.copyOf(cols); + this.targetDictionarySlots = ImmutableList.copyOf(targetDictionarySlots); + } + + @Override + public PhysicalProperties getRequirePhysicalProperties() { + return PhysicalProperties.ALL_SINGLETON; + } + + public Database getDatabase() { + return database; + } + + public Dictionary getDictionary() { + return dictionary; + } + + public List<Column> getCols() { + return cols; + } + + public List<Slot> getTargetDictionarySlots() { + return targetDictionarySlots; + } + + @Override + public List<? extends Expression> getExpressions() { + return ImmutableList.of(); + } + + @Override + public TableIf getTargetTable() { + return dictionary; + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitPhysicalDictionarySink(this, context); + } + + @Override + public Plan withChildren(List<Plan> children) { + return new PhysicalDictionarySink<>(database, dictionary, cols, outputExprs, targetDictionarySlots, + groupExpression, getLogicalProperties(), statistics, children.get(0)); + } + + @Override + public Plan withGroupExpression(Optional<GroupExpression> groupExpression) { + return new PhysicalDictionarySink<>(database, dictionary, cols, outputExprs, targetDictionarySlots, + groupExpression, getLogicalProperties(), statistics, child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, + Optional<LogicalProperties> logicalProperties, List<Plan> children) { + return new PhysicalDictionarySink<>(database, 
dictionary, cols, outputExprs, targetDictionarySlots, + groupExpression, logicalProperties.get(), statistics, children.get(0)); + } + + @Override + public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { + return new PhysicalDictionarySink<>(database, dictionary, cols, outputExprs, targetDictionarySlots, + groupExpression, getLogicalProperties(), statistics, child()); + } + + @Override + public PhysicalDictionarySink<Plan> resetLogicalProperties() { + return new PhysicalDictionarySink<>(database, dictionary, cols, outputExprs, targetDictionarySlots, + groupExpression, getLogicalProperties(), null, child()); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), database, dictionary, cols, targetDictionarySlots); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + PhysicalDictionarySink<?> that = (PhysicalDictionarySink<?>) o; + return Objects.equals(database, that.database) && Objects.equals(dictionary, that.dictionary) + && Objects.equals(cols, that.cols) && Objects.equals(targetDictionarySlots, that.targetDictionarySlots); + } + + @Override + public String toString() { + return "PhysicalDictionarySink{" + "database=" + database.getFullName() + ", dictionary=" + dictionary.getName() + + ", cols=" + cols + ", outputExprs=" + outputExprs + '}'; Review Comment: use `Utils.toSqlString()` to format string ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java: ########## @@ -81,7 +82,11 @@ private UnassignedJob buildJob( StatementContext statementContext, PlanFragment planFragment, ListMultimap<ExchangeNode, UnassignedJob> inputJobs, boolean isTopFragment) { List<ScanNode> scanNodes = collectScanNodesInThisFragment(planFragment); - if 
(planFragment.specifyInstances.isPresent()) { + if (planFragment.getSink() instanceof DictionarySink) { + // this fragment already set its instances in `visitPhysicalDistribute`. + // now assign to 1 BE 1 instance. + return new UnassignedAllBEJob(statementContext, planFragment, inputJobs); + } else if (planFragment.specifyInstances.isPresent()) { Review Comment: The `planFragment.specifyInstances.isPresent()` branch must remain the first branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org