This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new cd449ea KYLIN-4817 Refine CubeMigrationCLI for kylin4
cd449ea is described below
commit cd449eab1c6f49a27bc97ecfff8e1b29af92aead
Author: yaqian.zhang <[email protected]>
AuthorDate: Mon Nov 30 15:36:43 2020 +0800
KYLIN-4817 Refine CubeMigrationCLI for kylin4
---
.../apache/kylin/common/restclient/RestClient.java | 4 +-
.../engine/spark/metadata/cube/PathManager.java | 6 +
.../org/apache/kylin/tool/CubeMigrationCLI.java | 439 ++++++++-------------
3 files changed, 168 insertions(+), 281 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index 955b0ff..2e99809 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -202,7 +202,7 @@ public class RestClient {
public String getKylinProperties() throws IOException {
String url = baseUrl + "/admin/config";
- HttpGet request = new HttpGet(url);
+ HttpGet request = newGet(url);
HttpResponse response = null;
try {
response = client.execute(request);
@@ -380,7 +380,7 @@ public class RestClient {
}
private HttpGet newGet(String url) {
- HttpGet get = new HttpGet();
+ HttpGet get = new HttpGet(url);
addHttpHeaders(get);
return get;
}
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
index 0484bfc..6444715 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
+++
b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
@@ -49,6 +49,12 @@ public final class PathManager {
return hdfsWorkDir + "parquet" + File.separator + cube.getName() + File.separator + segName + "_" + identifier;
}

+ public static String getSegmentParquetStoragePath(String hdfsWorkDir, String cubeName, CubeSegment segment) {
+ String segmentName = segment.getName();
+ String identifier = segment.getStorageLocationIdentifier();
+ return hdfsWorkDir + "parquet" + File.separator + cubeName + File.separator + segmentName + "_" + identifier;
+ }
+
/**
* Delete segment path
*/
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index 4612cef..550da0c 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -18,22 +18,20 @@
package org.apache.kylin.tool;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.persistence.JsonSerializer;
@@ -42,30 +40,23 @@ import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.restclient.RestClient;
import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.SnapshotManager;
-import org.apache.kylin.dict.lookup.SnapshotTable;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.cube.model.DictionaryDesc;
+import org.apache.kylin.engine.spark.metadata.cube.PathManager;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.IStorageAware;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,14 +77,17 @@ public class CubeMigrationCLI extends AbstractApplication {
protected KylinConfig dstConfig;
protected ResourceStore srcStore;
protected ResourceStore dstStore;
- protected FileSystem hdfsFS;
- private HBaseAdmin hbaseAdmin;
+ protected FileSystem hdfsFs;
+ protected Configuration conf;
protected boolean doAclCopy = false;
protected boolean doOverwrite = false;
protected boolean doMigrateSegment = true;
protected String dstProject;
+ protected String srcHdfsWorkDir;
+ protected String dstHdfsWorkDir;
private static final String ACL_PREFIX = "/acl/";
+ private static final String GLOBAL_DICT_PREFIX = "/dict/global_dict/";
public static void main(String[] args) throws IOException, InterruptedException {
@@ -125,7 +119,7 @@ public class CubeMigrationCLI extends AbstractApplication {
}
public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl,
- String purgeAndDisable, String overwriteIfExists, String realExecute)
+ String purgeAndDisable, String overwriteIfExists, String realExecute)
throws IOException, InterruptedException {
moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName,
@@ -133,7 +127,7 @@ public class CubeMigrationCLI extends AbstractApplication {
}
public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String copyAcl,
- String purgeAndDisable, String overwriteIfExists, String realExecute)
+ String purgeAndDisable, String overwriteIfExists, String realExecute)
throws IOException, InterruptedException {
moveCube(srcCfg, dstCfg, cubeName, projectName, Boolean.parseBoolean(copyAcl),
@@ -142,7 +136,7 @@ public class CubeMigrationCLI extends AbstractApplication {
}
public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl,
- String purgeAndDisable, String overwriteIfExists, String realExecute, String migrateSegment)
+ String purgeAndDisable, String overwriteIfExists, String realExecute, String migrateSegment)
throws IOException, InterruptedException {
moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName,
@@ -152,7 +146,7 @@ public class CubeMigrationCLI extends AbstractApplication {
}
public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, boolean copyAcl,
- boolean purgeAndDisable, boolean overwriteIfExists, boolean realExecute, boolean migrateSegment)
+ boolean purgeAndDisable, boolean overwriteIfExists, boolean realExecute, boolean migrateSegment)
throws IOException, InterruptedException {
doAclCopy = copyAcl;
doOverwrite = overwriteIfExists;
@@ -162,26 +156,24 @@ public class CubeMigrationCLI extends AbstractApplication {
dstConfig = dstCfg;
dstStore = ResourceStore.getStore(dstConfig);
dstProject = projectName;
+ conf = HadoopUtil.getCurrentConfiguration();
CubeManager cubeManager = CubeManager.getInstance(srcConfig);
CubeInstance cube = cubeManager.getCube(cubeName);
+ srcHdfsWorkDir = srcConfig.getHdfsWorkingDirectory(cube.getProject());
+ dstHdfsWorkDir = dstConfig.getHdfsWorkingDirectory(dstProject);
logger.info("cube to be moved is : " + cubeName);
if (migrateSegment) {
checkCubeState(cube);
}
- checkAndGetHbaseUrl();
+ checkAndGetMetadataUrl();
- Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- hbaseAdmin = new HBaseAdmin(conf);
- hdfsFS = HadoopUtil.getWorkingFileSystem();
+ hdfsFs = HadoopUtil.getWorkingFileSystem();
operations = new ArrayList<Opt>();
copyFilesInMetaStore(cube);
- if (migrateSegment) {
- renameFoldersInHdfs(cube);
- changeHtableHost(cube);
- } else {
+ if (!migrateSegment) {
clearSegments(cubeName); // this should be after copyFilesInMetaStore
}
addCubeAndModelIntoProject(cube, cubeName);
@@ -192,20 +184,12 @@ public class CubeMigrationCLI extends AbstractApplication {
if (realExecute) {
doOpts();
- if (migrateSegment) {
- checkMigrationSuccess(dstConfig, cubeName, true);
- }
updateMeta(dstConfig, projectName, cubeName, cube.getModel());
} else {
showOpts();
}
}
- public void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) throws IOException {
- CubeMigrationCheckCLI checkCLI = new CubeMigrationCheckCLI(kylinConfig, ifFix);
- checkCLI.execute(cubeName);
- }
-
protected void checkCubeState(CubeInstance cube) {
if (cube.getStatus() != RealizationStatusEnum.READY)
throw new IllegalStateException("Cannot migrate cube that is not
in READY state.");
@@ -217,40 +201,16 @@ public class CubeMigrationCLI extends AbstractApplication {
}
}
- protected void checkAndGetHbaseUrl() {
+ protected void checkAndGetMetadataUrl() {
StorageURL srcMetadataUrl = srcConfig.getMetadataUrl();
StorageURL dstMetadataUrl = dstConfig.getMetadataUrl();
logger.info("src metadata url is " + srcMetadataUrl);
logger.info("dst metadata url is " + dstMetadataUrl);
-
- if (!"hbase".equals(srcMetadataUrl.getScheme()) ||
!"hbase".equals(dstMetadataUrl.getScheme()))
- throw new IllegalStateException("Both metadata urls should be
hbase metadata url");
- }
-
- protected void renameFoldersInHdfs(CubeInstance cube) throws IOException {
- for (CubeSegment segment : cube.getSegments()) {
-
- String jobUuid = segment.getLastBuildJobID();
- String src = JobBuilderSupport.getJobWorkingDir(srcConfig.getHdfsWorkingDirectory(), jobUuid);
- String tgt = JobBuilderSupport.getJobWorkingDir(dstConfig.getHdfsWorkingDirectory(), jobUuid);
-
- operations.add(new Opt(OptType.RENAME_FOLDER_IN_HDFS, new Object[] { src, tgt }));
- }
-
- }
-
- protected void changeHtableHost(CubeInstance cube) {
- if (cube.getDescriptor().getStorageType() != IStorageAware.ID_SHARDED_HBASE)
- return;
- for (CubeSegment segment : cube.getSegments()) {
- operations
- .add(new Opt(OptType.CHANGE_HTABLE_HOST, new Object[] { segment.getStorageLocationIdentifier() }));
- }
}
protected void clearSegments(String cubeName) throws IOException {
- operations.add(new Opt(OptType.CLEAR_SEGMENTS, new Object[] { cubeName }));
+ operations.add(new Opt(OptType.CLEAR_SEGMENTS, new Object[]{cubeName}));
}
protected void copyFilesInMetaStore(CubeInstance cube) throws IOException {
@@ -260,16 +220,22 @@ public class CubeMigrationCLI extends AbstractApplication {
+ " already exists on target metadata store. Use overwriteIfExists to overwrite it");
List<String> metaItems = new ArrayList<String>();
+ List<String> srcParquetFiles = new ArrayList<String>();
+ List<String> dstParquetFiles = new ArrayList<String>();
Set<String> dictAndSnapshot = new HashSet<String>();
- listCubeRelatedResources(cube, metaItems, dictAndSnapshot);
+ listCubeRelatedResources(cube, metaItems, dictAndSnapshot, srcParquetFiles, dstParquetFiles);
for (String item : metaItems) {
- operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[] { item }));
+ operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[]{item}));
}
if (doMigrateSegment) {
for (String item : dictAndSnapshot) {
- operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() }));
+ operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[]{item, cube.getName()}));
+ }
+
+ for (int i = 0; i < srcParquetFiles.size(); i++) {
+ operations.add(new Opt(OptType.COPY_PARQUET_FILE, new Object[]{srcParquetFiles.get(i), dstParquetFiles.get(i)}));
}
}
}
@@ -279,11 +245,11 @@ public class CubeMigrationCLI extends AbstractApplication {
if (!dstStore.exists(projectResPath))
throw new IllegalStateException("The target project " + dstProject
+ " does not exist");
- operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[] { srcCube, cubeName, dstProject }));
+ operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[]{srcCube, cubeName, dstProject}));
}
private void purgeAndDisable(String cubeName) throws IOException {
- operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[] { cubeName }));
+ operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[]{cubeName}));
}
private List<String> getCompatibleTablePath(Set<TableRef> tableRefs,
String project, String rootPath)
@@ -311,7 +277,7 @@ public class CubeMigrationCLI extends AbstractApplication {
return toResource;
}
- protected void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, Set<String> dictAndSnapshot)
+ protected void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, Set<String> dictAndSnapshot, List<String> srcParquetFiles, List<String> dstParquetFiles)
throws IOException {
CubeDesc cubeDesc = cube.getDescriptor();
@@ -326,10 +292,25 @@ public class CubeMigrationCLI extends AbstractApplication {
metaResource.addAll(getCompatibleTablePath(tblRefs, prj, ResourceStore.TABLE_EXD_RESOURCE_ROOT));
if (doMigrateSegment) {
+ for (DictionaryDesc dictionaryDesc : cubeDesc.getDictionaries()) {
+ String[] columnInfo = dictionaryDesc.getColumnRef().getColumnWithTable().split("\\.");
+ String globalDictPath;
+ if (columnInfo.length == 3) {
+ globalDictPath = cube.getProject() + GLOBAL_DICT_PREFIX + columnInfo[1] + File.separator + columnInfo[2];
+ } else {
+ globalDictPath = cube.getProject() + GLOBAL_DICT_PREFIX + columnInfo[0] + File.separator + columnInfo[1];
+ }
+ if (globalDictPath != null) {
+ logger.info("Add " + globalDictPath + " to migrate dict list");
+ dictAndSnapshot.add(globalDictPath);
+ }
+ }
for (CubeSegment segment : cube.getSegments()) {
metaResource.add(segment.getStatisticsResourcePath());
dictAndSnapshot.addAll(segment.getSnapshotPaths());
- dictAndSnapshot.addAll(segment.getDictionaryPaths());
+ srcParquetFiles.add(PathManager.getSegmentParquetStoragePath(srcHdfsWorkDir, cube.getName(), segment));
+ dstParquetFiles.add(PathManager.getSegmentParquetStoragePath(dstHdfsWorkDir, cube.getName(), segment));
+ logger.info("Add " + PathManager.getSegmentParquetStoragePath(cube, segment.getName(), segment.getStorageLocationIdentifier()) + " to migrate parquet file list");
}
}
@@ -337,11 +318,6 @@ public class CubeMigrationCLI extends AbstractApplication {
metaResource.add(ACL_PREFIX + cube.getUuid());
metaResource.add(ACL_PREFIX + cube.getModel().getUuid());
}
-
-// if (cubeDesc.isStreamingCube()) {
-// // add streaming source config info for streaming cube
-// metaResource.add(StreamingSourceConfig.concatResourcePath(cubeDesc.getModel().getRootFactTableName()));
-// }
}
@Override
@@ -355,7 +331,7 @@ public class CubeMigrationCLI extends AbstractApplication {
}
protected enum OptType {
- COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST, PURGE_AND_DISABLE, CLEAR_SEGMENTS
+ COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, COPY_PARQUET_FILE, ADD_INTO_PROJECT, PURGE_AND_DISABLE, CLEAR_SEGMENTS
}
protected void addOpt(OptType type, Object[] params) {
@@ -420,161 +396,94 @@ public class CubeMigrationCLI extends AbstractApplication {
logger.info("Executing operation: " + opt.toString());
switch (opt.type) {
- case CHANGE_HTABLE_HOST: {
- String tableName = (String) opt.params[0];
- System.out.println("CHANGE_HTABLE_HOST, table name: " + tableName);
- HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- hbaseAdmin.disableTable(tableName);
- desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix());
- hbaseAdmin.modifyTable(tableName, desc);
- hbaseAdmin.enableTable(tableName);
- logger.info("CHANGE_HTABLE_HOST is completed");
- break;
- }
- case COPY_FILE_IN_META: {
- String item = (String) opt.params[0];
- RawResource res = srcStore.getResource(item);
- if (res == null) {
- logger.info("Item: {} doesn't exist, ignore it.", item);
+ case COPY_FILE_IN_META: {
+ String item = (String) opt.params[0];
+ RawResource res = srcStore.getResource(item);
+ if (res == null) {
+ logger.info("Item: {} doesn't exist, ignore it.", item);
+ break;
+ }
+ dstStore.putResource(renameTableWithinProject(item), res.content(), res.lastModified());
+ res.content().close();
+ logger.info("Item " + item + " is copied");
break;
}
- dstStore.putResource(renameTableWithinProject(item), res.content(), res.lastModified());
- res.content().close();
- logger.info("Item " + item + " is copied");
- break;
- }
- case COPY_DICT_OR_SNAPSHOT: {
- String item = (String) opt.params[0];
-
- if (item.toLowerCase(Locale.ROOT).endsWith(".dict")) {
- DictionaryManager dstDictMgr = DictionaryManager.getInstance(dstConfig);
- DictionaryManager srcDicMgr = DictionaryManager.getInstance(srcConfig);
- DictionaryInfo dictSrc = srcDicMgr.getDictionaryInfo(item);
-
- long ts = dictSrc.getLastModified();
- dictSrc.setLastModified(0);//to avoid resource store write conflict
- Dictionary dictObj = dictSrc.getDictionaryObject().copyToAnotherMeta(srcConfig, dstConfig);
- DictionaryInfo dictSaved = dstDictMgr.trySaveNewDict(dictObj, dictSrc);
- dictSrc.setLastModified(ts);
-
- if (dictSaved == dictSrc) {
- //no dup found, already saved to dest
- logger.info("Item " + item + " is copied");
+ case COPY_DICT_OR_SNAPSHOT: {
+ String item = (String) opt.params[0];
+ String itemPath = item.substring(item.substring(0, item.indexOf("/")).length()+1);
+ Path srcPath = new Path(srcHdfsWorkDir + itemPath);
+ Path dstPath = new Path(dstHdfsWorkDir + itemPath);
+ if (hdfsFs.exists(srcPath)) {
+ FileUtil.copy(hdfsFs, srcPath, hdfsFs, dstPath, false, true, conf);
+ logger.info("Copy " + srcPath + " to " + dstPath);
} else {
- //dictSrc is rejected because of duplication
- //modify cube's dictionary path
- String cubeName = (String) opt.params[1];
- String cubeResPath = CubeInstance.concatResourcePath(cubeName);
- Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
- CubeInstance cube = dstStore.getResource(cubeResPath, cubeSerializer);
- for (CubeSegment segment : cube.getSegments()) {
- for (Map.Entry<String, String> entry : segment.getDictionaries().entrySet()) {
- if (entry.getValue().equalsIgnoreCase(item)) {
- entry.setValue(dictSaved.getResourcePath());
- }
- }
- }
- dstStore.checkAndPutResource(cubeResPath, cube, cubeSerializer);
- logger.info("Item " + item + " is dup, instead " + dictSaved.getResourcePath() + " is reused");
+ logger.info("Dict or snapshot " + srcPath + " does not exist, ignore it");
}
-
- } else if (item.toLowerCase(Locale.ROOT).endsWith(".snapshot")) {
- SnapshotManager dstSnapMgr = SnapshotManager.getInstance(dstConfig);
- SnapshotManager srcSnapMgr = SnapshotManager.getInstance(srcConfig);
- SnapshotTable snapSrc = srcSnapMgr.getSnapshotTable(item);
-
- long ts = snapSrc.getLastModified();
- snapSrc.setLastModified(0);
- SnapshotTable snapSaved = dstSnapMgr.trySaveNewSnapshot(snapSrc);
- snapSrc.setLastModified(ts);
-
- if (snapSaved == snapSrc) {
- //no dup found, already saved to dest
- logger.info("Item " + item + " is copied");
-
+ break;
+ }
+ case COPY_PARQUET_FILE: {
+ Path srcPath = new Path((String) opt.params[0]);
+ Path dstPath = new Path((String) opt.params[1]);
+ if (hdfsFs.exists(srcPath)) {
+ FileUtil.copy(hdfsFs, srcPath, hdfsFs, dstPath, false, true, conf);
+ logger.info("Copy " + srcPath + " to " + dstPath);
} else {
- String cubeName = (String) opt.params[1];
- String cubeResPath = CubeInstance.concatResourcePath(cubeName);
- Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
- CubeInstance cube = dstStore.getResource(cubeResPath, cubeSerializer);
- for (CubeSegment segment : cube.getSegments()) {
- for (Map.Entry<String, String> entry : segment.getSnapshots().entrySet()) {
- if (entry.getValue().equalsIgnoreCase(item)) {
- entry.setValue(snapSaved.getResourcePath());
- }
- }
- }
- dstStore.checkAndPutResource(cubeResPath, cube, cubeSerializer);
- logger.info("Item " + item + " is dup, instead " + snapSaved.getResourcePath() + " is reused");
-
+ logger.info("Parquet file " + srcPath + " does not exist, ignore it");
}
-
- } else {
- logger.error("unknown item found: " + item);
- logger.info("ignore it");
+ break;
}
+ case ADD_INTO_PROJECT: {
+ CubeInstance srcCube = (CubeInstance) opt.params[0];
+ String cubeName = (String) opt.params[1];
+ String projectName = (String) opt.params[2];
+ String modelName = srcCube.getDescriptor().getModelName();
+
+ String projectResPath = ProjectInstance.concatResourcePath(projectName);
+ Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
+ ProjectInstance project = dstStore.getResource(projectResPath, projectSerializer);
+
+ for (TableRef tableRef : srcCube.getModel().getAllTables()) {
+ project.addTable(tableRef.getTableIdentity());
+ }
- break;
- }
- case RENAME_FOLDER_IN_HDFS: {
- String srcPath = (String) opt.params[0];
- String dstPath = (String) opt.params[1];
- renameHDFSPath(srcPath, dstPath);
- logger.info("HDFS Folder renamed from " + srcPath + " to " +
dstPath);
- break;
- }
- case ADD_INTO_PROJECT: {
- CubeInstance srcCube = (CubeInstance) opt.params[0];
- String cubeName = (String) opt.params[1];
- String projectName = (String) opt.params[2];
- String modelName = srcCube.getDescriptor().getModelName();
-
- String projectResPath = ProjectInstance.concatResourcePath(projectName);
- Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
- ProjectInstance project = dstStore.getResource(projectResPath, projectSerializer);
-
- for (TableRef tableRef : srcCube.getModel().getAllTables()) {
- project.addTable(tableRef.getTableIdentity());
- }
+ if (!project.getModels().contains(modelName))
+ project.addModel(modelName);
+ project.removeRealization(RealizationType.CUBE, cubeName);
+ project.addRealizationEntry(RealizationType.CUBE, cubeName);
- if (!project.getModels().contains(modelName))
- project.addModel(modelName);
- project.removeRealization(RealizationType.CUBE, cubeName);
- project.addRealizationEntry(RealizationType.CUBE, cubeName);
+ dstStore.checkAndPutResource(projectResPath, project, projectSerializer);
+ logger.info("Project instance for " + projectName + " is corrected");
+ break;
+ }
+ case CLEAR_SEGMENTS: {
+ String cubeName = (String) opt.params[0];
+ String cubeInstancePath = CubeInstance.concatResourcePath(cubeName);
+ Serializer<CubeInstance> cubeInstanceSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
+ CubeInstance cubeInstance = dstStore.getResource(cubeInstancePath, cubeInstanceSerializer);
+ cubeInstance.getSegments().clear();
+ cubeInstance.clearCuboids();
+ cubeInstance.setCreateTimeUTC(System.currentTimeMillis());
+ cubeInstance.setStatus(RealizationStatusEnum.DISABLED);
+ dstStore.checkAndPutResource(cubeInstancePath, cubeInstance, cubeInstanceSerializer);
+ logger.info("Cleared segments for " + cubeName + ", since segments have not been copied");
+ break;
+ }
+ case PURGE_AND_DISABLE: {
+ String cubeName = (String) opt.params[0];
+ String cubeResPath = CubeInstance.concatResourcePath(cubeName);
+ Serializer<CubeInstance> cubeSerializer = new
JsonSerializer<CubeInstance>(CubeInstance.class);
+ CubeInstance cube = srcStore.getResource(cubeResPath,
cubeSerializer);
+ cube.getSegments().clear();
+ cube.setStatus(RealizationStatusEnum.DISABLED);
+ srcStore.checkAndPutResource(cubeResPath, cube, cubeSerializer);
+ logger.info("Cube " + cubeName + " is purged and disabled in " + srcConfig.getMetadataUrl());
- dstStore.checkAndPutResource(projectResPath, project, projectSerializer);
- logger.info("Project instance for " + projectName + " is corrected");
- break;
- }
- case CLEAR_SEGMENTS: {
- String cubeName = (String) opt.params[0];
- String cubeInstancePath = CubeInstance.concatResourcePath(cubeName);
- Serializer<CubeInstance> cubeInstanceSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
- CubeInstance cubeInstance = dstStore.getResource(cubeInstancePath, cubeInstanceSerializer);
- cubeInstance.getSegments().clear();
- cubeInstance.clearCuboids();
- cubeInstance.setCreateTimeUTC(System.currentTimeMillis());
- cubeInstance.setStatus(RealizationStatusEnum.DISABLED);
- dstStore.checkAndPutResource(cubeInstancePath, cubeInstance, cubeInstanceSerializer);
- logger.info("Cleared segments for " + cubeName + ", since segments has not been copied");
- break;
- }
- case PURGE_AND_DISABLE: {
- String cubeName = (String) opt.params[0];
- String cubeResPath = CubeInstance.concatResourcePath(cubeName);
- Serializer<CubeInstance> cubeSerializer = new
JsonSerializer<CubeInstance>(CubeInstance.class);
- CubeInstance cube = srcStore.getResource(cubeResPath,
cubeSerializer);
- cube.getSegments().clear();
- cube.setStatus(RealizationStatusEnum.DISABLED);
- srcStore.checkAndPutResource(cubeResPath, cube, cubeSerializer);
- logger.info("Cube " + cubeName + " is purged and disabled in " +
srcConfig.getMetadataUrl());
-
- break;
- }
- default: {
- //do nothing
- break;
- }
+ break;
+ }
+ default: {
+ //do nothing
+ break;
+ }
}
}
@@ -582,53 +491,38 @@ public class CubeMigrationCLI extends AbstractApplication {
logger.info("Undo operation: " + opt.toString());
switch (opt.type) {
- case CHANGE_HTABLE_HOST: {
- String tableName = (String) opt.params[0];
- HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- hbaseAdmin.disableTable(tableName);
- desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix());
- hbaseAdmin.modifyTable(tableName, desc);
- hbaseAdmin.enableTable(tableName);
- break;
- }
- case COPY_FILE_IN_META: {
- // no harm
- logger.info("Undo for COPY_FILE_IN_META is ignored");
- String item = (String) opt.params[0];
-
- if (item.startsWith(ACL_PREFIX) && doAclCopy) {
- logger.info("Remove acl record");
- dstStore.deleteResource(item);
+ case COPY_FILE_IN_META: {
+ // no harm
+ logger.info("Undo for COPY_FILE_IN_META is ignored");
+ String item = (String) opt.params[0];
+
+ if (item.startsWith(ACL_PREFIX) && doAclCopy) {
+ logger.info("Remove acl record");
+ dstStore.deleteResource(item);
+ }
+ break;
}
- break;
- }
- case COPY_DICT_OR_SNAPSHOT: {
- // no harm
- logger.info("Undo for COPY_DICT_OR_SNAPSHOT is ignored");
- break;
- }
- case RENAME_FOLDER_IN_HDFS: {
- String srcPath = (String) opt.params[1];
- String dstPath = (String) opt.params[0];
-
- if (hdfsFS.exists(new Path(srcPath)) && !hdfsFS.exists(new Path(dstPath))) {
- renameHDFSPath(srcPath, dstPath);
- logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath);
+ case COPY_DICT_OR_SNAPSHOT: {
+ // no harm
+ logger.info("Undo for COPY_DICT_OR_SNAPSHOT is ignored");
+ break;
+ }
+ case ADD_INTO_PROJECT: {
+ logger.info("Undo for ADD_INTO_PROJECT is ignored");
+ break;
+ }
+ case PURGE_AND_DISABLE: {
+ logger.info("Undo for PURGE_AND_DISABLE is not supported");
+ break;
+ }
+ case COPY_PARQUET_FILE: {
+ logger.info("Undo for COPY_PARQUET_FILE is ignored");
+ break;
+ }
+ default: {
+ //do nothing
+ break;
}
- break;
- }
- case ADD_INTO_PROJECT: {
- logger.info("Undo for ADD_INTO_PROJECT is ignored");
- break;
- }
- case PURGE_AND_DISABLE: {
- logger.info("Undo for PURGE_AND_DISABLE is not supported");
- break;
- }
- default: {
- //do nothing
- break;
- }
}
}
@@ -660,17 +554,4 @@ public class CubeMigrationCLI extends AbstractApplication {
}
}
}
-
- private void renameHDFSPath(String srcPath, String dstPath) throws IOException, InterruptedException {
- int nRetry = 0;
- int sleepTime = 5000;
- while (!hdfsFS.rename(new Path(srcPath), new Path(dstPath))) {
- ++nRetry;
- if (nRetry > 3) {
- throw new InterruptedException("Cannot rename folder " +
srcPath + " to folder " + dstPath);
- } else {
- Thread.sleep((long) sleepTime * nRetry * nRetry);
- }
- }
- }
-}
+}
\ No newline at end of file