Hi Pal, thanks a lot. This does indeed help me.
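
One variation that might be worth trying, as a sketch only and not something tested on a cluster: replace the if/elif chain in f_god with a dict that maps a key to each function, so that adding an algorithm is a one-line change. The names `ALGORITHMS` and `run_algorithm` are made up for illustration, and a plain list comprehension stands in for `ns.map(...)` so the snippet runs without Spark.

```python
def f0(xs):
    return len(xs)

def f1(xs):
    return sum(xs)

def f2(xs):
    return sum(x ** 2 for x in xs)

# Hypothetical dispatch table: key -> algorithm.
ALGORITHMS = {0: f0, 1: f1, 2: f2}

def run_algorithm(n, xs):
    # Unknown keys fall back to f0, like the else branch in f_god.
    return ALGORITHMS.get(n, f0)(xs)

xs = list(range(5))
# With Spark this would be: ns.map(lambda n: run_algorithm(n, xs_b.value))
results = [run_algorithm(n, xs) for n in range(3)]
print(results)
```

The dict lookup keeps the selection logic in one place; the functions themselves still travel to the workers as part of the function passed to map, exactly as in the quoted code below.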

On Mon, Mar 28, 2016 at 10:44 PM, Sujit Pal <sujitatgt...@gmail.com> wrote:

> Hi Charles,
>
> I tried this with dummy functions that just sum transformations of a list
> of integers; in your case they could be replaced by your algorithms. The
> idea is to call them through a "god" function that takes an additional
> parameter selecting the algorithm and delegates to the appropriate
> function. Here's my code, maybe it helps...
>
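>> # Assumes `sc` is the SparkContext provided by the pyspark shell.
>> def f0(xs):
>>   return len(xs)
>> def f1(xs):
>>   return sum(xs)
>> def f2(xs):
>>   return sum(x**2 for x in xs)
>> # Dispatch on n: 1 -> f1, 2 -> f2, anything else -> f0.
>> def f_god(n, xs):
>>   if n == 1:
>>     return f1(xs)
>>   elif n == 2:
>>     return f2(xs)
>>   else:
>>     return f0(xs)
>>
>> xs = list(range(5))
>> # Broadcast the dataset once so every task can read it.
>> xs_b = sc.broadcast(xs)
>> # One RDD element per algorithm to run.
>> ns = sc.parallelize(list(range(3)))
>> results = ns.map(lambda n: f_god(n, xs_b.value))
>> print(results.take(10))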
>
>
> gives me:
>
> [5, 10, 30]
> -sujit
>
>
> On Mon, Mar 28, 2016 at 12:59 AM, Holden Karau <hol...@pigscanfly.ca>
> wrote:
>
>> You probably want to look at the map transformation, and the many other
>> transformations defined on RDDs. The function you pass in to map is
>> serialized and the computation is distributed.
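
A note on why the function passed to map behaves differently from functions stored as RDD elements, as far as I understand it: the standard-library pickle stores a function as a reference (module name plus function name), not as code, which matches the `pickle.loads` AttributeError in the traceback further down. PySpark serializes the function given to map by value instead (with cloudpickle), so it can be rebuilt on the workers. A minimal stdlib-only sketch of the by-reference behavior, using `json.dumps` purely as a stand-in for any importable function:

```python
import json
import pickle

# Plain pickle records *where to find* a function, not its code.
payload = pickle.dumps(json.dumps)

# The payload holds only the module name and the attribute name.
has_module_name = b"json" in payload
has_function_name = b"dumps" in payload

# Unpickling imports the module and looks the name up again, so it only
# works in a process where that module really defines that name.
restored = pickle.loads(payload)
same_object = restored is json.dumps

print(has_module_name, has_function_name, same_object)
```

A function defined in the driver's `__main__` is pickled the same way, and a worker process presumably has a different `__main__` without that name, which would explain the lookup failure.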
>>
>>
>> On Monday, March 28, 2016, charles li <charles.up...@gmail.com> wrote:
>>
>>>
>>> Use case: I have a dataset and want to run several different algorithms
>>> on it and fetch the results.
>>>
>>> To do this, I think I should distribute my algorithms and run them on
>>> the dataset at the same time. Am I right?
>>>
>>> But it seems that Spark cannot parallelize/serialize
>>> algorithms/functions, so how can I do it?
>>>
>>>
>>> *here is the test code*:
>>>
>>>
>>> ------------------------------------------------------------------------------------------------
>>> def test():
>>>     pass
>>> function_list = [test] * 10
>>>
>>> sc.parallelize([test] * 10).take(1)
>>>
>>> ------------------------------------------------------------------------------------------------
>>>
>>>
>>> *Error message:*
>>> Py4JJavaError: An error occurred while calling
>>> z:org.apache.spark.api.python.PythonRDD.runJob.
>>>
>>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>>> Task 2 in stage 9.0 failed 4 times, most recent failure: Lost task 2.3 in
>>> stage 9.0 (TID 105, sh-demo-hadoop-07):
>>> org.apache.spark.api.python.PythonException: Traceback (most recent call
>>> last):
>>>
>>>   File
>>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py",
>>> line 111, in main
>>>
>>>     process()
>>>
>>>   File
>>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py",
>>> line 106, in process
>>>
>>>     serializer.dump_stream(func(split_index, iterator), outfile)
>>>
>>>   File
>>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>>> line 263, in dump_stream
>>>
>>>     vs = list(itertools.islice(iterator, batch))
>>>
>>>   File
>>> "/datayes/spark_process/spark-1.6.0-bin-cdh4/python/pyspark/rdd.py", line
>>> 1293, in takeUpToNumLeft
>>>
>>>   File
>>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>>> line 139, in load_stream
>>>
>>>     yield self._read_with_length(stream)
>>>
>>>   File
>>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>>> line 164, in _read_with_length
>>>
>>>     return self.loads(obj)
>>>
>>>   File
>>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>>> line 422, in loads
>>>
>>>     return pickle.loads(obj)
>>>
>>> AttributeError: 'module' object has no attribute 'test'
>>>
>>>
>>> at
>>> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
>>>
>>> at
>>> org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
>>>
>>> at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
>>>
>>> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
>>>
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>>
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>>
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>
>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>> What's interesting is that *when I run sc.parallelize([test] *
>>> 10).collect(), it works fine* and returns:
>>>
>>> [<function __main__.test>,
>>>
>>>  <function __main__.test>,
>>>
>>>  <function __main__.test>,
>>>
>>>  <function __main__.test>,
>>>
>>>  <function __main__.test>,
>>>
>>>  <function __main__.test>,
>>>
>>>  <function __main__.test>,
>>>
>>>  <function __main__.test>,
>>>
>>>  <function __main__.test>,
>>>
>>>  <function __main__.test>]
>>>
>>>
>>>
>>>
>>> --
>>> --------------------------------------
>>> a spark lover, a quant, a developer and a good man.
>>>
>>> http://github.com/litaotao
>>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>>
>


-- 
*--------------------------------------*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao
