Ah, is the file gzipped by any chance? We can’t decompress gzipped files in parallel so they get processed by a single task.
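One quick way to answer the gzip question without relying on the file extension is to look at the first two bytes of the file, since every gzip stream starts with the magic bytes 0x1f 0x8b. The sketch below is only an illustration: the helper name `is_gzipped` is made up for this example and is not part of Spark or PySpark.

```python
GZIP_MAGIC = b"\x1f\x8b"  # first two bytes of every gzip stream (RFC 1952)


def is_gzipped(path):
    """Return True if the file at `path` starts with the gzip magic bytes.

    Hypothetical helper for illustration; it only inspects the file header
    and does not attempt to decompress anything.
    """
    with open(path, "rb") as f:
        return f.read(2) == GZIP_MAGIC
```

Calling `is_gzipped(d)` on the local input path before `sc.textFile(d)` would show whether the input is compressed. If it is, decompressing the file before loading it is one option to try, so that the input can be split across tasks.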
It may also be worth looking at the application UI (http://localhost:4040) to see 1) whether all the data fits in memory in the Storage tab (maybe it somehow becomes larger, though it seems unlikely that it would exceed 20 GB) and 2) how many parallel tasks run in each iteration.

Matei

On Jun 4, 2014, at 6:56 PM, Srikrishna S <[email protected]> wrote:

> I am using the MLLib one (LogisticRegressionWithSGD) with PySpark. I am
> running only 10 iterations.
>
> The MLLib version of logistic regression doesn't seem to use all the cores on
> my machine.
>
> Regards,
> Krishna
>
> On Wed, Jun 4, 2014 at 6:47 PM, Matei Zaharia <[email protected]> wrote:
> Are you using the logistic_regression.py in examples/src/main/python or
> examples/src/main/python/mllib? The first one is an example of writing
> logistic regression by hand and won’t be as efficient as the MLlib one. I
> suggest trying the MLlib one.
>
> You may also want to check how many iterations it runs. By default I think
> it runs 100, which may be more than you need.
>
> Matei
>
> On Jun 4, 2014, at 5:47 PM, Srikrishna S <[email protected]> wrote:
>
> > Hi All,
> >
> > I am new to Spark and I am trying to run LogisticRegression (with SGD)
> > using MLLib on a beefy single machine with about 128 GB RAM. The dataset has
> > about 80M rows with only 4 features, so it barely occupies 2 GB on disk.
> >
> > I am running the code using all 8 cores with 20G memory using
> > spark-submit --executor-memory 20G --master local[8] logistic_regression.py
> >
> > It seems to take about 3.5 hours without caching and over 5 hours with
> > caching.
> >
> > What is the recommended use for Spark on a beefy single machine?
> >
> > Any suggestions will help!
> >
> > Regards,
> > Krishna
> >
> > Code sample:
> > ----------------------------------------------------------------------
> > # Dataset
> > d = sys.argv[1]
> > data = sc.textFile(d)
> >
> > # Load and parse the data
> > # ----------------------------------------------------------------------
> > def parsePoint(line):
> >     values = [float(x) for x in line.split(',')]
> >     return LabeledPoint(values[0], values[1:])
> > _parsedData = data.map(parsePoint)
> > parsedData = _parsedData.cache()
> > results = {}
> >
> > # Spark
> > # ----------------------------------------------------------------------
> > start_time = time.time()
> > # Build the gl_model
> > niters = 10
> > spark_model = LogisticRegressionWithSGD.train(parsedData, iterations=niters)
> >
> > # Evaluate the gl_model on training data
> > labelsAndPreds = parsedData.map(lambda p: (p.label, spark_model.predict(p.features)))
> > trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
