starocean999 commented on code in PR #43930: URL: https://github.com/apache/doris/pull/43930#discussion_r1853402138
########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java: ########## @@ -0,0 +1,525 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.info; + +import org.apache.doris.analysis.CreateRoutineLoadStmt; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.ImportColumnDesc; +import org.apache.doris.analysis.ImportColumnsStmt; +import org.apache.doris.analysis.ImportDeleteOnStmt; +import org.apache.doris.analysis.ImportSequenceStmt; +import org.apache.doris.analysis.ImportWhereStmt; +import org.apache.doris.analysis.LabelName; +import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.analysis.PartitionNames; +import org.apache.doris.analysis.Separator; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import 
org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.FeNameFormat; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.common.util.Util; +import org.apache.doris.load.RoutineLoadDesc; +import org.apache.doris.load.loadv2.LoadTask; +import org.apache.doris.load.routineload.AbstractDataSourceProperties; +import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory; +import org.apache.doris.load.routineload.RoutineLoadJob; +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.analyzer.Scope; +import org.apache.doris.nereids.analyzer.UnboundRelation; +import org.apache.doris.nereids.glue.translator.ExpressionTranslator; +import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import org.apache.doris.nereids.jobs.executor.Rewriter; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.rules.analysis.BindRelation; +import org.apache.doris.nereids.rules.analysis.ExpressionAnalyzer; +import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.OlapScan; +import org.apache.doris.nereids.trees.plans.commands.load.LoadColumnClause; +import org.apache.doris.nereids.trees.plans.commands.load.LoadColumnDesc; +import org.apache.doris.nereids.trees.plans.commands.load.LoadDeleteOnClause; +import org.apache.doris.nereids.trees.plans.commands.load.LoadPartitionNames; +import org.apache.doris.nereids.trees.plans.commands.load.LoadProperty; +import 
org.apache.doris.nereids.trees.plans.commands.load.LoadSeparator; +import org.apache.doris.nereids.trees.plans.commands.load.LoadSequenceClause; +import org.apache.doris.nereids.trees.plans.commands.load.LoadWhereClause; +import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.nereids.util.Utils; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Predicate; + +/** + * info in creating routine load. + */ +public class CreateRoutineLoadInfo { + // routine load properties + public static final String DESIRED_CONCURRENT_NUMBER_PROPERTY = "desired_concurrent_number"; + public static final String CURRENT_CONCURRENT_NUMBER_PROPERTY = "current_concurrent_number"; + // max error number in ten thousand records + public static final String MAX_ERROR_NUMBER_PROPERTY = "max_error_number"; + public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio"; + // the following 3 properties limit the time and batch size of a single routine load task + public static final String MAX_BATCH_INTERVAL_SEC_PROPERTY = "max_batch_interval"; + public static final String MAX_BATCH_ROWS_PROPERTY = "max_batch_rows"; + public static final String MAX_BATCH_SIZE_PROPERTY = "max_batch_size"; + public static final String EXEC_MEM_LIMIT_PROPERTY = "exec_mem_limit"; + + public static final String FORMAT = "format"; // the value is csv or json, default is csv + public static final String STRIP_OUTER_ARRAY = "strip_outer_array"; + public static final String JSONPATHS = "jsonpaths"; + public static final String JSONROOT = "json_root"; + public static final String 
NUM_AS_STRING = "num_as_string"; + public static final String FUZZY_PARSE = "fuzzy_parse"; + public static final String PARTIAL_COLUMNS = "partial_columns"; + public static final String WORKLOAD_GROUP = "workload_group"; + public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"; + public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism"; + public static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet"; + public static final java.util.function.Predicate<Long> DESIRED_CONCURRENT_NUMBER_PRED = (v) -> v > 0L; + public static final java.util.function.Predicate<Long> MAX_ERROR_NUMBER_PRED = (v) -> v >= 0L; + public static final java.util.function.Predicate<Double> MAX_FILTER_RATIO_PRED = (v) -> v >= 0 && v <= 1; + public static final java.util.function.Predicate<Long> MAX_BATCH_INTERVAL_PRED = (v) -> v >= 1; + public static final java.util.function.Predicate<Long> MAX_BATCH_ROWS_PRED = (v) -> v >= 200000; + public static final java.util.function.Predicate<Long> MAX_BATCH_SIZE_PRED = (v) -> v >= 100 * 1024 * 1024 + && v <= (long) (1024 * 1024 * 1024) * 10; + public static final java.util.function.Predicate<Long> EXEC_MEM_LIMIT_PRED = (v) -> v >= 0L; + public static final Predicate<Long> SEND_BATCH_PARALLELISM_PRED = (v) -> v > 0L; + + private static final String NAME_TYPE = "ROUTINE LOAD NAME"; + + private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>() + .add(DESIRED_CONCURRENT_NUMBER_PROPERTY) + .add(MAX_ERROR_NUMBER_PROPERTY) + .add(MAX_FILTER_RATIO_PROPERTY) + .add(MAX_BATCH_INTERVAL_SEC_PROPERTY) + .add(MAX_BATCH_ROWS_PROPERTY) + .add(MAX_BATCH_SIZE_PROPERTY) + .add(FORMAT) + .add(JSONPATHS) + .add(STRIP_OUTER_ARRAY) + .add(NUM_AS_STRING) + .add(FUZZY_PARSE) + .add(JSONROOT) + .add(LoadStmt.STRICT_MODE) + .add(LoadStmt.TIMEZONE) + .add(EXEC_MEM_LIMIT_PROPERTY) + .add(SEND_BATCH_PARALLELISM) + .add(LOAD_TO_SINGLE_TABLET) + .add(PARTIAL_COLUMNS) + 
.add(WORKLOAD_GROUP) + .add(LoadStmt.KEY_ENCLOSE) + .add(LoadStmt.KEY_ESCAPE) + .build(); + + private final LabelName labelName; + private String tableName; + private final Map<String, LoadProperty> loadPropertyMap; + private final Map<String, String> jobProperties; + private final String typeName; + + // the following variables will be initialized after analyze + // -1 as unset, the default value will set in RoutineLoadJob + private String name; + private String dbName; + private RoutineLoadDesc routineLoadDesc; + private int desiredConcurrentNum = 1; + private long maxErrorNum = -1; + private double maxFilterRatio = -1; + private long maxBatchIntervalS = -1; + private long maxBatchRows = -1; + private long maxBatchSizeBytes = -1; + private boolean strictMode = true; + private long execMemLimit = 2 * 1024 * 1024 * 1024L; + private String timezone = TimeUtils.DEFAULT_TIME_ZONE; + private int sendBatchParallelism = 1; + private boolean loadToSingleTablet = false; + /** + * RoutineLoad support json data. + * Require Params: + * 1) dataFormat = "json" + * 2) jsonPaths = "$.XXX.xxx" + */ + private String format = ""; //default is csv. 
+ private String jsonPaths = ""; + private String jsonRoot = ""; // MUST be a jsonpath string + private boolean stripOuterArray = false; + private boolean numAsString = false; + private boolean fuzzyParse = false; + + private byte enclose; + + private byte escape; + + private long workloadGroupId = -1; + + /** + * support partial columns load(Only Unique Key Columns) + */ + private boolean isPartialUpdate = false; + + private String comment = ""; + + private LoadTask.MergeType mergeType; + + private boolean isMultiTable = false; + + private AbstractDataSourceProperties dataSourceProperties; + + /** + * constructor for create table + */ + public CreateRoutineLoadInfo(LabelName labelName, String tableName, Map<String, LoadProperty> loadPropertyMap, + Map<String, String> jobProperties, String typeName, + Map<String, String> dataSourceProperties, LoadTask.MergeType mergeType, + String comment) { + this.labelName = labelName; + if (StringUtils.isBlank(tableName)) { + this.isMultiTable = true; + } + this.tableName = tableName; + this.loadPropertyMap = loadPropertyMap; + this.jobProperties = jobProperties == null ? 
Maps.newHashMap() : jobProperties; + this.typeName = typeName.toUpperCase(); + this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory + .createDataSource(typeName, dataSourceProperties, this.isMultiTable); + this.mergeType = mergeType; + this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true"); + if (comment != null) { + this.comment = comment; + } + } + + /** + * analyze create table info + */ + public void validate(ConnectContext ctx) throws UserException { + // check dbName and tableName + checkDBTable(ctx); + // check name + try { + FeNameFormat.checkCommonName(NAME_TYPE, name); + } catch (org.apache.doris.common.AnalysisException e) { + // 64 is the length of regular expression matching + // (FeNameFormat.COMMON_NAME_REGEX/UNDERSCORE_COMMON_NAME_REGEX) + throw new AnalysisException(e.getMessage() + + " Maybe routine load job name is longer than 64 or contains illegal characters"); + } + // check load properties include column separator etc. + checkLoadProperties(ctx); + // check routine load job properties include desired concurrent number etc. 
+ checkJobProperties(); + // check data source properties + checkDataSourceProperties(); + // analyze merge type + if (routineLoadDesc != null) { + if (mergeType != LoadTask.MergeType.MERGE && routineLoadDesc.getDeleteCondition() != null) { + throw new AnalysisException("not support DELETE ON clause when merge type is not MERGE."); + } + if (mergeType == LoadTask.MergeType.MERGE && routineLoadDesc.getDeleteCondition() == null) { + throw new AnalysisException("Excepted DELETE ON clause when merge type is MERGE."); + } + } else if (mergeType == LoadTask.MergeType.MERGE) { + throw new AnalysisException("Excepted DELETE ON clause when merge type is MERGE."); + } + } + + private void checkDBTable(ConnectContext ctx) throws AnalysisException { + dbName = labelName.getDbName(); + if (Strings.isNullOrEmpty(dbName)) { + dbName = ctx.getDatabase(); + if (Strings.isNullOrEmpty(dbName)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR); + } + } + name = labelName.getLabelName(); + FeNameFormat.checkLabel(name); + + Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName); + if (isPartialUpdate && isMultiTable) { + throw new AnalysisException("Partial update is not supported in multi-table load."); + } + if (isMultiTable) { + return; + } + if (Strings.isNullOrEmpty(tableName)) { + throw new AnalysisException("Table name should not be null"); + } + Table table = db.getTableOrAnalysisException(tableName); + if (mergeType != LoadTask.MergeType.APPEND + && (table.getType() != Table.TableType.OLAP + || ((OlapTable) table).getKeysType() != KeysType.UNIQUE_KEYS)) { + throw new AnalysisException("load by MERGE or DELETE is only supported in unique tables."); + } + if (mergeType != LoadTask.MergeType.APPEND + && !(table.getType() == Table.TableType.OLAP && ((OlapTable) table).hasDeleteSign())) { + throw new AnalysisException("load by MERGE or DELETE need to upgrade table to support batch delete."); + } + if (isPartialUpdate && !((OlapTable) 
table).getEnableUniqueKeyMergeOnWrite()) { + throw new AnalysisException("load by PARTIAL_COLUMNS is only supported in unique table MoW"); + } + } + + private void checkLoadProperties(ConnectContext ctx) throws UserException { + Separator columnSeparator = null; + // TODO(yangzhengguo01): add line delimiter to properties + Separator lineDelimiter = null; + ImportColumnsStmt importColumnsStmt = null; + ImportWhereStmt precedingImportWhereStmt = null; + ImportWhereStmt importWhereStmt = null; + ImportSequenceStmt importSequenceStmt = null; + PartitionNames partitionNames = null; + ImportDeleteOnStmt importDeleteOnStmt = null; + CascadesContext cascadesContext = null; + ExpressionAnalyzer analyzer = null; + PlanTranslatorContext context = null; + if (!isMultiTable) { + List<String> nameParts = Lists.newArrayList(); + nameParts.add(dbName); + nameParts.add(tableName); + Plan unboundRelation = new UnboundRelation(StatementScopeIdGenerator.newRelationId(), nameParts); + cascadesContext = CascadesContext.initContext(ctx.getStatementContext(), unboundRelation, + PhysicalProperties.ANY); + Rewriter.getWholeTreeRewriterWithCustomJobs(cascadesContext, + ImmutableList.of(Rewriter.bottomUp(new BindRelation()))).execute(); + Plan boundRelation = cascadesContext.getRewritePlan(); + // table could have delete sign in LogicalFilter above + if (cascadesContext.getRewritePlan() instanceof LogicalFilter) { + boundRelation = (Plan) ((LogicalFilter) cascadesContext.getRewritePlan()).child(); + } + context = new PlanTranslatorContext(cascadesContext); + List<Slot> slots = boundRelation.getOutput(); + Scope scope = new Scope(slots); + analyzer = new ExpressionAnalyzer(null, scope, cascadesContext, false, false); + + Map<SlotReference, SlotRef> translateMap = Maps.newHashMap(); + + TupleDescriptor tupleDescriptor = context.generateTupleDesc(); + tupleDescriptor.setTable(((OlapScan) boundRelation).getTable()); + for (int i = 0; i < boundRelation.getOutput().size(); i++) { + SlotReference 
slotReference = (SlotReference) boundRelation.getOutput().get(i); + SlotRef slotRef = new SlotRef(null, slotReference.getName()); + translateMap.put(slotReference, slotRef); + context.createSlotDesc(tupleDescriptor, slotReference, ((OlapScan) boundRelation).getTable()); + } + } + + if (loadPropertyMap != null) { + for (LoadProperty loadProperty : loadPropertyMap.values()) { + loadProperty.validate(); + if (loadProperty instanceof LoadSeparator) { + String oriSeparator = ((LoadSeparator) loadProperty).getOriSeparator(); + String separator = Separator.convertSeparator(oriSeparator); + columnSeparator = new Separator(separator, oriSeparator); + } else if (loadProperty instanceof LoadColumnClause) { + if (isMultiTable) { + throw new AnalysisException("Multi-table load does not support setting columns info"); + } + List<ImportColumnDesc> importColumnDescList = new ArrayList<>(); + for (LoadColumnDesc columnDesc : ((LoadColumnClause) loadProperty).getColumns()) { + if (columnDesc.getExpression() != null) { + Expr expr = translateToLegacyExpr(columnDesc.getExpression(), analyzer, + context, cascadesContext); + importColumnDescList.add(new ImportColumnDesc(columnDesc.getColumnName(), expr)); + } else { + importColumnDescList.add(new ImportColumnDesc(columnDesc.getColumnName(), null)); + } + } + importColumnsStmt = new ImportColumnsStmt(importColumnDescList); + } else if (loadProperty instanceof LoadWhereClause) { + if (isMultiTable) { + throw new AnalysisException("Multi-table load does not support setting columns info"); + } + Expr expr = translateToLegacyExpr(((LoadWhereClause) loadProperty).getExpression(), + analyzer, context, cascadesContext); + if (((LoadWhereClause) loadProperty).isPreceding()) { + precedingImportWhereStmt = new ImportWhereStmt(expr, + ((LoadWhereClause) loadProperty).isPreceding()); + } else { + importWhereStmt = new ImportWhereStmt(expr, ((LoadWhereClause) loadProperty).isPreceding()); + } + } else if (loadProperty instanceof LoadPartitionNames) { 
+ partitionNames = new PartitionNames(((LoadPartitionNames) loadProperty).isTemp(), + ((LoadPartitionNames) loadProperty).getPartitionNames()); + } else if (loadProperty instanceof LoadDeleteOnClause) { + Expr expr = translateToLegacyExpr(((LoadDeleteOnClause) loadProperty).getExpression(), + analyzer, context, cascadesContext); + importDeleteOnStmt = new ImportDeleteOnStmt(expr); + } else if (loadProperty instanceof LoadSequenceClause) { + importSequenceStmt = new ImportSequenceStmt( + ((LoadSequenceClause) loadProperty).getSequenceColName()); + } + } + } + routineLoadDesc = new RoutineLoadDesc(columnSeparator, lineDelimiter, importColumnsStmt, Review Comment: Better to create a new RoutineLoadInfo class and remove the dependency on ImportColumnsStmt and ImportWhereStmt; just keep Expression as its member variable. Then add setRoutineLoadDesc(RoutineLoadInfo info) to RoutineLoadJob to use the new class. (You can do this in the next PR.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org