You are using lots of tiny executors (128 executors with only 2 GB of memory each). Could you try with bigger executors (for example, 16 executors with 16 GB each)?
On Fri, Aug 19, 2016 at 8:19 AM, Ben Teeuwen <bteeu...@gmail.com> wrote: > > So I wrote some code to reproduce the problem. > > I assume here that a pipeline should be able to transform a categorical > feature with a few million levels. > So I create a dataframe with the categorical feature (‘id’), apply a > StringIndexer and OneHotEncoder transformer, and run a loop where I increase > the amount of levels. > It breaks at 1.276.000 levels. > > Shall I report this as a ticket in JIRA? > > ____________ > > > from pyspark.sql.functions import rand > from pyspark.ml.feature import OneHotEncoder, StringIndexer,VectorAssembler > from pyspark.ml import Pipeline > > start_id = 100000 > n = 5000000 > step = (n - start_id) / 25 > > for i in xrange(start_id,start_id + n,step): > print "#########\n {}".format(i) > dfr = (sqlContext > .range(start_id, start_id + i) > .withColumn(‘label', rand(seed=10)) > .withColumn('feat2', rand(seed=101)) > # .withColumn('normal', randn(seed=27)) > ).repartition(32).cache() > # dfr.select("id", rand(seed=10).alias("uniform"), > randn(seed=27).alias("normal")).show() > dfr.show(1) > print "This dataframe has {0} rows (and therefore {0} levels will be one > hot encoded)".format(dfr.count()) > > categorical_feature = ['id'] > stages = [] > > for c in categorical_feature: > stages.append(StringIndexer(inputCol=c, > outputCol="{}Index".format(c))) > stages.append(OneHotEncoder(dropLast= False, inputCol = > "{}Index".format(c), outputCol = "{}OHE".format(c))) > > columns = ["{}OHE".format(x) for x in categorical_feature] > columns.append('feat2') > > assembler = VectorAssembler( > inputCols=columns, > outputCol="features") > stages.append(assembler) > > df2 = dfr > > pipeline = Pipeline(stages=stages) > pipeline_fitted = pipeline.fit(df2) > df3 = pipeline_fitted.transform(df2) > df3.show(1) > dfr.unpersist() > > > ____________ > > Output: > > > ######### > 100000 > +------+---------------------------+-------------------+ > | id|label | feat2| > 
+------+---------------------------+-------------------+ > |183601| 0.38693226548356197|0.04485291680169634| > +------+---------------------------+-------------------+ > only showing top 1 row > > This dataframe has 100000 rows (and therefore 100000 levels will be one hot > encoded) > +------+---------------------------+-------------------+-------+--------------------+--------------------+ > | id|label | feat2|idIndex| > idOHE| features| > +------+---------------------------+-------------------+-------+--------------------+--------------------+ > |183601| > 0.38693226548356197|0.04485291680169634|83240.0|(100000,[83240],[...|(100001,[83240,10...| > +------+---------------------------+-------------------+-------+--------------------+--------------------+ > only showing top 1 row > > ######### > 296000 > +------+---------------------------+-------------------+ > | id|label | feat2| > +------+---------------------------+-------------------+ > |137008| 0.2996020619810592|0.38693226548356197| > +------+---------------------------+-------------------+ > only showing top 1 row > > This dataframe has 296000 rows (and therefore 296000 levels will be one hot > encoded) > +------+---------------------------+-------------------+-------+--------------------+--------------------+ > | id|label | feat2|idIndex| > idOHE| features| > +------+---------------------------+-------------------+-------+--------------------+--------------------+ > |137008| > 0.2996020619810592|0.38693226548356197|35347.0|(296000,[35347],[...|(296001,[35347,29...| > +------+---------------------------+-------------------+-------+--------------------+--------------------+ > only showing top 1 row > > ######### > 492000 > +------+---------------------------+-------------------+ > | id|label | feat2| > +------+---------------------------+-------------------+ > |534351| 0.9450641392552516|0.23472935141246665| > +------+---------------------------+-------------------+ > only showing top 1 row > > This dataframe 
has 492000 rows (and therefore 492000 levels will be one hot > encoded) > +------+---------------------------+-------------------+-------+--------------------+--------------------+ > | id|label | feat2|idIndex| > idOHE| features| > +------+---------------------------+-------------------+-------+--------------------+--------------------+ > |534351| 0.9450641392552516|0.23472935141246665| > 3656.0|(492000,[3656],[1...|(492001,[3656,492...| > +------+---------------------------+-------------------+-------+--------------------+--------------------+ > only showing top 1 row > > ######### > 688000 > +------+---------------------------+------------------+ > | id|label | feat2| > +------+---------------------------+------------------+ > |573008| 0.3059347083549171|0.4846147657830415| > +------+---------------------------+------------------+ > only showing top 1 row > > This dataframe has 688000 rows (and therefore 688000 levels will be one hot > encoded) > +------+---------------------------+------------------+--------+--------------------+--------------------+ > | id|label | feat2| idIndex| > idOHE| features| > +------+---------------------------+------------------+--------+--------------------+--------------------+ > |573008| > 0.3059347083549171|0.4846147657830415|475855.0|(688000,[475855],...|(688001,[475855,6...| > +------+---------------------------+------------------+--------+--------------------+--------------------+ > only showing top 1 row > > ######### > 884000 > +------+---------------------------+------------------+ > | id|label | feat2| > +------+---------------------------+------------------+ > |970195| 0.34345290476989165|0.9843176058907069| > +------+---------------------------+------------------+ > only showing top 1 row > > This dataframe has 884000 rows (and therefore 884000 levels will be one hot > encoded) > +------+---------------------------+------------------+--------+--------------------+--------------------+ > | id|label | feat2| idIndex| > 
idOHE| features| > +------+---------------------------+------------------+--------+--------------------+--------------------+ > |970195| > 0.34345290476989165|0.9843176058907069|333915.0|(884000,[333915],...|(884001,[333915,8...| > +------+---------------------------+------------------+--------+--------------------+--------------------+ > only showing top 1 row > > ######### > 1080000 > +------+---------------------------+-----------------+ > | id|label | feat2| > +------+---------------------------+-----------------+ > |403758| 0.6333344187975314|0.774327685753309| > +------+---------------------------+-----------------+ > only showing top 1 row > > This dataframe has 1080000 rows (and therefore 1080000 levels will be one hot > encoded) > +------+---------------------------+-----------------+--------+--------------------+--------------------+ > | id|label | feat2| idIndex| > idOHE| features| > +------+---------------------------+-----------------+--------+--------------------+--------------------+ > |403758| > 0.6333344187975314|0.774327685753309|287898.0|(1080000,[287898]...|(1080001,[287898,...| > +------+---------------------------+-----------------+--------+--------------------+--------------------+ > only showing top 1 row > > ######### > 1276000 > +------+---------------------------+------------------+ > | id|label | feat2| > +------+---------------------------+------------------+ > |508726| 0.2513814327408137|0.8480577183702391| > +------+---------------------------+------------------+ > only showing top 1 row > > This dataframe has 1276000 rows (and therefore 1276000 levels will be one hot > encoded) > > --------------------------------------------------------------------------- > Py4JJavaError Traceback (most recent call last) > <ipython-input-2-f5c9fe263872> in <module>() > 38 pipeline = Pipeline(stages=stages) > 39 pipeline_fitted = pipeline.fit(df2) > ---> 40 df3 = pipeline_fitted.transform(df2) > 41 df3.show(1) > 42 dfr.unpersist() > > 
/opt/spark/2.0.0/python/pyspark/ml/base.py in transform(self, dataset, params) > 103 return self.copy(params)._transform(dataset) > 104 else: > --> 105 return self._transform(dataset) > 106 else: > 107 raise ValueError("Params must be a param map but got %s." > % type(params)) > > /opt/spark/2.0.0/python/pyspark/ml/pipeline.py in _transform(self, dataset) > 196 def _transform(self, dataset): > 197 for t in self.stages: > --> 198 dataset = t.transform(dataset) > 199 return dataset > 200 > > /opt/spark/2.0.0/python/pyspark/ml/base.py in transform(self, dataset, params) > 103 return self.copy(params)._transform(dataset) > 104 else: > --> 105 return self._transform(dataset) > 106 else: > 107 raise ValueError("Params must be a param map but got %s." > % type(params)) > > /opt/spark/2.0.0/python/pyspark/ml/wrapper.py in _transform(self, dataset) > 227 def _transform(self, dataset): > 228 self._transfer_params_to_java() > --> 229 return DataFrame(self._java_obj.transform(dataset._jdf), > dataset.sql_ctx) > 230 > 231 > > /opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in > __call__(self, *args) > 931 answer = self.gateway_client.send_command(command) > 932 return_value = get_return_value( > --> 933 answer, self.gateway_client, self.target_id, self.name) > 934 > 935 for temp_arg in temp_args: > > /opt/spark/2.0.0/python/pyspark/sql/utils.py in deco(*a, **kw) > 61 def deco(*a, **kw): > 62 try: > ---> 63 return f(*a, **kw) > 64 except py4j.protocol.Py4JJavaError as e: > 65 s = e.java_exception.toString() > > /opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in > get_return_value(answer, gateway_client, target_id, name) > 310 raise Py4JJavaError( > 311 "An error occurred while calling {0}{1}{2}.\n". > --> 312 format(target_id, ".", name), value) > 313 else: > 314 raise Py4JError( > > Py4JJavaError: An error occurred while calling o408.transform. 
> : java.lang.OutOfMemoryError: GC overhead limit exceeded > at scala.collection.immutable.Stream$.from(Stream.scala:1262) > at scala.collection.immutable.Stream$$anonfun$from$1.apply(Stream.scala:1262) > at scala.collection.immutable.Stream$$anonfun$from$1.apply(Stream.scala:1262) > at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233) > at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223) > at > scala.collection.LinearSeqOptimized$class.loop$1(LinearSeqOptimized.scala:274) > at > scala.collection.LinearSeqOptimized$class.lengthCompare(LinearSeqOptimized.scala:277) > at scala.collection.immutable.Stream.lengthCompare(Stream.scala:202) > at scala.collection.SeqViewLike$Zipped$class.length(SeqViewLike.scala:133) > at scala.collection.SeqViewLike$$anon$9.length(SeqViewLike.scala:203) > at scala.collection.SeqViewLike$Mapped$class.length(SeqViewLike.scala:66) > at scala.collection.SeqViewLike$$anon$3.length(SeqViewLike.scala:197) > at scala.collection.SeqLike$class.size(SeqLike.scala:106) > at scala.collection.SeqViewLike$AbstractTransformed.size(SeqViewLike.scala:37) > at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:285) > at > scala.collection.SeqViewLike$AbstractTransformed.toArray(SeqViewLike.scala:37) > at > org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72) > at > org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72) > at scala.Option.map(Option.scala:146) > at > org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:70) > at > org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:65) > at > org.apache.spark.ml.attribute.AttributeGroup$.fromMetadata(AttributeGroup.scala:234) > at > org.apache.spark.ml.attribute.AttributeGroup$.fromStructField(AttributeGroup.scala:246) > at > org.apache.spark.ml.feature.OneHotEncoder.transform(OneHotEncoder.scala:139) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) > at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at py4j.Gateway.invoke(Gateway.java:280) > at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128) > > > > > Spark Properties > NameValue > spark.app.namepyspark-shell > spark.driver.cores1 > spark.driver.extraJavaOptions-XX:+UnlockDiagnosticVMOptions > -XX:+PerfDisableSharedMem > spark.driver.memory2g > spark.dynamicAllocation.enabledFALSE > spark.eventLog.dirhdfs:///spark/history > spark.eventLog.enabledTRUE > spark.executor.cores1 > spark.executor.extraJavaOptions-XX:+UnlockDiagnosticVMOptions > -XX:+PerfDisableSharedMem > spark.executor.iddriver > spark.executor.instances128 > spark.executor.memory2g > spark.history.fs.logDirectoryhdfs:///spark/history > spark.masteryarn-client > spark.memory.fraction0.7 > spark.memory.storageFraction0.5 > spark.rdd.compressTRUE > spark.scheduler.modeFIFO > spark.serializer.objectStreamReset100 > spark.shuffle.service.enabledFALSE > spark.speculationTRUE > spark.submit.deployModeclient > spark.task.maxFailures10 > spark.yarn.executor.memoryOverhead2048 > spark.yarn.isPythonTRUE > > > On Aug 11, 2016, at 10:24 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote: > > Ok, interesting. Would be interested to see how it compares. > > By the way, the feature size you select for the hasher should be a power of 2 > (e.g. 2**24 to 2**26 may be worth trying) to ensure the feature indexes are > evenly distributed (see the section on HashingTF under > http://spark.apache.org/docs/latest/ml-features.html#tf-idf). > > On Thu, 11 Aug 2016 at 22:14 Ben Teeuwen <bteeu...@gmail.com> wrote: >> >> Thanks Nick, I played around with the hashing trick. 
When I set numFeatures >> to the amount of distinct values for the largest sparse feature, I ended up >> with half of them colliding. When raising the numFeatures to have less >> collisions, I soon ended up with the same memory problems as before. To be >> honest, I didn’t test the impact of having more or less collisions on the >> quality of the predictions, but tunnel visioned into getting it to work with >> the full sparsity. >> >> Before I worked in RDD land; zipWithIndex on rdd with distinct values + one >> entry ‘missing’ for missing values during predict, collectAsMap, broadcast >> the map, udf generating sparse vector, assembling the vectors manually). To >> move into dataframe land, I wrote: >> >> def getMappings(mode): >> mappings = defaultdict(dict) >> max_index = 0 >> for c in cat_int[:]: # for every categorical variable >> >> logging.info("starting with {}".format(c)) >> if mode == 'train': >> grouped = (df2 >> .groupBy(c).count().orderBy('count', ascending = False) # >> get counts, ordered from largest to smallest >> .selectExpr("*", "1 as n") # prepare for window >> function summing up 1s before current row to create a RANK >> .selectExpr("*", "SUM(n) OVER (ORDER BY count DESC ROWS >> BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING) + {} AS >> index".format(max_index)) >> .drop('n') # drop the column with static 1 values used for >> the cumulative sum >> ) >> logging.info("Got {} rows.".format(grouped.count())) >> grouped.show() >> logging.info('getting max') >> max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: >> r.t).first() # update the max index so next categorical feature starts with >> it. >> logging.info("max_index has become: {}".format(max_index)) >> logging.info('adding missing value, so we also train on this and >> prediction data missing it. 
') >> schema = grouped.schema >> logging.info(schema) >> grouped = grouped.union(spark.createDataFrame([('missing', 0, >> max_index + 1)], schema)) # add index for missing value for values during >> predict that are unseen during training. >> max_index += 1 >> saveto = "{}/{}".format(path, c) >> logging.info("Writing to: {}".format(saveto)) >> grouped.write.parquet(saveto, mode = 'overwrite') >> >> elif mode == 'predict': >> loadfrom = "{}/{}".format(path, c) >> logging.info("Reading from: {}".format(loadfrom)) >> grouped = spark.read.parquet(loadfrom) >> >> logging.info("Adding to dictionary") >> mappings[c] = grouped.rdd.map(lambda r: r.asDict()).map(lambda d: >> (d[c], d['index'])).collectAsMap() # build up dictionary to be broadcasted >> later on, used for creating sparse vectors >> max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: >> r.t).first() >> >> logging.info("Sanity check for indexes:") >> for c in cat_int[:]: >> logging.info("{} min: {} max: {}".format(c, >> min(mappings[c].values()), max(mappings[c].values()))) # some logging to >> confirm the indexes. >> logging.info("Missing value = {}".format(mappings[c]['missing'])) >> return max_index, mappings >> >> I’d love to see the StringIndexer + OneHotEncoder transformers cope with >> missing values during prediction; for now I’ll work with the hacked stuff >> above :). >> (.. and I should compare the performance with using the hashing trick.) >> >> Ben > > --------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org