I raised the driver memory to 30G and spark.driver.maxResultSize to 25G, this time running in PySpark.
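For reference, the launch now looks roughly like this (a sketch, not the exact command: the pyspark launcher path and the executor flags are assumed to match the original spark-shell run quoted below; only the driver settings are new):

# executor flags assumed unchanged from the earlier spark-shell run
/opt/spark/2.0.0/bin/pyspark \
  --driver-memory 30G \
  --conf spark.driver.maxResultSize=25G \
  --executor-memory 10G --num-executors 128 --executor-cores 2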
Code run:

from pyspark.ml.feature import OneHotEncoder, StringIndexer
import logging

cat_int = ['bigfeature']

stagesIndex = []
stagesOhe = []
for c in cat_int:
    stagesIndex.append(StringIndexer(inputCol=c, outputCol="{}Index".format(c)))
    stagesOhe.append(OneHotEncoder(dropLast=False, inputCol="{}Index".format(c), outputCol="{}OHE".format(c)))

df2 = df
for i in range(len(stagesIndex)):
    logging.info("Starting with {}".format(cat_int[i]))
    stagesIndex[i].fit(df2)
    logging.info("Fitted. Now transforming:")
    df2 = stagesIndex[i].fit(df2).transform(df2)
    logging.info("Transformed. Now showing transformed:")
    df2.show()
    logging.info("OHE")
    df2 = stagesOhe[i].transform(df2)
    logging.info("Fitted. Now showing OHE:")
    df2.show()

Now I get this error:

2016-08-04 08:53:44,839 INFO Starting with bigfeature
ukStringIndexer_442b8e11e3294de9b83a
2016-08-04 09:06:18,147 INFO Fitted. Now transforming:
16/08/04 09:10:35 WARN BlockManagerMaster: Failed to remove shuffle 3 - Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
    at scala.util.Try$.apply(Try.scala:192)
    at scala.util.Failure.recover(Try.scala:216)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
    at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
    at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:205)
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:239)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 120 seconds
    ... 8 more
16/08/04 09:10:45 WARN TransportResponseHandler: Ignoring response for RPC 4858888672840406395 from /10.10.80.4:59931 (47 bytes) since it is not outstanding
2016-08-04 09:12:07,016 INFO Transformed. Now showing transformed:
16/08/04 09:13:48 WARN DFSClient: Slow ReadProcessor read fields took 71756ms (threshold=30000ms); ack: seqno: -2 status: SUCCESS status: ERROR downstreamAckTimeNanos: 0, targets: [10.10.66.5:50010, 10.10.10.12:50010, 10.10.91.9:50010]
16/08/04 09:13:48 WARN DFSClient: DFSOutputStream ResponseProcessor exception for block BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265
java.io.IOException: Bad response ERROR for block BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265 from datanode 10.10.10.12:50010
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
16/08/04 09:13:48 WARN DFSClient: Error Recovery for block BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265 in pipeline 10.10.66.5:50010, 10.10.10.12:50010, 10.192.91.9:50010: bad datanode 10.10.10.12:50010
Traceback (most recent call last):
  File "<stdin>", line 7, in <module>
  File "/opt/spark/2.0.0/python/pyspark/sql/dataframe.py", line 287, in show
    print(self._jdf.showString(n, truncate))
  File "/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
  File "/opt/spark/2.0.0/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o95.showString.
: java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)

Ben

> On Aug 3, 2016, at 4:00 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:
>
> Hi,
>
> I want to one hot encode a column containing 56 million distinct values. My dataset is 800m rows + 17 columns.
> I first apply a StringIndexer, but it already breaks there giving an OOM Java heap space error.
>
> I launch my app on YARN with:
> /opt/spark/2.0.0/bin/spark-shell --executor-memory 10G --num-executors 128 --executor-cores 2 --driver-memory 12G --conf spark.driver.maxResultSize=8G
>
> After grabbing the data, I run:
>
> val catInts = Array("bigfeature")
>
> val stagesIndex = scala.collection.mutable.ArrayBuffer.empty[StringIndexer]
> val stagesOhe = scala.collection.mutable.ArrayBuffer.empty[OneHotEncoder]
> for (c <- catInts) {
>   println(s"starting with $c")
>   val i = new StringIndexer()
>     .setInputCol(c)
>     .setOutputCol(s"${c}Index")
>   stagesIndex += i
>
>   val o = new OneHotEncoder()
>     .setDropLast(false)
>     .setInputCol(s"${c}Index")
>     .setOutputCol(s"${c}OHE")
>   stagesOhe += o
> }
>
> println(s"Applying string indexers: fitting")
> val pipelined = new Pipeline().setStages(stagesIndex.toArray)
> val dfFitted = pipelined.fit(df)
>
> Then, the application master shows a "countByValue at StringIndexer.scala" taking 1.8 minutes (so very fast).
> Afterwards, the shell console hangs for a while. What is it doing now? After some time, it shows:
>
> scala> val dfFitted = pipelined.fit(df)
>
> java.lang.OutOfMemoryError: Java heap space
>   at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:141)
>   at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:139)
>   at org.apache.spark.util.collection.OpenHashMap$$anonfun$1.apply$mcVI$sp(OpenHashMap.scala:159)
>   at org.apache.spark.util.collection.OpenHashSet.rehash(OpenHashSet.scala:230)
>   at org.apache.spark.util.collection.OpenHashSet.rehashIfNeeded(OpenHashSet.scala:167)
>   at org.apache.spark.util.collection.OpenHashMap$mcD$sp.update$mcD$sp(OpenHashMap.scala:86)
>   at org.apache.spark.ml.feature.StringIndexerModel.<init>(StringIndexer.scala:137)
>   at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:93)
>   at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:66)
>   at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:149)
>   at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:145)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44)
>   at scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37)
>   at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:145)
>   ... 16 elided