starocean999 commented on code in PR #43930: URL: https://github.com/apache/doris/pull/43930#discussion_r1853390777

##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java:
##########
@@ -0,0 +1,525 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.analysis.CreateRoutineLoadStmt;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ImportColumnDesc;
+import org.apache.doris.analysis.ImportColumnsStmt;
+import org.apache.doris.analysis.ImportDeleteOnStmt;
+import org.apache.doris.analysis.ImportSequenceStmt;
+import org.apache.doris.analysis.ImportWhereStmt;
+import org.apache.doris.analysis.LabelName;
+import org.apache.doris.analysis.LoadStmt;
+import org.apache.doris.analysis.PartitionNames;
+import org.apache.doris.analysis.Separator;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.load.RoutineLoadDesc;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.load.routineload.AbstractDataSourceProperties;
+import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory;
+import org.apache.doris.load.routineload.RoutineLoadJob;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.analyzer.Scope;
+import org.apache.doris.nereids.analyzer.UnboundRelation;
+import org.apache.doris.nereids.glue.translator.ExpressionTranslator;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.jobs.executor.Rewriter;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.rules.analysis.BindRelation;
+import org.apache.doris.nereids.rules.analysis.ExpressionAnalyzer;
+import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
+import org.apache.doris.nereids.trees.plans.commands.load.LoadColumnClause;
+import org.apache.doris.nereids.trees.plans.commands.load.LoadColumnDesc;
+import org.apache.doris.nereids.trees.plans.commands.load.LoadDeleteOnClause;
+import org.apache.doris.nereids.trees.plans.commands.load.LoadPartitionNames;
+import org.apache.doris.nereids.trees.plans.commands.load.LoadProperty;
+import org.apache.doris.nereids.trees.plans.commands.load.LoadSeparator;
+import org.apache.doris.nereids.trees.plans.commands.load.LoadSequenceClause;
+import org.apache.doris.nereids.trees.plans.commands.load.LoadWhereClause;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/**
+ * info in creating routine load.
+ */
+public class CreateRoutineLoadInfo {
+    // routine load properties
+    public static final String DESIRED_CONCURRENT_NUMBER_PROPERTY = "desired_concurrent_number";
+    public static final String CURRENT_CONCURRENT_NUMBER_PROPERTY = "current_concurrent_number";
+    // max error number in ten thousand records
+    public static final String MAX_ERROR_NUMBER_PROPERTY = "max_error_number";
+    public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio";
+    // the following 3 properties limit the time and batch size of a single routine load task
+    public static final String MAX_BATCH_INTERVAL_SEC_PROPERTY = "max_batch_interval";
+    public static final String MAX_BATCH_ROWS_PROPERTY = "max_batch_rows";
+    public static final String MAX_BATCH_SIZE_PROPERTY = "max_batch_size";
+    public static final String EXEC_MEM_LIMIT_PROPERTY = "exec_mem_limit";
+
+    public static final String FORMAT = "format"; // the value is csv or json, default is csv
+    public static final String STRIP_OUTER_ARRAY = "strip_outer_array";
+    public static final String JSONPATHS = "jsonpaths";
+    public static final String JSONROOT = "json_root";
+    public static final String NUM_AS_STRING = "num_as_string";
+    public static final String FUZZY_PARSE = "fuzzy_parse";
+    public static final String PARTIAL_COLUMNS = "partial_columns";
+    public static final String WORKLOAD_GROUP = "workload_group";
+    public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
+    public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism";
+    public static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet";
+    public static final java.util.function.Predicate<Long> DESIRED_CONCURRENT_NUMBER_PRED = (v) -> v > 0L;
+    public static final java.util.function.Predicate<Long> MAX_ERROR_NUMBER_PRED = (v) -> v >= 0L;
+    public static final java.util.function.Predicate<Double> MAX_FILTER_RATIO_PRED = (v) -> v >= 0 && v <= 1;
+    public static final java.util.function.Predicate<Long> MAX_BATCH_INTERVAL_PRED = (v) -> v >= 1;
+    public static final java.util.function.Predicate<Long> MAX_BATCH_ROWS_PRED = (v) -> v >= 200000;
+    public static final java.util.function.Predicate<Long> MAX_BATCH_SIZE_PRED = (v) -> v >= 100 * 1024 * 1024
+            && v <= (long) (1024 * 1024 * 1024) * 10;
+    public static final java.util.function.Predicate<Long> EXEC_MEM_LIMIT_PRED = (v) -> v >= 0L;
+    public static final Predicate<Long> SEND_BATCH_PARALLELISM_PRED = (v) -> v > 0L;
+
+    private static final String NAME_TYPE = "ROUTINE LOAD NAME";
+
+    private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
+            .add(DESIRED_CONCURRENT_NUMBER_PROPERTY)
+            .add(MAX_ERROR_NUMBER_PROPERTY)
+            .add(MAX_FILTER_RATIO_PROPERTY)
+            .add(MAX_BATCH_INTERVAL_SEC_PROPERTY)
+            .add(MAX_BATCH_ROWS_PROPERTY)
+            .add(MAX_BATCH_SIZE_PROPERTY)
+            .add(FORMAT)
+            .add(JSONPATHS)
+            .add(STRIP_OUTER_ARRAY)
+            .add(NUM_AS_STRING)
+            .add(FUZZY_PARSE)
+            .add(JSONROOT)
+            .add(LoadStmt.STRICT_MODE)
+            .add(LoadStmt.TIMEZONE)
+            .add(EXEC_MEM_LIMIT_PROPERTY)
+            .add(SEND_BATCH_PARALLELISM)
+            .add(LOAD_TO_SINGLE_TABLET)
+            .add(PARTIAL_COLUMNS)
+            .add(WORKLOAD_GROUP)
+            .add(LoadStmt.KEY_ENCLOSE)
+            .add(LoadStmt.KEY_ESCAPE)
+            .build();
+
+    private final LabelName labelName;
+    private String tableName;
+    private final Map<String, LoadProperty> loadPropertyMap;
+    private final Map<String, String> jobProperties;
+    private final String typeName;
+
+    // the following variables will be initialized after analyze
+    // -1 as unset, the default value will set in RoutineLoadJob
+    private String name;
+    private String dbName;
+    private RoutineLoadDesc routineLoadDesc;
+    private int desiredConcurrentNum = 1;
+    private long maxErrorNum = -1;
+    private double maxFilterRatio = -1;
+    private long maxBatchIntervalS = -1;
+    private long maxBatchRows = -1;
+    private long maxBatchSizeBytes = -1;
+    private boolean strictMode = true;
+    private long execMemLimit = 2 * 1024 * 1024 * 1024L;
+    private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
+    private int sendBatchParallelism = 1;
+    private boolean loadToSingleTablet = false;
+    /**
+     * RoutineLoad support json data.
+     * Require Params:
+     * 1) dataFormat = "json"
+     * 2) jsonPaths = "$.XXX.xxx"
+     */
+    private String format = ""; //default is csv.
+    private String jsonPaths = "";
+    private String jsonRoot = ""; // MUST be a jsonpath string
+    private boolean stripOuterArray = false;
+    private boolean numAsString = false;
+    private boolean fuzzyParse = false;
+
+    private byte enclose;
+
+    private byte escape;
+
+    private long workloadGroupId = -1;
+
+    /**
+     * support partial columns load(Only Unique Key Columns)
+     */
+    private boolean isPartialUpdate = false;
+
+    private String comment = "";
+
+    private LoadTask.MergeType mergeType;
+
+    private boolean isMultiTable = false;
+
+    private AbstractDataSourceProperties dataSourceProperties;
+
+    /**
+     * constructor for create table
+     */
+    public CreateRoutineLoadInfo(LabelName labelName, String tableName, Map<String, LoadProperty> loadPropertyMap,
+                                 Map<String, String> jobProperties, String typeName,
+                                 Map<String, String> dataSourceProperties, LoadTask.MergeType mergeType,
+                                 String comment) {
+        this.labelName = labelName;
+        if (StringUtils.isBlank(tableName)) {
+            this.isMultiTable = true;
+        }
+        this.tableName = tableName;
+        this.loadPropertyMap = loadPropertyMap;
+        this.jobProperties = jobProperties == null ? Maps.newHashMap() : jobProperties;
+        this.typeName = typeName.toUpperCase();
+        this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory
+                .createDataSource(typeName, dataSourceProperties, this.isMultiTable);
+        this.mergeType = mergeType;
+        this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true");
+        if (comment != null) {
+            this.comment = comment;
+        }
+    }
+
+    /**
+     * analyze create table info
+     */
+    public void validate(ConnectContext ctx) throws UserException {
+        // check dbName and tableName
+        checkDBTable(ctx);
+        // check name
+        try {
+            FeNameFormat.checkCommonName(NAME_TYPE, name);
+        } catch (org.apache.doris.common.AnalysisException e) {
+            // 64 is the length of regular expression matching
+            // (FeNameFormat.COMMON_NAME_REGEX/UNDERSCORE_COMMON_NAME_REGEX)
+            throw new AnalysisException(e.getMessage()
+                    + " Maybe routine load job name is longer than 64 or contains illegal characters");
+        }
+        // check load properties include column separator etc.
+        checkLoadProperties(ctx);
+        // check routine load job properties include desired concurrent number etc.
+        checkJobProperties();
+        // check data source properties
+        checkDataSourceProperties();
+        // analyze merge type
+        if (routineLoadDesc != null) {
+            if (mergeType != LoadTask.MergeType.MERGE && routineLoadDesc.getDeleteCondition() != null) {
+                throw new AnalysisException("not support DELETE ON clause when merge type is not MERGE.");
+            }
+            if (mergeType == LoadTask.MergeType.MERGE && routineLoadDesc.getDeleteCondition() == null) {
+                throw new AnalysisException("Excepted DELETE ON clause when merge type is MERGE.");
+            }
+        } else if (mergeType == LoadTask.MergeType.MERGE) {
+            throw new AnalysisException("Excepted DELETE ON clause when merge type is MERGE.");
+        }
+    }
+
+    private void checkDBTable(ConnectContext ctx) throws AnalysisException {
+        dbName = labelName.getDbName();

Review Comment:
   Better to create a new LabelNameInfo class and implement an analyze or validate method on it, like TableNameInfo does. LabelNameInfo may be needed by other commands as well, and the validation could then be called from multiple places.
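   For illustration, a rough sketch of what such a class might look like, modeled on the TableNameInfo pattern. The class shape, method names, and checks here are assumptions for discussion, not the final API:

   ```java
   // Hypothetical LabelNameInfo: a reusable info class with its own
   // validate(ConnectContext) entry point, in the style of TableNameInfo.
   import org.apache.doris.common.AnalysisException;
   import org.apache.doris.common.FeNameFormat;
   import org.apache.doris.qe.ConnectContext;

   import com.google.common.base.Strings;

   public class LabelNameInfo {
       private String db;
       private String label;

       public LabelNameInfo(String db, String label) {
           this.db = db;
           this.label = label;
       }

       /**
        * Fall back to the session's current database when none was given,
        * then check that the label is a legal name.
        */
       public void validate(ConnectContext ctx) throws AnalysisException {
           if (Strings.isNullOrEmpty(db)) {
               db = ctx.getDatabase();
               if (Strings.isNullOrEmpty(db)) {
                   throw new AnalysisException("No database selected and no database specified in label");
               }
           }
           FeNameFormat.checkLabel(label);
       }

       public String getDb() {
           return db;
       }

       public String getLabel() {
           return label;
       }
   }
   ```

   checkDBTable could then call labelNameInfo.validate(ctx) and read getDb() instead of repeating the db-name fallback logic, and any other command that accepts a label could reuse the same class.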