[ https://issues.apache.org/jira/browse/HIVE-15527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15794194#comment-15794194 ]
Xuefu Zhang commented on HIVE-15527:
------------------------------------

[~csun], [~lirui], and [~kellyzly], thanks for your feedback. The patch here is more like a POC, so it needs improvement before it is production-ready. Here are a few thoughts:

1) I'm not sure what caused the result diff, though it may be a bug in HiveKVResultCache manifesting itself. The diff seems invalid when compared with the MR result. Also, the diff seems to be generated somewhat randomly.
2) As to performance, Rui's concern is valid. What I tried to demo is that we need something similar to HiveKVResultCache, but only for values.
3) Similar to 2), we need a good cache size to avoid file I/O for regular group sizes. Currently HiveKVResultCache caches only 1024 rows, which seems rather small.
4) Performance impact needs to be evaluated.
5) The idea here could be used to solve the same problem for Spark's groupByKey() in Hive. We could use Spark's reduceByKey() instead, and in Hive do in-group value caching like what we can do here.

I'm not sure if I have the bandwidth to move this forward at full speed. Please feel free to take this (and other issues) forward. Thanks.

> Memory usage is unbound in SortByShuffler for Spark
> ---------------------------------------------------
>
>                 Key: HIVE-15527
>                 URL: https://issues.apache.org/jira/browse/HIVE-15527
>             Project: Hive
>          Issue Type: Improvement
>          Components: Spark
>    Affects Versions: 1.1.0
>            Reporter: Xuefu Zhang
>            Assignee: Xuefu Zhang
>         Attachments: HIVE-15527.1.patch, HIVE-15527.2.patch, HIVE-15527.3.patch, HIVE-15527.patch
>
>
> In SortByShuffler.java, an ArrayList is used to back the iterator for values
> that have the same key in the shuffled result produced by the Spark
> transformation sortByKey. Because every value of a key group is held in that
> list at once, a large key group can exhaust memory.
> {code}
>         @Override
>         public Tuple2<HiveKey, Iterable<BytesWritable>> next() {
>           // TODO: implement this by accumulating rows with the same key into a list.
>           // Note that this list needs to improved to prevent excessive memory usage, but this
>           // can be done in later phase.
>           while (it.hasNext()) {
>             Tuple2<HiveKey, BytesWritable> pair = it.next();
>             if (curKey != null && !curKey.equals(pair._1())) {
>               HiveKey key = curKey;
>               List<BytesWritable> values = curValues;
>               curKey = pair._1();
>               curValues = new ArrayList<BytesWritable>();
>               curValues.add(pair._2());
>               return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, values);
>             }
>             curKey = pair._1();
>             curValues.add(pair._2());
>           }
>           if (curKey == null) {
>             throw new NoSuchElementException();
>           }
>           // if we get here, this should be the last element we have
>           HiveKey key = curKey;
>           curKey = null;
>           return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, curValues);
>         }
> {code}
> Since the output from sortByKey is already sorted on key, it's possible to
> back the value iterable with the same input iterator instead of a list.
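
For illustration only, the suggestion that closes the issue description (backing each group's values with the same sorted input iterator) might look roughly like the sketch below. It is not the attached patch. The class name LazyGroupIterator is hypothetical, and plain java.util types (Map.Entry, Iterator) stand in for Tuple2, HiveKey and BytesWritable so that the example is self-contained. It assumes non-null keys and input already sorted by key.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

/**
 * Hypothetical sketch: groups a key-sorted stream of pairs without copying
 * each group's values into a list. Only one lookahead pair is held at a time.
 */
class LazyGroupIterator<K, V> implements Iterator<Map.Entry<K, Iterator<V>>> {
  private final Iterator<Map.Entry<K, V>> input; // assumed sorted by key
  private Map.Entry<K, V> lookahead;             // next unconsumed pair, null at end of input
  private K currentKey;                          // key of the most recently returned group

  LazyGroupIterator(Iterator<Map.Entry<K, V>> sortedInput) {
    this.input = sortedInput;
    advance();
  }

  private void advance() {
    lookahead = input.hasNext() ? input.next() : null;
  }

  @Override
  public boolean hasNext() {
    // Skip any values the caller left unread in the previous group.
    // Note: this also ends the current group's value iterator.
    while (lookahead != null && lookahead.getKey().equals(currentKey)) {
      advance();
    }
    return lookahead != null;
  }

  @Override
  public Map.Entry<K, Iterator<V>> next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    final K groupKey = lookahead.getKey();
    currentKey = groupKey;
    // The value iterator reads straight from the shared input, so it is
    // single-pass and yields values only while the key stays the same.
    Iterator<V> values = new Iterator<V>() {
      @Override
      public boolean hasNext() {
        return lookahead != null && lookahead.getKey().equals(groupKey);
      }

      @Override
      public V next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        V value = lookahead.getValue();
        advance();
        return value;
      }
    };
    return new SimpleImmutableEntry<>(groupKey, values);
  }
}
```

The trade-off in this sketch is that each group's values can be traversed only once and only before moving on to the next group. A consumer that needs to re-read a group would still need some form of value caching, which is the role points 2) and 3) of the comment assign to something like HiveKVResultCache.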