Hi Pal, thanks a lot, this can indeed help me. (A consolidated sketch of this approach follows after the quoted thread below.)

On Mon, Mar 28, 2016 at 10:44 PM, Sujit Pal <sujitatgt...@gmail.com> wrote:
> Hi Charles,
>
> I tried this with dummied-out functions which just sum transformations of
> a list of integers; maybe they could be replaced by the algorithms in your
> case. The idea is to call them through a "god" function that takes an
> additional type parameter and delegates out to the appropriate function.
> Here's my code, maybe it helps...
>
>> def f0(xs):
>>     return len(xs)
>>
>> def f1(xs):
>>     return sum(xs)
>>
>> def f2(xs):
>>     return sum([x**2 for x in xs])
>>
>> def f_god(n, xs):
>>     if n == 1:
>>         return f1(xs)
>>     elif n == 2:
>>         return f2(xs)
>>     else:
>>         return f0(xs)
>>
>> xs = [x for x in range(0, 5)]
>> xs_b = sc.broadcast(xs)
>> ns = sc.parallelize([x for x in range(0, 3)])
>> results = ns.map(lambda n: f_god(n, xs_b.value))
>> print results.take(10)
>
> gives me:
>
> [5, 10, 30]
>
> -sujit
>
> On Mon, Mar 28, 2016 at 12:59 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>
>> You probably want to look at the map transformation, and the many more
>> defined on RDDs. The function you pass in to map is serialized and the
>> computation is distributed.
>>
>> On Monday, March 28, 2016, charles li <charles.up...@gmail.com> wrote:
>>
>>> Use case: I have a dataset and want to run different algorithms on it,
>>> then fetch the results.
>>>
>>> To do this, I think I should distribute my algorithms and run them on
>>> the dataset at the same time, am I right?
>>>
>>> But it seems that Spark cannot parallelize/serialize
>>> algorithms/functions, so how can I make this work?
>>>
>>> *here is the test code*:
>>>
>>> ------------------------------------------------------------------------------------------------
>>> def test():
>>>     pass
>>>
>>> function_list = [test] * 10
>>>
>>> sc.parallelize([test] * 10).take(1)
>>> ------------------------------------------------------------------------------------------------
>>>
>>> *error message:*
>>>
>>> Py4JJavaError: An error occurred while calling
>>> z:org.apache.spark.api.python.PythonRDD.runJob.
>>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>>> Task 2 in stage 9.0 failed 4 times, most recent failure: Lost task 2.3 in
>>> stage 9.0 (TID 105, sh-demo-hadoop-07):
>>> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>>>   File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
>>>     process()
>>>   File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
>>>     serializer.dump_stream(func(split_index, iterator), outfile)
>>>   File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
>>>     vs = list(itertools.islice(iterator, batch))
>>>   File "/datayes/spark_process/spark-1.6.0-bin-cdh4/python/pyspark/rdd.py", line 1293, in takeUpToNumLeft
>>>   File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
>>>     yield self._read_with_length(stream)
>>>   File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
>>>     return self.loads(obj)
>>>   File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
>>>     return pickle.loads(obj)
>>> AttributeError: 'module' object has no attribute 'test'
>>>
>>>     at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
>>>     at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
>>>     at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
>>>     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> What's interesting is that *when I run sc.parallelize([test] * 10).collect(), it works fine*, returns:
>>>
>>> [<function __main__.test>,
>>>  <function __main__.test>,
>>>  <function __main__.test>,
>>>  <function __main__.test>,
>>>  <function __main__.test>,
>>>  <function __main__.test>,
>>>  <function __main__.test>,
>>>  <function __main__.test>,
>>>  <function __main__.test>,
>>>  <function __main__.test>]
>>>
>>> --
>>> --------------------------------------
>>> a spark lover, a quant, a developer and a good man.
>>>
>>> http://github.com/litaotao
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>

--
*--------------------------------------*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao
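
For reference, a minimal, self-contained sketch of the pattern suggested above: parallelize algorithm identifiers instead of function objects, and dispatch to the right algorithm inside the single function passed to map(). The function names (algo_count, algo_sum, algo_sum_of_squares), the dispatch dict, and the toy data are illustrative placeholders, not from the thread; sc is assumed to be an existing SparkContext as in the earlier examples.

------------------------------------------------------------------------------------------------
# Sketch only: generalizes Sujit's "god function" with a dict-based dispatcher.
# All names below are illustrative; `sc` is an already-created SparkContext.

def algo_count(xs):
    return len(xs)

def algo_sum(xs):
    return sum(xs)

def algo_sum_of_squares(xs):
    return sum(x ** 2 for x in xs)

# Map string keys to the algorithms we want to run in parallel.
dispatch = {
    "count": algo_count,
    "sum": algo_sum,
    "sum_sq": algo_sum_of_squares,
}

data = list(range(5))          # the shared dataset
data_b = sc.broadcast(data)    # ship it once to every executor

# Parallelize the algorithm keys, not the function objects themselves.
names = sc.parallelize(list(dispatch.keys()))

# Each task looks up its algorithm in the dispatch dict (captured in the
# closure) and applies it to the broadcast dataset.
results = names.map(lambda name: (name, dispatch[name](data_b.value)))

print(results.collect())
# e.g. [('count', 5), ('sum', 10), ('sum_sq', 30)]
------------------------------------------------------------------------------------------------

This sidesteps the AttributeError above because the algorithms are only referenced from the closure passed to map(), which PySpark serializes by value (via cloudpickle), rather than being stored as RDD elements, which are pickled by reference to __main__.test and cannot be resolved in the workers' __main__ module. That difference also appears to explain why collect() on the untransformed RDD succeeds while take(1) fails: collect() only unpickles the elements back on the driver, where test exists, whereas take() runs a Python function on the workers (takeUpToNumLeft in the traceback) that must unpickle them there.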