This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 5be135c [KYLIN-4940] Implement the step of "Extract Dictionary from
Global Dictionary" for spark cubing engine
5be135c is described below
commit 5be135c61c80b9241ac7632dec41cc7209b9d23a
Author: yangjiang <[email protected]>
AuthorDate: Tue Jan 19 11:34:35 2021 +0800
[KYLIN-4940] Implement the step of "Extract Dictionary from Global
Dictionary" for spark cubing engine
---
.../engine/spark/SparkBatchCubingJobBuilder2.java | 4 +
.../kylin/engine/spark/SparkCubingByLayer.java | 174 ++++++++++++++++++++-
.../org/apache/kylin/engine/spark/SparkUtil.java | 40 +++++
3 files changed, 211 insertions(+), 7 deletions(-)
diff --git
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 7d6a367..4b43318 100644
---
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -201,6 +201,10 @@ public class SparkBatchCubingJobBuilder2 extends
JobBuilderSupport {
sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(),
getSegmentMetadataUrl(seg.getConfig(), jobId));
sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(),
cuboidRootPath);
+ if (seg.getCubeDesc().isShrunkenDictFromGlobalEnabled()) {
+
sparkExecutable.setParam(SparkCubingByLayer.OPTION_SHRUNK_INPUT_PATH.getOpt(),
+ getShrunkenDictionaryPath(jobId));
+ }
sparkExecutable.setJobId(jobId);
StringBuilder jars = new StringBuilder();
diff --git
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index cbedc8b..09c68c9 100644
---
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,21 +17,28 @@
*/
package org.apache.kylin.engine.spark;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeDescManager;
@@ -44,6 +51,8 @@ import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.dict.ShrunkenDictionary;
+import org.apache.kylin.dict.ShrunkenDictionaryBuilder;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
import org.apache.kylin.engine.mr.IMROutput2;
@@ -59,13 +68,19 @@ import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.MeasureAggregators;
import org.apache.kylin.measure.MeasureIngester;
import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,6 +106,9 @@ public class SparkCubingByLayer extends AbstractApplication
implements Serializa
.withDescription("Hive Intermediate Table").create("hiveTable");
public static final Option OPTION_INPUT_PATH =
OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
.isRequired(true).withDescription("Hive Intermediate Table
PATH").create(BatchConstants.ARG_INPUT);
+ public static final Option OPTION_SHRUNK_INPUT_PATH = OptionBuilder
+
.withArgName(BatchConstants.ARG_SHRUNKEN_DICT_PATH).hasArg().isRequired(false)
+ .withDescription("shrunken Dictionary
Path").create(BatchConstants.ARG_SHRUNKEN_DICT_PATH);
private Options options;
@@ -102,6 +120,7 @@ public class SparkCubingByLayer extends AbstractApplication
implements Serializa
options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_META_URL);
options.addOption(OPTION_OUTPUT_PATH);
+ options.addOption(OPTION_SHRUNK_INPUT_PATH);
}
@Override
@@ -117,6 +136,8 @@ public class SparkCubingByLayer extends AbstractApplication
implements Serializa
String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+ String shrunkInputPath =
optionsHelper.getOptionValue(OPTION_SHRUNK_INPUT_PATH);
+ logger.info("shrunkInputPath is {}", shrunkInputPath);
Class[] kryoClassArray = new Class[] {
Class.forName("scala.reflect.ClassTag$$anon$1") };
@@ -130,7 +151,12 @@ public class SparkCubingByLayer extends
AbstractApplication implements Serializa
JavaSparkContext sc = new JavaSparkContext(conf);
sc.sc().addSparkListener(jobListener);
HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
- SparkUtil.modifySparkHadoopConfiguration(sc.sc(),
AbstractHadoopJob.loadKylinConfigFromHdfs(new
SerializableConfiguration(sc.hadoopConfiguration()), metaUrl)); // set
dfs.replication and enable compress
+ SparkUtil.modifySparkHadoopConfiguration(sc.sc(), AbstractHadoopJob
+ .loadKylinConfigFromHdfs(new
SerializableConfiguration(sc.hadoopConfiguration()), metaUrl)); // set
dfs.replication and enable compress
+
+ if (shrunkInputPath != null)
+
sc.hadoopConfiguration().set(BatchConstants.ARG_SHRUNKEN_DICT_PATH,
shrunkInputPath);
+
final SerializableConfiguration sConf = new
SerializableConfiguration(sc.hadoopConfiguration());
KylinConfig envConfig =
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
@@ -164,9 +190,21 @@ public class SparkCubingByLayer extends
AbstractApplication implements Serializa
boolean isSequenceFile =
JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
- final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = SparkUtil
- .hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable)
- .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl,
sConf));
+ final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD;
+ logger.info("isShrunkenDictFromGlobalEnabled {} shrunkInputPath is
{}",
+ cubeDesc.isShrunkenDictFromGlobalEnabled(), shrunkInputPath);
+
+ JavaRDD<String[]> recordInputRDD = null;
+
+ if (cubeDesc.isShrunkenDictFromGlobalEnabled() && shrunkInputPath !=
null) {
+ recordInputRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc,
inputPath, hiveTable).cache();
+ recordInputRDD
+ .foreachPartition(new CreateShrunkenDictionary(cubeName,
cubeDesc, cubeSegment, envConfig, sConf));
+ encodedBaseRDD = recordInputRDD.mapToPair(new
EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
+ } else {
+ encodedBaseRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc,
inputPath, hiveTable)
+ .mapToPair(new EncodeBaseCuboid(cubeName, segmentId,
metaUrl, sConf));
+ }
Long totalCount = 0L;
if (envConfig.isSparkSanityCheckEnabled()) {
@@ -190,6 +228,11 @@ public class SparkCubingByLayer extends
AbstractApplication implements Serializa
saveToHDFS(allRDDs[0], metaUrl, cubeName, cubeSegment, outputPath, 0,
job, envConfig);
+ // when using ShrunkenDictionary, the cached recordInputRDD should be unpersisted
+ if (recordInputRDD != null) {
+ recordInputRDD.unpersist();
+ }
+
PairFlatMapFunction flatMapFunction = new CuboidFlatMap(cubeName,
segmentId, metaUrl, sConf);
// aggregate to ND cuboids
for (level = 1; level <= totalLevels; level++) {
@@ -286,9 +329,16 @@ public class SparkCubingByLayer extends
AbstractApplication implements Serializa
EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
long baseCuboidId =
Cuboid.getBaseCuboidId(cubeDesc);
Cuboid baseCuboid =
Cuboid.findForMandatory(cubeDesc, baseCuboidId);
- baseCuboidBuilder = new BaseCuboidBuilder(kConfig,
cubeDesc, cubeSegment, interDesc,
-
AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
-
MeasureIngester.create(cubeDesc.getMeasures()),
cubeSegment.buildDictionaryMap());
+ String splitKey =
String.valueOf(TaskContext.getPartitionId());
+ try {
+ baseCuboidBuilder = new
BaseCuboidBuilder(kConfig, cubeDesc, cubeSegment, interDesc,
+
AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
+
MeasureIngester.create(cubeDesc.getMeasures()),
+
SparkUtil.getDictionaryMap(cubeSegment, splitKey, conf.get()));
+ } catch (IOException e) {
+ logger.error("Fail to get shrunk dict");
+ e.printStackTrace();
+ }
initialized = true;
}
}
@@ -466,4 +516,114 @@ public class SparkCubingByLayer extends
AbstractApplication implements Serializa
return count;
}
+ public static class CreateShrunkenDictionary implements
VoidFunction<Iterator<String[]>> {
+ private String cubeName;
+ private CubeDesc cubeDesc;
+ private CubeSegment cubeSeg;
+
+ private KylinConfig kylinConfig;
+ private SerializableConfiguration scof;
+ private CubeJoinedFlatTableEnrich intermediateTableDesc;
+
+ private List<TblColRef> globalColumns;
+ private int[] globalColumnIndex;
+ private List<Set<String>> globalColumnValues;
+
+ private volatile transient boolean initialized = false;
+
+ private String splitKey;
+
+ public CreateShrunkenDictionary(String cubeName, CubeDesc cubeDesc,
CubeSegment cubeSegment, KylinConfig kylinConfig,
+ SerializableConfiguration
serializableConfiguration) {
+ this.cubeName = cubeName;
+ this.cubeDesc = cubeDesc;
+ this.cubeSeg = cubeSegment;
+ this.kylinConfig = kylinConfig;
+ this.scof = serializableConfiguration;
+ this.intermediateTableDesc = new
CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg),
+ cubeDesc);
+ }
+
+ public void init() {
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset =
KylinConfig
+ .setAndUnsetThreadLocalConfig(kylinConfig)) {
+ globalColumns = cubeDesc.getAllGlobalDictColumnsNeedBuilt();
+ globalColumnIndex = new int[globalColumns.size()];
+ globalColumnValues =
Lists.newArrayListWithExpectedSize(globalColumns.size());
+
+ splitKey = String.valueOf(TaskContext.getPartitionId());
+
+ for (int i = 0; i < globalColumns.size(); i++) {
+ TblColRef colRef = globalColumns.get(i);
+ int columnIndexOnFlatTbl =
intermediateTableDesc.getColumnIndex(colRef);
+ globalColumnIndex[i] = columnIndexOnFlatTbl;
+ globalColumnValues.add(Sets.<String>newHashSet());
+ }
+ }
+ }
+
+ @Override
+ public void call(Iterator<String[]> iter) throws Exception {
+
+ if (initialized == false) {
+ synchronized (CreateShrunkenDictionary.class) {
+ if (initialized == false) {
+ init();
+ initialized = true;
+ }
+ }
+ }
+ int count = 0;
+ while (iter.hasNext()) {
+ count++;
+ String[] rowArray = iter.next();
+ for (int i = 0; i < globalColumnIndex.length; i++) {
+ String fieldValue = rowArray[globalColumnIndex[i]];
+ if (fieldValue == null)
+ continue;
+ globalColumnValues.get(i).add(fieldValue);
+ }
+ }
+
+ FileSystem fs = FileSystem.get(scof.get());
+ Path outputDirBase = new
Path(scof.get().get(BatchConstants.ARG_SHRUNKEN_DICT_PATH));
+
+ Map<TblColRef, Dictionary<String>> globalDictionaryMap = cubeSeg
+ .buildGlobalDictionaryMap(globalColumns.size());
+
+ ShrunkenDictionary.StringValueSerializer strValueSerializer = new
ShrunkenDictionary.StringValueSerializer();
+
+ for (int i = 0; i < globalColumns.size(); i++) {
+ List<String> colDistinctValues =
Lists.newArrayList(globalColumnValues.get(i));
+ if (colDistinctValues.size() == 0) {
+ continue;
+ }
+ // sort values to accelerate the encoding process by reducing
the swapping of global dictionary slices
+ Collections.sort(colDistinctValues);
+
+ // build the shrunken dictionary for a single column's global dictionary
+ ShrunkenDictionaryBuilder<String> dictBuilder = new
ShrunkenDictionaryBuilder<>(
+ globalDictionaryMap.get(globalColumns.get(i)));
+
+ for (String colValue : colDistinctValues) {
+ dictBuilder.addValue(colValue);
+ }
+
+ Dictionary<String> shrunkenDict =
dictBuilder.build(strValueSerializer);
+
+ Path colDictDir = new Path(outputDirBase,
globalColumns.get(i).getIdentity());
+
+ if (!fs.exists(colDictDir)) {
+ fs.mkdirs(colDictDir);
+ }
+ Path shrunkenDictPath = new Path(colDictDir, splitKey);
+ try (DataOutputStream dos = fs.create(shrunkenDictPath)) {
+ logger.info("Write Shrunken dictionary to {} success",
shrunkenDictPath);
+ shrunkenDict.write(dos);
+ }
+ }
+
+ }
+ }
+
}
diff --git
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index b963252..d146c85 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -18,9 +18,12 @@
package org.apache.kylin.engine.spark;
+import java.io.DataInputStream;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -30,14 +33,17 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.ShrunkenDictionary;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.IMROutput2;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.SourceManager;
import org.apache.kylin.storage.StorageFactory;
import org.apache.spark.SparkContext;
@@ -53,9 +59,13 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SparkUtil {
+ private static final Logger logger =
LoggerFactory.getLogger(SparkUtil.class);
+
public static ISparkBatchCubingInputSide
getBatchCubingInputSide(CubeSegment seg) {
IJoinedFlatTableDesc flatDesc =
EngineFactory.getJoinedFlatTableDesc(seg);
return
(ISparkBatchCubingInputSide)SourceManager.createEngineAdapter(seg,
ISparkInput.class).getBatchCubingInputSide(flatDesc);
@@ -188,4 +198,34 @@ public class SparkUtil {
});
}
+ public static Map<TblColRef, Dictionary<String>>
getDictionaryMap(CubeSegment cubeSegment, String splitKey,
+
Configuration configuration) throws IOException {
+ Map<TblColRef, Dictionary<String>> dictionaryMap =
cubeSegment.buildDictionaryMap();
+
+ String shrunkenDictPath =
configuration.get(BatchConstants.ARG_SHRUNKEN_DICT_PATH);
+ if (shrunkenDictPath == null) {
+ return dictionaryMap;
+ }
+
+ // replace global dictionary with shrunken dictionary if possible
+ FileSystem fs = FileSystem.get(configuration);
+ ShrunkenDictionary.StringValueSerializer valueSerializer = new
ShrunkenDictionary.StringValueSerializer();
+ for (TblColRef colRef :
cubeSegment.getCubeDesc().getAllGlobalDictColumnsNeedBuilt()) {
+ Path colShrunkenDictDir = new Path(shrunkenDictPath,
colRef.getIdentity());
+ Path colShrunkenDictPath = new Path(colShrunkenDictDir, splitKey);
+ if (!fs.exists(colShrunkenDictPath)) {
+ logger.warn("Shrunken dictionary for column " +
colRef.getIdentity() + " in split " + splitKey
+ + " does not exist!!!");
+ continue;
+ }
+ try (DataInputStream dis = fs.open(colShrunkenDictPath)) {
+ Dictionary<String> shrunkenDict = new
ShrunkenDictionary(valueSerializer);
+ shrunkenDict.readFields(dis);
+ logger.info("Read Shrunken dictionary from {} success",
colShrunkenDictPath);
+ dictionaryMap.put(colRef, shrunkenDict);
+ }
+ }
+
+ return dictionaryMap;
+ }
}