This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3f7307d [Spark Load]Add spark etl job main class (#3927)
3f7307d is described below
commit 3f7307d685ac53eede87c506f257d71fdddbf648
Author: wyb <[email protected]>
AuthorDate: Wed Jun 24 13:54:55 2020 +0800
[Spark Load]Add spark etl job main class (#3927)
1. Add SparkEtlJob class
2. Remove comments around DppResult-related code
3. Support loading from hive table directly
#3433
---
be/src/olap/push_handler.cpp | 6 +-
be/src/olap/push_handler.h | 2 +-
.../org/apache/doris/analysis/CreateTableStmt.java | 5 +-
.../java/org/apache/doris/analysis/GrantStmt.java | 6 +-
.../java/org/apache/doris/analysis/LoadStmt.java | 6 +-
.../java/org/apache/doris/analysis/RevokeStmt.java | 3 +-
.../java/org/apache/doris/catalog/Catalog.java | 13 +-
.../java/org/apache/doris/catalog/ResourceMgr.java | 7 +-
.../main/java/org/apache/doris/common/Config.java | 6 +
.../org/apache/doris/common/FeMetaVersion.java | 4 +-
.../org/apache/doris/journal/JournalEntity.java | 4 +-
.../org/apache/doris/load/BrokerFileGroup.java | 8 +-
.../main/java/org/apache/doris/load/EtlStatus.java | 13 +-
.../apache/doris/load/loadv2/BrokerLoadJob.java | 3 +-
.../java/org/apache/doris/load/loadv2/LoadJob.java | 1 -
.../org/apache/doris/load/loadv2/LoadManager.java | 12 +-
.../doris/load/loadv2/SparkEtlJobHandler.java | 18 +-
.../org/apache/doris/load/loadv2/SparkLoadJob.java | 11 +-
.../load/loadv2/dpp/DorisKryoRegistrator.java | 4 +
.../doris/load/loadv2/dpp/GlobalDictBuilder.java | 6 +-
.../org/apache/doris/load/loadv2/dpp/SparkDpp.java | 21 +-
.../apache/doris/load/loadv2/etl/EtlJobConfig.java | 4 +
.../apache/doris/load/loadv2/etl/SparkEtlJob.java | 250 +++++++++++++++++++++
.../org/apache/doris/mysql/privilege/PaloAuth.java | 9 +-
.../org/apache/doris/mysql/privilege/PaloRole.java | 10 +-
.../doris/persist/DropResourceOperationLog.java | 53 +++++
.../java/org/apache/doris/persist/EditLog.java | 12 +-
.../java/org/apache/doris/persist/PrivInfo.java | 10 +-
.../org/apache/doris/analysis/GrantStmtTest.java | 3 +-
.../org/apache/doris/analysis/LoadStmtTest.java | 3 +-
.../doris/load/loadv2/SparkEtlJobHandlerTest.java | 13 +-
.../doris/load/loadv2/etl/SparkEtlJobTest.java | 152 +++++++++++++
.../org/apache/doris/mysql/privilege/AuthTest.java | 3 +-
33 files changed, 556 insertions(+), 125 deletions(-)
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 62e80d8..fa5c6bd 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -944,9 +944,9 @@ OLAPStatus PushBrokerReader::init(const Schema* schema,
LOG(WARNING) << "Failed to init mem trackers, msg: " <<
status.get_error_msg();
return OLAP_ERR_PUSH_INIT_ERROR;
}
- _runtime_profile.reset(_runtime_state->runtime_profile());
+ _runtime_profile = _runtime_state->runtime_profile();
_runtime_profile->set_name("PushBrokerReader");
- _mem_tracker.reset(new MemTracker(_runtime_profile.get(), -1,
_runtime_profile->name(), _runtime_state->instance_mem_tracker()));
+ _mem_tracker.reset(new MemTracker(_runtime_profile, -1,
_runtime_profile->name(), _runtime_state->instance_mem_tracker()));
_mem_pool.reset(new MemPool(_mem_tracker.get()));
_counter.reset(new ScannerCounter());
@@ -955,7 +955,7 @@ OLAPStatus PushBrokerReader::init(const Schema* schema,
switch (t_scan_range.ranges[0].format_type) {
case TFileFormatType::FORMAT_PARQUET:
scanner = new ParquetScanner(_runtime_state.get(),
- _runtime_profile.get(),
+ _runtime_profile,
t_scan_range.params,
t_scan_range.ranges,
t_scan_range.broker_addresses,
diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h
index a48716e..181905d 100644
--- a/be/src/olap/push_handler.h
+++ b/be/src/olap/push_handler.h
@@ -247,7 +247,7 @@ private:
Tuple* _tuple;
const Schema* _schema;
std::unique_ptr<RuntimeState> _runtime_state;
- std::unique_ptr<RuntimeProfile> _runtime_profile;
+ RuntimeProfile* _runtime_profile;
std::unique_ptr<MemTracker> _mem_tracker;
std::unique_ptr<MemPool> _mem_pool;
std::unique_ptr<ScannerCounter> _counter;
diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
b/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index ded7c78..a31983b 100644
--- a/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -94,9 +94,6 @@ public class CreateTableStmt extends DdlStmt {
// for backup. set to -1 for normal use
private int tableSignature;
- // TODO(wyb): spark-load
- private static boolean disableHiveTable = true;
-
public CreateTableStmt() {
// for persist
tableName = new TableName();
@@ -259,7 +256,7 @@ public class CreateTableStmt extends DdlStmt {
analyzeEngineName();
// TODO(wyb): spark-load
- if (engineName.equals("hive") && disableHiveTable) {
+ if (engineName.equals("hive") && !Config.enable_spark_load) {
throw new AnalysisException("Spark Load from hive table is comming
soon");
}
// analyze key desc
diff --git a/fe/src/main/java/org/apache/doris/analysis/GrantStmt.java
b/fe/src/main/java/org/apache/doris/analysis/GrantStmt.java
index 3d7674a..3df0335 100644
--- a/fe/src/main/java/org/apache/doris/analysis/GrantStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/GrantStmt.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.AccessPrivilege;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
@@ -46,9 +47,6 @@ public class GrantStmt extends DdlStmt {
private ResourcePattern resourcePattern;
private List<PaloPrivilege> privileges;
- // TODO(wyb): spark-load
- public static boolean disableGrantResource = true;
-
public GrantStmt(UserIdentity userIdent, String role, TablePattern
tblPattern, List<AccessPrivilege> privileges) {
this.userIdent = userIdent;
this.role = role;
@@ -111,7 +109,7 @@ public class GrantStmt extends DdlStmt {
tblPattern.analyze(analyzer.getClusterName());
} else {
// TODO(wyb): spark-load
- if (disableGrantResource) {
+ if (!Config.enable_spark_load) {
throw new AnalysisException("GRANT ON RESOURCE is comming
soon");
}
resourcePattern.analyze();
diff --git a/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
b/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
index a9f87c2..b3636b6 100644
--- a/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -19,6 +19,7 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
@@ -100,9 +101,6 @@ public class LoadStmt extends DdlStmt {
private String version = "v2";
- // TODO(wyb): spark-load
- public static boolean disableSparkLoad = true;
-
// properties set
private final static ImmutableSet<String> PROPERTIES_SET = new
ImmutableSet.Builder<String>()
.add(TIMEOUT_PROPERTY)
@@ -288,7 +286,7 @@ public class LoadStmt extends DdlStmt {
resourceDesc.analyze();
etlJobType = resourceDesc.getEtlJobType();
// TODO(wyb): spark-load
- if (disableSparkLoad) {
+ if (!Config.enable_spark_load) {
throw new AnalysisException("Spark Load is comming soon");
}
// check resource usage privilege
diff --git a/fe/src/main/java/org/apache/doris/analysis/RevokeStmt.java
b/fe/src/main/java/org/apache/doris/analysis/RevokeStmt.java
index 61dd664..9d2ce60 100644
--- a/fe/src/main/java/org/apache/doris/analysis/RevokeStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/RevokeStmt.java
@@ -20,6 +20,7 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.AccessPrivilege;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.mysql.privilege.PaloPrivilege;
import org.apache.doris.mysql.privilege.PrivBitSet;
@@ -98,7 +99,7 @@ public class RevokeStmt extends DdlStmt {
tblPattern.analyze(analyzer.getClusterName());
} else {
// TODO(wyb): spark-load
- if (GrantStmt.disableGrantResource) {
+ if (!Config.enable_spark_load) {
throw new AnalysisException("REVOKE ON RESOURCE is comming
soon");
}
resourcePattern.analyze();
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index 0d05468..4f2b1b7 100755
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -1460,6 +1460,7 @@ public class Catalog {
checksum = loadGlobalVariable(dis, checksum);
checksum = loadCluster(dis, checksum);
checksum = loadBrokers(dis, checksum);
+ checksum = loadResources(dis, checksum);
checksum = loadExportJob(dis, checksum);
checksum = loadBackupHandler(dis, checksum);
checksum = loadPaloAuth(dis, checksum);
@@ -1468,8 +1469,6 @@ public class Catalog {
checksum = loadColocateTableIndex(dis, checksum);
checksum = loadRoutineLoadJobs(dis, checksum);
checksum = loadLoadJobsV2(dis, checksum);
- // TODO(wyb): spark-load
- //checksum = loadResources(dis, checksum);
checksum = loadSmallFiles(dis, checksum);
checksum = loadPlugins(dis, checksum);
checksum = loadDeleteHandler(dis, checksum);
@@ -1872,13 +1871,10 @@ public class Catalog {
}
public long loadResources(DataInputStream in, long checksum) throws
IOException {
- // TODO(wyb): spark-load
- /*
- if (MetaContext.get().getMetaVersion() >=
FeMetaVersion.new_version_by_wyb) {
- resourceMgr = ResourceMgr.read(in);
+ if (Catalog.getCurrentCatalogJournalVersion() >=
FeMetaVersion.VERSION_87) {
+ resourceMgr = ResourceMgr.read(in);
}
LOG.info("finished replay resources from image");
- */
return checksum;
}
@@ -1927,6 +1923,7 @@ public class Catalog {
checksum = saveGlobalVariable(dos, checksum);
checksum = saveCluster(dos, checksum);
checksum = saveBrokers(dos, checksum);
+ checksum = saveResources(dos, checksum);
checksum = saveExportJob(dos, checksum);
checksum = saveBackupHandler(dos, checksum);
checksum = savePaloAuth(dos, checksum);
@@ -1934,8 +1931,6 @@ public class Catalog {
checksum = saveColocateTableIndex(dos, checksum);
checksum = saveRoutineLoadJobs(dos, checksum);
checksum = saveLoadJobsV2(dos, checksum);
- // TODO(wyb): spark-load
- //checksum = saveResources(dos, checksum);
checksum = saveSmallFiles(dos, checksum);
checksum = savePlugins(dos, checksum);
checksum = saveDeleteHandler(dos, checksum);
diff --git a/fe/src/main/java/org/apache/doris/catalog/ResourceMgr.java
b/fe/src/main/java/org/apache/doris/catalog/ResourceMgr.java
index 3932cee..f5b6a9f 100644
--- a/fe/src/main/java/org/apache/doris/catalog/ResourceMgr.java
+++ b/fe/src/main/java/org/apache/doris/catalog/ResourceMgr.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.proc.ProcNodeInterface;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.persist.DropResourceOperationLog;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
@@ -88,12 +89,12 @@ public class ResourceMgr implements Writable {
}
// log drop
- Catalog.getCurrentCatalog().getEditLog().logDropResource(name);
+ Catalog.getCurrentCatalog().getEditLog().logDropResource(new
DropResourceOperationLog(name));
LOG.info("drop resource success. resource name: {}", name);
}
- public void replayDropResource(String name) {
- nameToResource.remove(name);
+ public void replayDropResource(DropResourceOperationLog operationLog) {
+ nameToResource.remove(operationLog.getName());
}
public boolean containsResource(String name) {
diff --git a/fe/src/main/java/org/apache/doris/common/Config.java
b/fe/src/main/java/org/apache/doris/common/Config.java
index ef38967..7ca2170 100644
--- a/fe/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/src/main/java/org/apache/doris/common/Config.java
@@ -1088,5 +1088,11 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static boolean drop_backend_after_decommission = true;
+
+ /*
+ * temporary switch to enable spark load while the feature is under development
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static boolean enable_spark_load = false;
}
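
Note: since enable_spark_load is declared mutable and masterOnly, it should be changeable at runtime on the master FE (presumably with ADMIN SET FRONTEND CONFIG ("enable_spark_load" = "true"), the usual path for mutable configs) as well as in fe.conf; that statement is pre-existing behavior, not something this patch adds.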
diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
index dabd7a8..7c52c94 100644
--- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -183,6 +183,8 @@ public final class FeMetaVersion {
public static final int VERSION_85 = 85;
// serialize origStmt in rollupJob and mv meta
public static final int VERSION_86 = 86;
+ // spark resource, resource privilege, broker file group for hive table
+ public static final int VERSION_87 = 87;
// note: when increment meta version, should assign the latest version to
VERSION_CURRENT
- public static final int VERSION_CURRENT = VERSION_86;
+ public static final int VERSION_CURRENT = VERSION_87;
}
diff --git a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
index b707a15..5704bc7 100644
--- a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -58,6 +58,7 @@ import org.apache.doris.persist.DatabaseInfo;
import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.DropLinkDbAndUpdateDbInfo;
import org.apache.doris.persist.DropPartitionInfo;
+import org.apache.doris.persist.DropResourceOperationLog;
import org.apache.doris.persist.HbPackage;
import org.apache.doris.persist.ModifyPartitionInfo;
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
@@ -511,8 +512,7 @@ public class JournalEntity implements Writable {
break;
}
case OperationType.OP_DROP_RESOURCE: {
- data = new Text();
- ((Text) data).readFields(in);
+ data = DropResourceOperationLog.read(in);
isRead = true;
break;
}
diff --git a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java
b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index 78115b5..7a450a6 100644
--- a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -358,11 +358,8 @@ public class BrokerFileGroup implements Writable {
}
// src table
- // TODO(wyb): spark-load
- /*
out.writeLong(srcTableId);
out.writeBoolean(isLoadFromTable);
- */
}
public void readFields(DataInput in) throws IOException {
@@ -414,13 +411,10 @@ public class BrokerFileGroup implements Writable {
}
}
// src table
- // TODO(wyb): spark-load
- /*
- if (Catalog.getCurrentCatalogJournalVersion() >=
FeMetaVersion.new_version) {
+ if (Catalog.getCurrentCatalogJournalVersion() >=
FeMetaVersion.VERSION_87) {
srcTableId = in.readLong();
isLoadFromTable = in.readBoolean();
}
- */
// There are no columnExprList in the previous load job which is
created before function is supported.
// The columnExprList could not be analyzed without origin stmt in the
previous load job.
diff --git a/fe/src/main/java/org/apache/doris/load/EtlStatus.java
b/fe/src/main/java/org/apache/doris/load/EtlStatus.java
index bafebc4..bc01f43 100644
--- a/fe/src/main/java/org/apache/doris/load/EtlStatus.java
+++ b/fe/src/main/java/org/apache/doris/load/EtlStatus.java
@@ -20,7 +20,7 @@ package org.apache.doris.load;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
-//import org.apache.doris.load.loadv2.dpp.DppResult;
+import org.apache.doris.load.loadv2.dpp.DppResult;
import org.apache.doris.thrift.TEtlState;
import com.google.common.base.Strings;
@@ -46,7 +46,7 @@ public class EtlStatus implements Writable {
// 0 - 100
private int progress;
private String failMsg;
- //private DppResult dppResult;
+ private DppResult dppResult;
public EtlStatus() {
this.state = TEtlState.RUNNING;
@@ -56,7 +56,7 @@ public class EtlStatus implements Writable {
this.fileMap = Maps.newHashMap();
this.progress = 0;
this.failMsg = "";
- //this.dppResult = null;
+ this.dppResult = null;
}
public TEtlState getState() {
@@ -128,8 +128,6 @@ public class EtlStatus implements Writable {
this.failMsg = failMsg;
}
- // TODO(wyb): spark-load
- /*
public DppResult getDppResult() {
return dppResult;
}
@@ -137,7 +135,6 @@ public class EtlStatus implements Writable {
public void setDppResult(DppResult dppResult) {
this.dppResult = dppResult;
}
- */
public void reset() {
this.stats.clear();
@@ -145,7 +142,7 @@ public class EtlStatus implements Writable {
this.fileMap.clear();
this.progress = 0;
this.failMsg = "";
- //this.dppResult = null;
+ this.dppResult = null;
}
@Override
@@ -158,7 +155,7 @@ public class EtlStatus implements Writable {
", fileMap=" + fileMap +
", progress=" + progress +
", failMsg='" + failMsg + '\'' +
- //", dppResult='" + dppResult + '\'' +
+ ", dppResult='" + dppResult + '\'' +
'}';
}
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index a42f4e7..9ca4c77 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
+import org.apache.doris.common.DataQualityException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
@@ -248,7 +249,7 @@ public class BrokerLoadJob extends BulkLoadJob {
// check data quality
if (!checkDataQuality()) {
- cancelJobWithoutCheck(new
FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG),
+ cancelJobWithoutCheck(new
FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED,
DataQualityException.QUALITY_FAIL_MSG),
true, true);
return;
}
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 28b556c..e9d89d7 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -82,7 +82,6 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
private static final Logger LOG = LogManager.getLogger(LoadJob.class);
- protected static final String QUALITY_FAIL_MSG = "quality not good enough
to cancel";
protected static final String DPP_NORMAL_ALL = "dpp.norm.ALL";
protected static final String DPP_ABNORMAL_ALL = "dpp.abnorm.ALL";
public static final String UNSELECTED_ROWS = "unselected.rows";
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index c3f25e1..9b0741b 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -17,10 +17,6 @@
package org.apache.doris.load.loadv2;
-import static org.apache.doris.load.FailMsg.CancelType.ETL_RUN_FAIL;
-import static org.apache.doris.load.FailMsg.CancelType.LOAD_RUN_FAIL;
-import static org.apache.doris.common.DataQualityException.QUALITY_FAIL_MSG;
-
import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Catalog;
@@ -159,7 +155,7 @@ public class LoadManager implements Writable{
return e.getTxnId();
} catch (UserException e) {
if (loadJob != null) {
- loadJob.cancelJobWithoutCheck(new FailMsg(LOAD_RUN_FAIL,
e.getMessage()), false,
+ loadJob.cancelJobWithoutCheck(new
FailMsg(CancelType.LOAD_RUN_FAIL, e.getMessage()), false,
                        false /* no need to write edit log, because createLoadJob log has not been written yet */);
}
throw e;
@@ -403,11 +399,11 @@ public class LoadManager implements Writable{
((SparkLoadJob) job).updateEtlStatus();
} catch (DataQualityException e) {
LOG.info("update load job etl status failed. job id:
{}", job.getId(), e);
- job.cancelJobWithoutCheck(new
FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG),
+ job.cancelJobWithoutCheck(new
FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED,
DataQualityException.QUALITY_FAIL_MSG),
true, true);
} catch (UserException e) {
LOG.warn("update load job etl status failed. job id:
{}", job.getId(), e);
- job.cancelJobWithoutCheck(new FailMsg(ETL_RUN_FAIL,
e.getMessage()), true, true);
+ job.cancelJobWithoutCheck(new
FailMsg(CancelType.ETL_RUN_FAIL, e.getMessage()), true, true);
} catch (Exception e) {
LOG.warn("update load job etl status failed. job id:
{}", job.getId(), e);
}
@@ -422,7 +418,7 @@ public class LoadManager implements Writable{
((SparkLoadJob) job).updateLoadingStatus();
} catch (UserException e) {
LOG.warn("update load job loading status failed. job
id: {}", job.getId(), e);
- job.cancelJobWithoutCheck(new FailMsg(LOAD_RUN_FAIL,
e.getMessage()), true, true);
+ job.cancelJobWithoutCheck(new
FailMsg(CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
} catch (Exception e) {
LOG.warn("update load job loading status failed. job
id: {}", job.getId(), e);
}
diff --git
a/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
b/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index 84013c4..ff295d6 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -25,8 +25,9 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.load.EtlStatus;
-//import org.apache.doris.load.loadv2.dpp.DppResult;
+import org.apache.doris.load.loadv2.dpp.DppResult;
import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.doris.load.loadv2.etl.SparkEtlJob;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TEtlState;
@@ -46,12 +47,12 @@ import org.apache.spark.launcher.SparkAppHandle.Listener;
import org.apache.spark.launcher.SparkAppHandle.State;
import org.apache.spark.launcher.SparkLauncher;
-//import com.google.common.base.Strings;
+import com.google.common.base.Strings;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-//import com.google.gson.Gson;
-//import com.google.gson.JsonSyntaxException;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
@@ -73,7 +74,6 @@ public class SparkEtlJobHandler {
private static final String CONFIG_FILE_NAME = "jobconfig.json";
private static final String APP_RESOURCE_LOCAL_PATH =
PaloFe.DORIS_HOME_DIR + "/lib/" + APP_RESOURCE_NAME;
private static final String JOB_CONFIG_DIR = "configs";
- private static final String MAIN_CLASS =
"org.apache.doris.load.loadv2.etl.SparkEtlJob";
private static final String ETL_JOB_NAME = "doris__%s";
// 5min
private static final int GET_APPID_MAX_RETRY_TIMES = 300;
@@ -112,10 +112,7 @@ public class SparkEtlJobHandler {
launcher.setMaster(resource.getMaster())
.setDeployMode(resource.getDeployMode().name().toLowerCase())
.setAppResource(appResourceHdfsPath)
- // TODO(wyb): spark-load
- // replace with getCanonicalName later
- //.setMainClass(SparkEtlJob.class.getCanonicalName())
- .setMainClass(MAIN_CLASS)
+ .setMainClass(SparkEtlJob.class.getCanonicalName())
.setAppName(String.format(ETL_JOB_NAME, loadLabel))
.addAppArgs(jobConfigHdfsPath);
// spark configs
@@ -220,8 +217,6 @@ public class SparkEtlJobHandler {
if (status.getState() == TEtlState.FINISHED || status.getState() ==
TEtlState.CANCELLED) {
// get dpp result
- // TODO(wyb): spark-load
- /*
String dppResultFilePath =
EtlJobConfig.getDppResultFilePath(etlOutputPath);
try {
byte[] data = BrokerUtil.readFile(dppResultFilePath,
brokerDesc);
@@ -234,7 +229,6 @@ public class SparkEtlJobHandler {
} catch (UserException | JsonSyntaxException |
UnsupportedEncodingException e) {
LOG.warn("read broker file failed. path: {}",
dppResultFilePath, e);
}
- */
}
return status;
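
With the hardcoded MAIN_CLASS constant removed, the entry point now comes from the SparkEtlJob class itself. A minimal sketch of the resulting launch call, with hypothetical master, paths and label (only setMainClass and the jobconfig argument reflect this patch):

    SparkLauncher launcher = new SparkLauncher();
    launcher.setMaster("yarn")                                   // resource.getMaster() in the handler
            .setDeployMode("cluster")                            // resource.getDeployMode()
            .setAppResource("hdfs://host:port/dpp/spark-dpp.jar")
            .setMainClass(SparkEtlJob.class.getCanonicalName())  // org.apache.doris.load.loadv2.etl.SparkEtlJob
            .setAppName(String.format("doris__%s", "label0"))    // ETL_JOB_NAME pattern
            .addAppArgs("hdfs://host:port/dpp/configs/jobconfig.json");
    Process process = launcher.launch();                         // the handler then polls for the yarn app id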
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 715b6af..f1dff69 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -17,8 +17,6 @@
package org.apache.doris.load.loadv2;
-import static org.apache.doris.common.DataQualityException.QUALITY_FAIL_MSG;
-
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.DescriptorTable;
@@ -58,7 +56,7 @@ import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.FailMsg;
-//import org.apache.doris.load.loadv2.dpp.DppResult;
+import org.apache.doris.load.loadv2.dpp.DppResult;
import org.apache.doris.load.loadv2.etl.EtlJobConfig;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.service.FrontendOptions;
@@ -76,7 +74,7 @@ import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPriority;
import org.apache.doris.thrift.TPushType;
-//import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TabletQuorumFailedException;
@@ -313,8 +311,6 @@ public class SparkLoadJob extends BulkLoadJob {
loadingStatus.setTrackingUrl(appId);
}
- // TODO(wyb): spark-load
- /*
DppResult dppResult = etlStatus.getDppResult();
if (dppResult != null) {
// update load statistic and counters when spark etl job finished
@@ -331,14 +327,13 @@ public class SparkLoadJob extends BulkLoadJob {
counters.put(DPP_ABNORMAL_ALL,
String.valueOf(dppResult.abnormalRows));
counters.put(UNSELECTED_ROWS,
String.valueOf(dppResult.unselectRows));
}
- */
}
private void unprotectedProcessEtlFinish(EtlStatus etlStatus,
SparkEtlJobHandler handler) throws Exception {
unprotectedUpdateEtlStatusInternal(etlStatus);
// checkDataQuality
if (!checkDataQuality()) {
- throw new DataQualityException(QUALITY_FAIL_MSG);
+ throw new
DataQualityException(DataQualityException.QUALITY_FAIL_MSG);
}
// get etl output files and update loading state
diff --git
a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java
b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java
index d2568bb..f0528ec 100644
---
a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java
+++
b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java
@@ -20,10 +20,14 @@ package org.apache.doris.load.loadv2.dpp;
import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;
+/**
+ * Register ETL classes with Kryo when Kryo serialization is enabled.
+ */
public class DorisKryoRegistrator implements KryoRegistrator {
@Override
public void registerClasses(Kryo kryo) {
kryo.register(org.apache.doris.load.loadv2.Roaring64Map.class);
+ kryo.register(org.apache.doris.load.loadv2.BitmapValue.class);
}
}
\ No newline at end of file
diff --git
a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java
b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java
index b220b40..8275edf 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java
@@ -27,8 +27,8 @@ import org.apache.spark.sql.catalog.Column;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.Arrays;
@@ -200,7 +200,7 @@ public class GlobalDictBuilder {
maxDictValue = (long)row.get(0);
minDictValue = (long)row.get(1);
}
- LOG.info(" column {} 's max value in dict is {} , min value is
{}", distinctColumnNameTmp, maxDictValue, minDictValue);
+ LOG.info(" column " + distinctColumnNameTmp + " 's max value
in dict is " + maxDictValue + ", min value is " + minDictValue);
// maybe never happened, but we need detect it
if (minDictValue < 0) {
throw new RuntimeException(String.format(" column %s 's
cardinality has exceed bigint's max value", distinctColumnNameTmp));
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 80becc4..3bc8c95 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -746,11 +746,18 @@ public final class SparkDpp implements
java.io.Serializable {
}
private Dataset<Row> loadDataFromHiveTable(SparkSession spark,
- String hiveTableName,
+ String hiveDbTableName,
EtlJobConfig.EtlIndex baseIndex,
EtlJobConfig.EtlFileGroup
fileGroup,
StructType dstTableSchema)
throws UserException {
- Dataset<Row> dataframe = spark.sql("select * from " + hiveTableName);
+ // select base index columns from hive table
+ StringBuilder sql = new StringBuilder();
+ sql.append("select ");
+ baseIndex.columns.forEach(column -> {
+ sql.append(column.columnName).append(",");
+ });
+ sql.deleteCharAt(sql.length() - 1).append(" from
").append(hiveDbTableName);
+ Dataset<Row> dataframe = spark.sql(sql.toString());
dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe,
dstTableSchema, fileGroup);
return dataframe;
}
@@ -805,13 +812,13 @@ public final class SparkDpp implements
java.io.Serializable {
for (EtlJobConfig.EtlFileGroup fileGroup :
etlTable.fileGroups) {
List<String> filePaths = fileGroup.filePaths;
Dataset<Row> fileGroupDataframe = null;
- if (Strings.isNullOrEmpty(fileGroup.hiveDbTableName)) {
+ EtlJobConfig.SourceType sourceType = fileGroup.sourceType;
+ if (sourceType == EtlJobConfig.SourceType.FILE) {
fileGroupDataframe = loadDataFromFilePaths(spark,
baseIndex, filePaths, fileGroup, dstTableSchema);
+ } else if (sourceType == EtlJobConfig.SourceType.HIVE) {
+ fileGroupDataframe = loadDataFromHiveTable(spark,
fileGroup.dppHiveDbTableName, baseIndex, fileGroup, dstTableSchema);
} else {
- String taskId =
etlJobConfig.outputPath.substring(etlJobConfig.outputPath.lastIndexOf("/") + 1);
- String dorisIntermediateHiveTable =
String.format(EtlJobConfig.DORIS_INTERMEDIATE_HIVE_TABLE_NAME,
-
tableId, taskId);
- fileGroupDataframe = loadDataFromHiveTable(spark,
dorisIntermediateHiveTable, baseIndex, fileGroup, dstTableSchema);
+ throw new RuntimeException("Unknown source type: " +
sourceType.name());
}
if (fileGroupDataframe == null) {
LOG.info("no data for file file group:" + fileGroup);
diff --git
a/fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
b/fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
index b9767f7..6e6756c 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
@@ -486,6 +486,10 @@ public class EtlJobConfig implements Serializable {
@SerializedName(value = "hiveTableProperties")
public Map<String, String> hiveTableProperties;
+ // hive db and table used in dpp, not serialized
+ // set to hiveDbTableName (no bitmap columns) or to the doris intermediate hive table created by the global dict builder in the spark etl job
+ public String dppHiveDbTableName;
+
// for data infile path
public EtlFileGroup(SourceType sourceType, List<String> filePaths,
List<String> fileFieldNames,
List<String> columnsFromPath, String
columnSeparator, String lineDelimiter,
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
b/fe/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
new file mode 100644
index 0000000..6fe8291
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.etl;
+
+import org.apache.doris.load.loadv2.dpp.GlobalDictBuilder;
+import org.apache.doris.load.loadv2.dpp.SparkDpp;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumn;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumnMapping;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlFileGroup;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlIndex;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlTable;
+
+import org.apache.commons.collections.map.MultiValueMap;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * SparkEtlJob is responsible for global dict building, data partitioning, data sorting and data aggregation.
+ * 1. init job config
+ * 2. check whether the job has bitmap_dict function columns
+ * 3. build the global dict if step 2 is true
+ * 4. dpp (data partitioning, data sorting and data aggregation)
+ */
+public class SparkEtlJob {
+ private static final Logger LOG = LogManager.getLogger(SparkEtlJob.class);
+
+ private static final String BITMAP_DICT_FUNC = "bitmap_dict";
+ private static final String TO_BITMAP_FUNC = "to_bitmap";
+
+ private String jobConfigFilePath;
+ private EtlJobConfig etlJobConfig;
+ private Set<Long> hiveSourceTables;
+ private Map<Long, Set<String>> tableToBitmapDictColumns;
+ private SparkSession spark;
+
+ private SparkEtlJob(String jobConfigFilePath) {
+ this.jobConfigFilePath = jobConfigFilePath;
+ this.etlJobConfig = null;
+ this.hiveSourceTables = Sets.newHashSet();
+ this.tableToBitmapDictColumns = Maps.newHashMap();
+ }
+
+ private void initSparkEnvironment() {
+ SparkConf conf = new SparkConf();
+ //serialization conf
+ conf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
+ conf.set("spark.kryo.registrator",
"org.apache.doris.load.loadv2.dpp.DorisKryoRegistrator");
+ conf.set("spark.kryo.registrationRequired", "false");
+ spark =
SparkSession.builder().enableHiveSupport().config(conf).getOrCreate();
+ }
+
+ private void initSparkConfigs(Map<String, String> configs) {
+ if (configs == null) {
+ return;
+ }
+ for (Map.Entry<String, String> entry : configs.entrySet()) {
+ spark.sparkContext().conf().set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ private void initConfig() {
+ LOG.info("job config file path: " + jobConfigFilePath);
+ Dataset<String> ds = spark.read().textFile(jobConfigFilePath);
+ String jsonConfig = ds.first();
+ LOG.info("rdd read json config: " + jsonConfig);
+ etlJobConfig = EtlJobConfig.configFromJson(jsonConfig);
+ LOG.info("etl job config: " + etlJobConfig);
+ }
+
+ /*
+ * 1. check bitmap column
+ * 2. fill tableToBitmapDictColumns
+ * 3. remove bitmap_dict and to_bitmap mapping from columnMappings
+ */
+ private void checkConfig() throws Exception {
+ for (Map.Entry<Long, EtlTable> entry : etlJobConfig.tables.entrySet())
{
+ boolean isHiveSource = false;
+ Set<String> bitmapDictColumns = Sets.newHashSet();
+ for (EtlFileGroup fileGroup : entry.getValue().fileGroups) {
+ if (fileGroup.sourceType == EtlJobConfig.SourceType.HIVE) {
+ isHiveSource = true;
+ }
+ Map<String, EtlColumnMapping> newColumnMappings =
Maps.newHashMap();
+ for (Map.Entry<String, EtlColumnMapping> mappingEntry :
fileGroup.columnMappings.entrySet()) {
+ String columnName = mappingEntry.getKey();
+ String exprStr = mappingEntry.getValue().toDescription();
+ String funcName =
functions.expr(exprStr).expr().prettyName();
+ if (funcName.equalsIgnoreCase(BITMAP_DICT_FUNC)) {
+ bitmapDictColumns.add(columnName);
+ } else if (!funcName.equalsIgnoreCase(TO_BITMAP_FUNC)) {
+ newColumnMappings.put(mappingEntry.getKey(),
mappingEntry.getValue());
+ }
+ }
+ // reset new columnMappings
+ fileGroup.columnMappings = newColumnMappings;
+ }
+ if (isHiveSource) {
+ hiveSourceTables.add(entry.getKey());
+ }
+ if (!bitmapDictColumns.isEmpty()) {
+ tableToBitmapDictColumns.put(entry.getKey(),
bitmapDictColumns);
+ }
+ }
+ LOG.info("init hiveSourceTables: " + hiveSourceTables + ",
tableToBitmapDictColumns: " + tableToBitmapDictColumns);
+
+ // spark etl must have only one table with bitmap type column to
process.
+ if (hiveSourceTables.size() > 1 || tableToBitmapDictColumns.size() >
1) {
+ throw new Exception("spark etl job must have only one hive table
with bitmap type column to process");
+ }
+ }
+
+ private void processDpp() throws Exception {
+ SparkDpp sparkDpp = new SparkDpp(spark, etlJobConfig);
+ sparkDpp.init();
+ sparkDpp.doDpp();
+ }
+
+ private String buildGlobalDictAndEncodeSourceTable(EtlTable table, long
tableId) {
+ // dict column map
+ MultiValueMap dictColumnMap = new MultiValueMap();
+ for (String dictColumn : tableToBitmapDictColumns.get(tableId)) {
+ dictColumnMap.put(dictColumn, null);
+ }
+
+ // doris schema
+ List<String> dorisOlapTableColumnList = Lists.newArrayList();
+ for (EtlIndex etlIndex : table.indexes) {
+ if (etlIndex.isBaseIndex) {
+ for (EtlColumn column : etlIndex.columns) {
+ dorisOlapTableColumnList.add(column.columnName);
+ }
+ }
+ }
+
+ // hive db and tables
+ EtlFileGroup fileGroup = table.fileGroups.get(0);
+ String sourceHiveDBTableName = fileGroup.hiveDbTableName;
+ String dorisHiveDB = sourceHiveDBTableName.split("\\.")[0];
+ String taskId =
etlJobConfig.outputPath.substring(etlJobConfig.outputPath.lastIndexOf("/") + 1);
+ String globalDictTableName =
String.format(EtlJobConfig.GLOBAL_DICT_TABLE_NAME, tableId);
+ String distinctKeyTableName =
String.format(EtlJobConfig.DISTINCT_KEY_TABLE_NAME, tableId, taskId);
+ String dorisIntermediateHiveTable =
String.format(EtlJobConfig.DORIS_INTERMEDIATE_HIVE_TABLE_NAME, tableId, taskId);
+ String sourceHiveFilter = fileGroup.where;
+
+ // others
+ List<String> mapSideJoinColumns = Lists.newArrayList();
+ int buildConcurrency = 1;
+ List<String> veryHighCardinalityColumn = Lists.newArrayList();
+ int veryHighCardinalityColumnSplitNum = 1;
+
+ LOG.info("global dict builder args, dictColumnMap: " + dictColumnMap
+ + ", dorisOlapTableColumnList: " +
dorisOlapTableColumnList
+ + ", sourceHiveDBTableName: " + sourceHiveDBTableName
+ + ", sourceHiveFilter: "+ sourceHiveFilter
+ + ", distinctKeyTableName: " + distinctKeyTableName
+ + ", globalDictTableName: " + globalDictTableName
+ + ", dorisIntermediateHiveTable: " +
dorisIntermediateHiveTable);
+ try {
+ GlobalDictBuilder globalDictBuilder = new GlobalDictBuilder(
+ dictColumnMap, dorisOlapTableColumnList,
mapSideJoinColumns, sourceHiveDBTableName,
+ sourceHiveFilter, dorisHiveDB, distinctKeyTableName,
globalDictTableName, dorisIntermediateHiveTable,
+ buildConcurrency, veryHighCardinalityColumn,
veryHighCardinalityColumnSplitNum, spark);
+ globalDictBuilder.createHiveIntermediateTable();
+ globalDictBuilder.extractDistinctColumn();
+ globalDictBuilder.buildGlobalDict();
+ globalDictBuilder.encodeDorisIntermediateHiveTable();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return String.format("%s.%s", dorisHiveDB, dorisIntermediateHiveTable);
+ }
+
+ private void processData() throws Exception {
+ if (!hiveSourceTables.isEmpty()) {
+ // only one table
+ long tableId = -1;
+ EtlTable table = null;
+ for (Map.Entry<Long, EtlTable> entry :
etlJobConfig.tables.entrySet()) {
+ tableId = entry.getKey();
+ table = entry.getValue();
+ break;
+ }
+
+ // init hive configs like metastore service
+ EtlFileGroup fileGroup = table.fileGroups.get(0);
+ initSparkConfigs(fileGroup.hiveTableProperties);
+ fileGroup.dppHiveDbTableName = fileGroup.hiveDbTableName;
+
+            // build global dict and encode the source hive table if it has bitmap dict columns
+ if (!tableToBitmapDictColumns.isEmpty() &&
tableToBitmapDictColumns.containsKey(tableId)) {
+ String dorisIntermediateHiveDbTableName =
buildGlobalDictAndEncodeSourceTable(table, tableId);
+ // set with dorisIntermediateHiveDbTable
+ fileGroup.dppHiveDbTableName =
dorisIntermediateHiveDbTableName;
+ }
+ }
+
+ // data partition sort and aggregation
+ processDpp();
+ }
+
+ private void run() throws Exception {
+ initSparkEnvironment();
+ initConfig();
+ checkConfig();
+ processData();
+ }
+
+ public static void main(String[] args) {
+ if (args.length < 1) {
+ System.err.println("missing job config file path arg");
+ System.exit(-1);
+ }
+
+ try {
+ new SparkEtlJob(args[0]).run();
+ } catch (Exception e) {
+ System.err.println("spark etl job run failed");
+ e.printStackTrace();
+ System.exit(-1);
+ }
+ }
+}
diff --git a/fe/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java
b/fe/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java
index 19610c7..bfd2e32 100644
--- a/fe/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java
+++ b/fe/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java
@@ -36,6 +36,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Writable;
import org.apache.doris.load.DppConfig;
@@ -1312,8 +1313,7 @@ public class PaloAuth implements Writable {
userPrivTable.write(out);
dbPrivTable.write(out);
tablePrivTable.write(out);
- // TODO(wyb): spark-load
- //resourcePrivTable.write(out);
+ resourcePrivTable.write(out);
propertyMgr.write(out);
}
@@ -1322,12 +1322,9 @@ public class PaloAuth implements Writable {
userPrivTable = (UserPrivTable) PrivTable.read(in);
dbPrivTable = (DbPrivTable) PrivTable.read(in);
tablePrivTable = (TablePrivTable) PrivTable.read(in);
- // TODO(wyb): spark-load
- /*
- if (Catalog.getCurrentCatalogJournalVersion() >=
FeMetaVersion.new_version_by_wyb) {
+ if (Catalog.getCurrentCatalogJournalVersion() >=
FeMetaVersion.VERSION_87) {
resourcePrivTable = (ResourcePrivTable) PrivTable.read(in);
}
- */
propertyMgr = UserPropertyMgr.read(in);
if (userPrivTable.isEmpty()) {
diff --git a/fe/src/main/java/org/apache/doris/mysql/privilege/PaloRole.java
b/fe/src/main/java/org/apache/doris/mysql/privilege/PaloRole.java
index 7d2f123..d9d77e4 100644
--- a/fe/src/main/java/org/apache/doris/mysql/privilege/PaloRole.java
+++ b/fe/src/main/java/org/apache/doris/mysql/privilege/PaloRole.java
@@ -20,6 +20,8 @@ package org.apache.doris.mysql.privilege;
import org.apache.doris.analysis.ResourcePattern;
import org.apache.doris.analysis.TablePattern;
import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
@@ -142,14 +144,11 @@ public class PaloRole implements Writable {
entry.getKey().write(out);
entry.getValue().write(out);
}
- // TODO(wyb): spark-load
- /*
out.writeInt(resourcePatternToPrivs.size());
for (Map.Entry<ResourcePattern, PrivBitSet> entry :
resourcePatternToPrivs.entrySet()) {
entry.getKey().write(out);
entry.getValue().write(out);
}
- */
out.writeInt(users.size());
for (UserIdentity userIdentity : users) {
userIdentity.write(out);
@@ -164,9 +163,7 @@ public class PaloRole implements Writable {
PrivBitSet privs = PrivBitSet.read(in);
tblPatternToPrivs.put(tblPattern, privs);
}
- // TODO(wyb): spark-load
- /*
- if (Catalog.getCurrentCatalogJournalVersion() >=
FeMetaVersion.new_version_by_wyb) {
+ if (Catalog.getCurrentCatalogJournalVersion() >=
FeMetaVersion.VERSION_87) {
size = in.readInt();
for (int i = 0; i < size; i++) {
ResourcePattern resourcePattern = ResourcePattern.read(in);
@@ -174,7 +171,6 @@ public class PaloRole implements Writable {
resourcePatternToPrivs.put(resourcePattern, privs);
}
}
- */
size = in.readInt();
for (int i = 0; i < size; i++) {
UserIdentity userIdentity = UserIdentity.read(in);
diff --git
a/fe/src/main/java/org/apache/doris/persist/DropResourceOperationLog.java
b/fe/src/main/java/org/apache/doris/persist/DropResourceOperationLog.java
new file mode 100644
index 0000000..8c69763
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/persist/DropResourceOperationLog.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Operation log for dropping a resource
+ */
+public class DropResourceOperationLog implements Writable {
+ @SerializedName(value = "name")
+ private String name;
+
+ public DropResourceOperationLog(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, GsonUtils.GSON.toJson(this));
+ }
+
+ public static DropResourceOperationLog read(DataInput in) throws
IOException {
+ return GsonUtils.GSON.fromJson(Text.readString(in),
DropResourceOperationLog.class);
+ }
+}
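
A quick round-trip sketch for the new operation log, assuming plain java.io streams (hypothetical buffer names; Text and GsonUtils are the helpers used above):

    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    new DropResourceOperationLog("spark0").write(new DataOutputStream(buffer));   // serialized as JSON via Gson
    DataInput in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
    DropResourceOperationLog replayed = DropResourceOperationLog.read(in);
    // replayed.getName() returns "spark0"; EditLog feeds this to ResourceMgr.replayDropResource()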
diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/src/main/java/org/apache/doris/persist/EditLog.java
index 7f3da6c..14bb416 100644
--- a/fe/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java
@@ -696,8 +696,8 @@ public class EditLog {
break;
}
case OperationType.OP_DROP_RESOURCE: {
- final String resourceName = journal.getData().toString();
- catalog.getResourceMgr().replayDropResource(resourceName);
+ final DropResourceOperationLog operationLog =
(DropResourceOperationLog) journal.getData();
+ catalog.getResourceMgr().replayDropResource(operationLog);
break;
}
case OperationType.OP_CREATE_SMALL_FILE: {
@@ -1277,13 +1277,11 @@ public class EditLog {
}
public void logCreateResource(Resource resource) {
- // TODO(wyb): spark-load
- //logEdit(OperationType.OP_CREATE_RESOURCE, resource);
+ logEdit(OperationType.OP_CREATE_RESOURCE, resource);
}
- public void logDropResource(String resourceName) {
- // TODO(wyb): spark-load
- //logEdit(OperationType.OP_DROP_RESOURCE, new Text(resourceName));
+ public void logDropResource(DropResourceOperationLog operationLog) {
+ logEdit(OperationType.OP_DROP_RESOURCE, operationLog);
}
public void logCreateSmallFile(SmallFile info) {
diff --git a/fe/src/main/java/org/apache/doris/persist/PrivInfo.java
b/fe/src/main/java/org/apache/doris/persist/PrivInfo.java
index 26b7394..b512fa6 100644
--- a/fe/src/main/java/org/apache/doris/persist/PrivInfo.java
+++ b/fe/src/main/java/org/apache/doris/persist/PrivInfo.java
@@ -20,6 +20,8 @@ package org.apache.doris.persist;
import org.apache.doris.analysis.ResourcePattern;
import org.apache.doris.analysis.TablePattern;
import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.mysql.privilege.PrivBitSet;
@@ -117,15 +119,12 @@ public class PrivInfo implements Writable {
out.writeBoolean(false);
}
- // TODO(wyb): spark-load
- /*
if (resourcePattern != null) {
out.writeBoolean(true);
resourcePattern.write(out);
} else {
out.writeBoolean(false);
}
- */
if (privs != null) {
out.writeBoolean(true);
@@ -159,14 +158,11 @@ public class PrivInfo implements Writable {
tblPattern = TablePattern.read(in);
}
- // TODO(wyb): spark-load
- /*
- if (Catalog.getCurrentCatalogJournalVersion() >=
FeMetaVersion.new_version_by_wyb) {
+ if (Catalog.getCurrentCatalogJournalVersion() >=
FeMetaVersion.VERSION_87) {
if (in.readBoolean()) {
resourcePattern = ResourcePattern.read(in);
}
}
- */
if (in.readBoolean()) {
privs = PrivBitSet.read(in);
diff --git a/fe/src/test/java/org/apache/doris/analysis/GrantStmtTest.java
b/fe/src/test/java/org/apache/doris/analysis/GrantStmtTest.java
index dc8e1f4..7907433 100644
--- a/fe/src/test/java/org/apache/doris/analysis/GrantStmtTest.java
+++ b/fe/src/test/java/org/apache/doris/analysis/GrantStmtTest.java
@@ -21,6 +21,7 @@ import mockit.Expectations;
import org.apache.doris.catalog.AccessPrivilege;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.qe.ConnectContext;
@@ -97,7 +98,7 @@ public class GrantStmtTest {
@Test
public void testResourceNormal() throws UserException {
// TODO(wyb): spark-load
- GrantStmt.disableGrantResource = false;
+ Config.enable_spark_load = true;
String resourceName = "spark0";
List<AccessPrivilege> privileges =
Lists.newArrayList(AccessPrivilege.USAGE_PRIV);
diff --git a/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java
b/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java
index 993fda1..250f022 100644
--- a/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java
+++ b/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.ResourceMgr;
import org.apache.doris.catalog.SparkResource;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.mysql.privilege.PaloAuth;
@@ -107,7 +108,7 @@ public class LoadStmtTest {
// test ResourceDesc
// TODO(wyb): spark-load
- LoadStmt.disableSparkLoad = false;
+ Config.enable_spark_load = true;
stmt = new LoadStmt(new LabelName("testDb", "testLabel"),
dataDescriptionList,
new ResourceDesc(resourceName, null), null);
stmt.analyze(analyzer);
diff --git
a/fe/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
b/fe/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
index d288610..02a690b 100644
--- a/fe/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
@@ -157,9 +157,8 @@ public class SparkEtlJobHandlerTest {
result = trackingUrl;
report.getProgress();
returns(0.5f, 1f, 1f);
- // TODO(wyb): spark-load
- //BrokerUtil.readFile(anyString, (BrokerDesc) any);
- //result = "{'normal_rows': 10, 'abnormal_rows': 0,
'failed_reason': 'etl job failed'}";
+ BrokerUtil.readFile(anyString, (BrokerDesc) any);
+ result = "{'normal_rows': 10, 'abnormal_rows': 0,
'failed_reason': 'etl job failed'}";
}
};
@@ -180,17 +179,15 @@ public class SparkEtlJobHandlerTest {
status = handler.getEtlJobStatus(null, appId, loadJobId,
etlOutputPath, resource, brokerDesc);
Assert.assertEquals(TEtlState.CANCELLED, status.getState());
Assert.assertEquals(100, status.getProgress());
- // TODO(wyb): spark-load
- //Assert.assertEquals("etl job failed",
status.getDppResult().failedReason);
+ Assert.assertEquals("etl job failed",
status.getDppResult().failedReason);
// finished
status = handler.getEtlJobStatus(null, appId, loadJobId,
etlOutputPath, resource, brokerDesc);
Assert.assertEquals(TEtlState.FINISHED, status.getState());
Assert.assertEquals(100, status.getProgress());
Assert.assertEquals(trackingUrl, status.getTrackingUrl());
- // TODO(wyb): spark-load
- //Assert.assertEquals(10, status.getDppResult().normalRows);
- //Assert.assertEquals(0, status.getDppResult().abnormalRows);
+ Assert.assertEquals(10, status.getDppResult().normalRows);
+ Assert.assertEquals(0, status.getDppResult().abnormalRows);
}
@Test
diff --git
a/fe/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java
b/fe/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java
new file mode 100644
index 0000000..681e027
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.etl;
+
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumn;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumnMapping;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlFileGroup;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlIndex;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlJobProperty;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartition;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartitionInfo;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlTable;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class SparkEtlJobTest {
+ private long tableId;
+ private long index1Id;
+ private long index2Id;
+ private long partition1Id;
+ private long partition2Id;
+ private EtlJobConfig etlJobConfig;
+
+ @Before
+ public void setUp() {
+ tableId = 0L;
+ index1Id = 1L;
+ index2Id = 2L;
+ partition1Id = 3L;
+ partition2Id = 4L;
+
+ // indexes
+ EtlColumn k1 = new EtlColumn("k1", "INT", false, true, "NONE", "0", 0,
0, 0);
+ EtlColumn k2 = new EtlColumn("k2", "VARCHAR", false, true, "NONE",
"0", 10, 0, 0);
+ EtlColumn v1 = new EtlColumn("v1", "BIGINT", false, false, "NONE",
"0", 0, 0, 0);
+ EtlIndex index1 = new EtlIndex(index1Id, Lists.newArrayList(k1, k2,
v1), 666666, "DUPLICATE", true);
+ v1 = new EtlColumn("v1", "BIGINT", false, false, "SUM", "0", 0, 0, 0);
+ EtlIndex index2 = new EtlIndex(index2Id, Lists.newArrayList(k1, v1),
888888, "AGGREGATE", true);
+ List<EtlIndex> indexes = Lists.newArrayList(index1, index2);
+ // partition info
+ List<EtlPartition> partitions = Lists.newArrayList();
+ partitions.add(new EtlPartition(partition1Id, Lists.newArrayList(0),
Lists.newArrayList(100), false, 2));
+ partitions.add(new EtlPartition(partition2Id, Lists.newArrayList(100),
Lists.newArrayList(), true, 3));
+ EtlPartitionInfo partitionInfo = new EtlPartitionInfo("RANGE",
Lists.newArrayList("k1"), Lists.newArrayList("k2"), partitions);
+ EtlTable table = new EtlTable(indexes, partitionInfo);
+ // file group
+ Map<String, EtlColumnMapping> columnMappings = Maps.newHashMap();
+ columnMappings.put("k1", new EtlColumnMapping("k1 + 1"));
+ table.addFileGroup(new EtlFileGroup(EtlJobConfig.SourceType.FILE,
Lists.newArrayList("hdfs://127.0.0.1:10000/file"),
+ Lists.newArrayList(),
Lists.newArrayList(), "\t", "\n", false, null,
+ Maps.newHashMap(), "",
Lists.newArrayList(partition1Id, partition2Id)));
+ // tables
+ Map<Long, EtlTable> tables = Maps.newHashMap();
+ tables.put(tableId, table);
+ // others
+ String outputFilePattern = "V1.label0.%d.%d.%d.%d.%d.parquet";
+ String label = "label0";
+ EtlJobProperty properties = new EtlJobProperty();
+ properties.strictMode = false;
+ properties.timezone = "Asia/Shanghai";
+ etlJobConfig = new EtlJobConfig(tables, outputFilePattern, label,
properties);
+ }
+
+ @Test
+ public void testInitConfig(@Mocked SparkSession spark, @Injectable
Dataset<String> ds) {
+ new Expectations() {
+ {
+ SparkSession.builder().enableHiveSupport().getOrCreate();
+ result = spark;
+ spark.read().textFile(anyString);
+ result = ds;
+ ds.first();
+ result = etlJobConfig.configToJson();
+ }
+ };
+
+ SparkEtlJob job = Deencapsulation.newInstance(SparkEtlJob.class,
"hdfs://127.0.0.1:10000/jobconfig.json");
+ Deencapsulation.invoke(job, "initSparkEnvironment");
+ Deencapsulation.invoke(job, "initConfig");
+ EtlJobConfig parsedConfig = Deencapsulation.getField(job,
"etlJobConfig");
+ Assert.assertTrue(parsedConfig.tables.containsKey(tableId));
+ EtlTable table = parsedConfig.tables.get(tableId);
+ Assert.assertEquals(2, table.indexes.size());
+ Assert.assertEquals(2, table.partitionInfo.partitions.size());
+ Assert.assertEquals(false, parsedConfig.properties.strictMode);
+ Assert.assertEquals("label0", parsedConfig.label);
+ }
+
+ @Test
+ public void testCheckConfigWithoutBitmapDictColumns() {
+ SparkEtlJob job = Deencapsulation.newInstance(SparkEtlJob.class,
"hdfs://127.0.0.1:10000/jobconfig.json");
+ Deencapsulation.setField(job, "etlJobConfig", etlJobConfig);
+ Deencapsulation.invoke(job, "checkConfig");
+ Map<Long, Set<String>> tableToBitmapDictColumns =
Deencapsulation.getField(job, "tableToBitmapDictColumns");
+ // check bitmap dict columns empty
+ Assert.assertTrue(tableToBitmapDictColumns.isEmpty());
+ }
+
+ @Test
+ public void testCheckConfigWithBitmapDictColumns() {
+ SparkEtlJob job = Deencapsulation.newInstance(SparkEtlJob.class,
"hdfs://127.0.0.1:10000/jobconfig.json");
+ EtlTable table = etlJobConfig.tables.get(tableId);
+ table.indexes.get(0).columns.add(
+ new EtlColumn("v2", "BITMAP", false, false, "BITMAP_UNION",
"0", 0, 0, 0)
+ );
+ EtlFileGroup fileGroup = table.fileGroups.get(0);
+ fileGroup.sourceType = EtlJobConfig.SourceType.HIVE;
+ fileGroup.columnMappings.put(
+ "v2", new EtlColumnMapping("bitmap_dict",
Lists.newArrayList("v2"))
+ );
+ Deencapsulation.setField(job, "etlJobConfig", etlJobConfig);
+ Deencapsulation.invoke(job, "checkConfig");
+ // check hive source
+ Set<Long> hiveSourceTables = Deencapsulation.getField(job,
"hiveSourceTables");
+ Assert.assertTrue(hiveSourceTables.contains(tableId));
+ // check bitmap dict columns has v2
+ Map<Long, Set<String>> tableToBitmapDictColumns =
Deencapsulation.getField(job, "tableToBitmapDictColumns");
+ Assert.assertTrue(tableToBitmapDictColumns.containsKey(tableId));
+
Assert.assertTrue(tableToBitmapDictColumns.get(tableId).contains("v2"));
+ // check remove v2 bitmap_dict func mapping from file group column
mappings
+
Assert.assertFalse(table.fileGroups.get(0).columnMappings.containsKey("v2"));
+ }
+}
\ No newline at end of file
diff --git a/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java
b/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java
index c7c7b53..b4c77ea 100644
--- a/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java
+++ b/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java
@@ -32,6 +32,7 @@ import org.apache.doris.catalog.AccessPrivilege;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.DomainResolver;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.persist.EditLog;
@@ -1061,7 +1062,7 @@ public class AuthTest {
List<AccessPrivilege> usagePrivileges =
Lists.newArrayList(AccessPrivilege.USAGE_PRIV);
UserDesc userDesc = new UserDesc(userIdentity, "12345", true);
// TODO(wyb): spark-load
- GrantStmt.disableGrantResource = false;
+ Config.enable_spark_load = true;
// ------ grant|revoke resource to|from user ------
// 1. create user with no role