On Mon, Oct 6, 2014 at 1:08 PM, <jan.zi...@centrum.cz> wrote:
> Hi,
>
> Thank you for your advice. It really might work, but to specify my problem a
> bit more: think of each generated item as one parsed Wikipedia page. I am
> getting this generator from the parser and I don't want to save its output to
> storage, but rather apply parallelize directly and create an RDD. Based on
> your advice, I'm now thinking that batching the items, creating several RDDs,
> and then applying union to them might be the way to go.
>
> Originally I was thinking of calling the parsing function in flatMap on the
> RDD loaded from the XML file, but then I unfortunately ran into this problem:
> http://stackoverflow.com/questions/26202978/spark-and-python-trying-to-parse-wikipedia-using-gensim
> so now I am trying to parse the XML on the master node and put it directly
> into the RDD.

gensim.corpora.wikicorpus.extract_pages should be called in flatMap() for
better performance (otherwise it will be a bottleneck on the master).

Because the function passed to flatMap() is executed on the workers, you
should make sure that the files accessed by extract_pages are accessible to
the workers, by putting them in a DFS or on NFS in cluster mode. In local
mode, you may need to use absolute paths for the files.

Davies

> ______________________________________________________________
>> From: Davies Liu <dav...@databricks.com>
>> To: <jan.zi...@centrum.cz>
>> Date: 06.10.2014 18:09
>> Subject: Re: Spark and Python using generator of data bigger than RAM as
>> input to sc.parallelize()
>>
>
> sc.parallelize() distributes a list of data into a number of partitions,
> but a generator cannot be cut and serialized automatically.
>
> If you can partition your generator, then you can try this:
>
> sc.parallelize(range(N), N).flatMap(lambda x: generate_partition(x))
>
> For example, if you want to generate xrange(M), where M is huge:
>
> sc.parallelize(range(N), N).flatMap(lambda x: xrange(M / N * x, M / N * (x + 1)))
>
> On Mon, Oct 6, 2014 at 7:16 AM, <jan.zi...@centrum.cz> wrote:
>> Hi,
>>
>> I would like to ask whether it is possible to use a generator that produces
>> more data than the total RAM across all the machines as the input for
>> sc = SparkContext(), sc.parallelize(generator). I would like to create an
>> RDD this way. When I create an RDD with sc.textFile(file), where the file
>> is even larger than the data produced by the generator, everything works
>> fine, but unfortunately I need to use sc.parallelize(generator), and it
>> makes my OS kill the Spark job.
>>
>> I'm getting only this log and then the job is killed:
>>
>> 14/10/06 13:34:16 INFO spark.SecurityManager: Changing view acls to: root,
>> 14/10/06 13:34:16 INFO spark.SecurityManager: Changing modify acls to: root,
>> 14/10/06 13:34:16 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, ); users with modify permissions: Set(root, )
>> 14/10/06 13:34:16 INFO slf4j.Slf4jLogger: Slf4jLogger started
>> 14/10/06 13:34:16 INFO Remoting: Starting remoting
>> 14/10/06 13:34:17 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016]
>> 14/10/06 13:34:17 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016]
>> 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'sparkDriver' on port 41016.
>> 14/10/06 13:34:17 INFO spark.SparkEnv: Registering MapOutputTracker
>> 14/10/06 13:34:17 INFO spark.SparkEnv: Registering BlockManagerMaster
>> 14/10/06 13:34:17 INFO storage.DiskBlockManager: Created local directory at /mnt/spark/spark-local-20141006133417-821e
>> 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'Connection manager for block manager' on port 42438.
>> 14/10/06 13:34:17 INFO network.ConnectionManager: Bound socket to port 42438 with id = ConnectionManagerId(ip-172-31-25-197.ec2.internal,42438)
>> 14/10/06 13:34:17 INFO storage.MemoryStore: MemoryStore started with capacity 267.3 MB
>> 14/10/06 13:34:17 INFO storage.BlockManagerMaster: Trying to register BlockManager
>> 14/10/06 13:34:17 INFO storage.BlockManagerMasterActor: Registering block manager ip-172-31-25-197.ec2.internal:42438 with 267.3 MB RAM
>> 14/10/06 13:34:17 INFO storage.BlockManagerMaster: Registered BlockManager
>> 14/10/06 13:34:17 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-c4edda1c-0949-490d-8ff3-10993727c523
>> 14/10/06 13:34:17 INFO spark.HttpServer: Starting HTTP Server
>> 14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT
>> 14/10/06 13:34:17 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:44768
>> 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'HTTP file server' on port 44768.
>> 14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT
>> 14/10/06 13:34:17 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
>> 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
>> 14/10/06 13:34:17 INFO ui.SparkUI: Started SparkUI at http://ec2-54-164-72-236.compute-1.amazonaws.com:4040
>> 14/10/06 13:34:18 INFO util.Utils: Copying /root/generator_test.py to /tmp/spark-0bafac0c-6779-4910-b095-0ede226fa3ce/generator_test.py
>> 14/10/06 13:34:18 INFO spark.SparkContext: Added file file:/root/generator_test.py at http://172.31.25.197:44768/files/generator_test.py with timestamp 1412602458065
>> 14/10/06 13:34:18 INFO client.AppClient$ClientActor: Connecting to master spark://ec2-54-164-72-236.compute-1.amazonaws.com:7077...
>> 14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
>> 14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20141006133418-0046
>> 14/10/06 13:34:18 INFO client.AppClient$ClientActor: Executor added: app-20141006133418-0046/0 on worker-20141005074620-ip-172-31-30-40.ec2.internal-49979 (ip-172-31-30-40.ec2.internal:49979) with 1 cores
>> 14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20141006133418-0046/0 on hostPort ip-172-31-30-40.ec2.internal:49979 with 1 cores, 512.0 MB RAM
>> 14/10/06 13:34:18 INFO client.AppClient$ClientActor: Executor updated: app-20141006133418-0046/0 is now RUNNING
>> 14/10/06 13:34:21 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-30-40.ec2.internal:50877/user/Executor#-1621441852] with ID 0
>> 14/10/06 13:34:21 INFO storage.BlockManagerMasterActor: Registering block manager ip-172-31-30-40.ec2.internal:34460 with 267.3 MB RAM
>>
>> Thank you in advance for any advice or suggestion.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
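
A rough sketch of the "partition your generator" idea from the reply in this thread, in case it helps later readers. The helper names partition_bounds and generate_partition are hypothetical (only generate_partition is hinted at in the thread), and the integer items stand in for whatever each partition would really produce, such as parsed pages. Unlike the plain M / N arithmetic, this version also covers the remainder when M is not divisible by N. The Spark call is left as a comment and assumes an existing SparkContext named sc.

```python
def partition_bounds(index, num_partitions, total):
    """Return (start, stop) of the slice of range(total) owned by one partition.

    The first `total % num_partitions` partitions get one extra item, so the
    slices cover range(total) exactly, with no gaps and no overlap.
    """
    base, extra = divmod(total, num_partitions)
    start = index * base + min(index, extra)
    stop = start + base + (1 if index < extra else 0)
    return start, stop


def generate_partition(index, num_partitions, total):
    """Lazily yield the items of one partition.

    Hypothetical stand-in: a real job would parse and yield its own records
    here instead of plain integers.
    """
    start, stop = partition_bounds(index, num_partitions, total)
    for i in range(start, stop):
        yield i


# Assumed usage on a cluster, with an existing SparkContext `sc`:
#
#   N, M = 100, 10**9
#   rdd = sc.parallelize(range(N), N).flatMap(
#       lambda i: generate_partition(i, N, M))
#
# Only the N small partition indices are serialized by the driver; each
# worker then produces its own slice lazily inside flatMap().
```

Because each partition is generated on a worker, the driver never has to hold the full data set in memory, which is the part that sc.parallelize(generator) cannot avoid.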