Spark reads partitions in a wrong order

2014-04-25 Thread Mingyu Kim
If the underlying file system returns files in a non-alphabetical order to
java.io.File.listFiles(), Spark reads the partitions out of order. Here's an
example.

val sc = new SparkContext("local[3]", "test")
val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))
rdd1.saveAsTextFile("file://path/to/file")
val rdd2 = sc.textFile("file://path/to/file")
rdd2.collect()

rdd1 is saved to file://path/to/file in three partitions (i.e.
/path/to/file/part-0, /path/to/file/part-1,
/path/to/file/part-2). Since File.listFiles(), which is used in
org.apache.hadoop.fs.RawLocalFileSystem.listStatus(), returns the partitions
out of order, rdd2 has its rows in an order different from that of rdd1.
Note that File.listFiles() explicitly says that it doesn't guarantee the
order.

The behavior of RawLocalFileSystem is fine for MapReduce jobs because they
don't care about order, but for Spark, which has a notion of row order,
this looks like a bug. The correct fix would be to sort the files after
calling File.listFiles().

It may be possible to fix this by creating a wrapper
org.apache.hadoop.fs.FileSystem class that sorts the file list before
returning it. Was this considered in the original design? Is this a bug?
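
A minimal sketch of that wrapper idea, not a tested fix: subclass
RawLocalFileSystem, sort the listing, and point the file:// scheme at the
subclass through Hadoop's fs.file.impl property. The class name below is made
up, and by default file:// is actually served by the checksumming
LocalFileSystem, so this may need adjusting:

import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}

class SortedLocalFileSystem extends RawLocalFileSystem {
  // Sort by file name so part-0, part-1, ... always come back in
  // lexicographic order, regardless of what the OS returns.
  override def listStatus(path: Path): Array[FileStatus] =
    super.listStatus(path).sortBy(_.getPath.getName)
}

// e.g. sc.hadoopConfiguration.set("fs.file.impl", classOf[SortedLocalFileSystem].getName)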

Mingyu






Re: Spark reads partitions in a wrong order

2014-04-25 Thread Andrew Ash
Have you run the same test but with a URL in HDFS rather than the local
filesystem? I think order may be preserved in that run, which would make the
local filesystem's loss of order look more like a bug.

Sent from my mobile phone
On Apr 25, 2014 9:11 AM, "Mingyu Kim"  wrote:

> If the underlying file system returns files in a non-alphabetical order to
> java.io.File.listFiles(), Spark reads the partitions out of order. Here’s
> an example.
>
> var sc = new SparkContext(“local[3]”, “test”);
> var rdd1 = sc.parallelize([1,2,3,4,5]);
> rdd1.saveAsTextFile(“file://path/to/file”);
> var rdd2 = sc.textFile(“file://path/to/file”);
> rdd2.collect();
>
> rdd1 is saved to file://path/to/file in three partitions. (I.e.
> /path/to/file/part-0, /path/to/file/part-1,
>  /path/to/file/part-2) Since File.listFiles(), which is used in
> org.apache.hadoop.fs.RawLocalFileSystem.listStatus(), returns the
> partitions out-of-order, rdd2 has the rows in the order different from that
> of rdd1. Note that File.listFiles() explicitly says that it doesn’t
> guarantee the order.
>
> The behavior of RawLocalFileSystem is fine for MapReduce jobs because they
> don’t care about orders, but for Spark, which has a notion of row order,
> this looks like a bug. The correct fix would be to sort the files after
> calling File.listFiles().
>
> This may be possible to fix somewhere by creating a wrapper
> org.apache.hadoop.fs.FileSystem class that sorts the file list before
> returning. Is this considered in the original design? Is this a bug?
>
> Mingyu
>


Re: Deploying a python code on a spark EC2 cluster

2014-04-25 Thread Shubhabrata
Well, we used the script that comes with Spark, I think v0.9.1. But I am going
to try the newer-version script (1.0 rc2). I shall keep you posted about my
findings. Thanks.





MultipleOutputs IdentityReducer

2014-04-25 Thread Andre Kuhnen
Hello,

I am trying to write multiple files with Spark, but I cannot find a way to
do it.

Here is the idea.

val rddKeyValue: RDD[(String, String)] = rddlines.map(line => createKeyValue(line))

Now I would like to save each key as the file name, with all the values for
that key inside the file.

I tried to use this after the map, but it would overwrite the file, so I
would get only one value in each file.

With groupByKey I get an OutOfMemoryError, so I wonder if there is a way to
append the next line to the file with the same key?
On Hadoop we can use an IdentityReducer and KeyBasedOutput [1].

I tried this:

rddKeyValue.saveAsHadoopFile("hdfs://test-platform-analytics-master/tmp/dump/product",
classOf[String], classOf[String], classOf[KeyBasedOutput[String, String]])

[1]
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T, V] {

  /**
   * Use the key as part of the path for the final output file.
   */
  override protected def generateFileNameForKeyValue(key: T, value: V, leaf: String) = {
    key.toString()
  }

  /**
   * When actually writing the data, discard the key since it is already in
   * the file path.
   */
  override protected def generateActualKey(key: T, value: V) = {
    null
  }
}
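
A hedged variant, assuming the overwrites come from several tasks emitting the
same key; the class below and the HashPartitioner step are only a sketch of
that idea, not something tested:

class KeyBasedOutputPerTask[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T, V] {
  // Keep the per-task leaf name (e.g. part-00003) in the path so two tasks
  // writing the same key no longer collide on the same file name.
  override protected def generateFileNameForKeyValue(key: T, value: V, leaf: String) =
    key.toString + "/" + leaf
  override protected def generateActualKey(key: T, value: V) = null
}

// Alternatively, hash-partition first so each key is written by exactly one task:
// rddKeyValue.partitionBy(new org.apache.spark.HashPartitioner(128))
//   .saveAsHadoopFile("hdfs://test-platform-analytics-master/tmp/dump/product",
//     classOf[String], classOf[String], classOf[KeyBasedOutput[String, String]])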

Thanks a lot


RE: JMX with Spark

2014-04-25 Thread Ravi Hemnani
Can you share your working metrics.properties?

I want remote JMX to be enabled, so I need to use the JmxSink to monitor my
Spark master and workers.

But what parameters need to be defined, such as host and port?

So your config would help.
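
For what it's worth, a minimal sketch of the usual setup. The JmxSink class
name below matches Spark's bundled metrics template; the port and the
SPARK_DAEMON_JAVA_OPTS flags are generic JVM remote-JMX settings and purely
illustrative:

# conf/metrics.properties -- enable the JMX sink for all instances
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink

# conf/spark-env.sh -- open a fixed remote JMX port on the master/worker daemons
SPARK_DAEMON_JAVA_OPTS="-Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.port=9999 \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false"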





read file from hdfs

2014-04-25 Thread Joe L
I have just two questions:

sc.textFile("hdfs://host:port/user/matei/whatever.txt")

Is host the master node?
What port should we use?






Questions about productionizing spark

2014-04-25 Thread Han JU
Hi all,

We are actively testing/benchmarking Spark for our production use. Here are
some questions about problems we've encountered so far:

  1. By default 66% of the executor memory is used for RDD caching, so if
there's no explicit caching in the code (e.g. rdd.cache(),
rdd.persist(StorageLevel.MEMORY_AND_DISK), etc.), is this RAM wasted? Or does
Spark allocate it dynamically or use it for some auto-caching? (See the sketch
after question 3.)

  2. We've some code like below (basic join):

val rddA = sc.textFile(..).map(...)
val rddB = sc.textFile(...).filter(...).map(...)

val c = rddA.join(rddB).map(...)
c.count()

  All went well except the last task of the join. The job always got stuck
there and eventually led to OOM.  Even if we give sufficient memory and the
job finally passes, the last task of the join takes significantly more time
than the other tasks (say several minutes vs. 200ms). Any pointers on this
problem?

  3. For the above join, is it a best practice to make rddA and rddB
co-partitioned before the join?

val rddA = sc.textFile(..).map(...).partitionBy(new
HashPartitioner(128))
val rddB = sc.textFile(...).filter(...).map(...).partitionBy(new
HashPartitioner(128))

 // like above

Note that with this manual co-partitioning and various numbers of
partitions, the problem above persists.
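
Regarding question 1, a minimal sketch of the knob involved, assuming the
standard spark.storage.memoryFraction property from the configuration docs
(the value here is only illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  // Shrink the cache region if nothing is ever cached, freeing heap for tasks.
  .set("spark.storage.memoryFraction", "0.2")
val sc = new SparkContext(conf)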


We use spark_ec2 script with Spark 0.9.0, all data are on the
ephemeral-hdfs.

Thanks!

-- 
*JU Han*

Data Engineer @ Botify.com

+33 061960


Re: Pig on Spark

2014-04-25 Thread Mark Baker
I've only had a quick look at Pig, but it seems that a declarative
layer on top of Spark couldn't be anything other than a big win, as it
allows developers to declare *what* they want, permitting the compiler
to determine how best to poke at the RDD API to implement it.

In my brief time with Spark, I've often thought that it feels very
unnatural to use imperative code to declare a pipeline.


FW: reduceByKeyAndWindow - spark internals

2014-04-25 Thread Adrian Mocanu
Any suggestions on where I can find this in the documentation or elsewhere?

Thanks

From: Adrian Mocanu [mailto:amoc...@verticalscope.com]
Sent: April-24-14 11:26 AM
To: u...@spark.incubator.apache.org
Subject: reduceByKeyAndWindow - spark internals

If I have this code:
val stream1 = doublesInputStream.window(Seconds(10), Seconds(2))
val stream2 = stream1.reduceByKeyAndWindow(_ + _, Seconds(10), Seconds(10))

Does reduceByKeyAndWindow merge all RDDs from stream1 that came in the
10-second window?

For example, in the first 10 seconds stream1 will have 5 RDDs. Does
reduceByKeyAndWindow merge these 5 RDDs into 1 RDD and remove duplicates?

-Adrian



Re: Deploying a python code on a spark EC2 cluster

2014-04-25 Thread Shubhabrata
This is the error from stderr:


Spark Executor Command: "java" "-cp"
":/root/ephemeral-hdfs/conf:/root/ephemeral-hdfs/conf:/root/ephemeral-hdfs/conf:/root/spark/conf:/root/spark/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop1.0.4.jar"
"-Djava.library.path=/root/ephemeral-hdfs/lib/native/"
"-Dspark.local.dir=/mnt/spark" "-Dspark.local.dir=/mnt/spark"
"-Dspark.local.dir=/mnt/spark" "-Dspark.local.dir=/mnt/spark" "-Xms2048M"
"-Xmx2048M" "org.apache.spark.executor.CoarseGrainedExecutorBackend"
"akka.tcp://spark@192.168.122.1:44577/user/CoarseGrainedScheduler" "1"
"ip-10-84-7-178.eu-west-1.compute.internal" "1"
"akka.tcp://sparkwor...@ip-10-84-7-178.eu-west-1.compute.internal:57839/user/Worker"
"app-20140425133749-"


14/04/25 13:39:37 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/04/25 13:39:38 INFO Remoting: Starting remoting
14/04/25 13:39:38 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkexecu...@ip-10-84-7-178.eu-west-1.compute.internal:36800]
14/04/25 13:39:38 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkexecu...@ip-10-84-7-178.eu-west-1.compute.internal:36800]
14/04/25 13:39:38 INFO worker.WorkerWatcher: Connecting to worker
akka.tcp://sparkwor...@ip-10-84-7-178.eu-west-1.compute.internal:57839/user/Worker
14/04/25 13:39:38 INFO executor.CoarseGrainedExecutorBackend: Connecting to
driver: akka.tcp://spark@192.168.122.1:44577/user/CoarseGrainedScheduler
14/04/25 13:39:39 INFO worker.WorkerWatcher: Successfully connected to
akka.tcp://sparkwor...@ip-10-84-7-178.eu-west-1.compute.internal:57839/user/Worker
14/04/25 13:41:19 ERROR executor.CoarseGrainedExecutorBackend: Driver
Disassociated
[akka.tcp://sparkexecu...@ip-10-84-7-178.eu-west-1.compute.internal:36800]
-> [akka.tcp://spark@192.168.122.1:44577] disassociated! Shutting down.





Re: Pig on Spark

2014-04-25 Thread Eugen Cepoi
It depends; personally I have the opposite opinion.

IMO expressing pipelines in a functional language feels natural, you just
have to get used to the language (Scala).

Testing Spark jobs is easy, whereas testing a Pig script is much harder and
less natural.

If you want a higher-level language that deals with RDDs for you, you
can use Spark SQL:
http://people.apache.org/~pwendell/catalyst-docs/sql-programming-guide.html

Of course you can express fewer things this way, but if you have some
complex logic I think it would make sense to write a classic Spark job that
would be more robust in the long term.


2014-04-25 15:30 GMT+02:00 Mark Baker :

> I've only had a quick look at Pig, but it seems that a declarative
> layer on top of Spark couldn't be anything other than a big win, as it
> allows developers to declare *what* they want, permitting the compiler
> to determine how best poke at the RDD API to implement it.
>
> In my brief time with Spark, I've often thought that it feels very
> unnatural to use imperative code to declare a pipeline.
>


strange error

2014-04-25 Thread Joe L
[error] 14/04/25 23:09:57 INFO slf4j.Slf4jLogger: Slf4jLogger started
[error] 14/04/25 23:09:57 INFO Remoting: Starting remoting
[error] 14/04/25 23:09:58 INFO Remoting: Remoting started; listening on
addresses :[akka.tcp://spark@cm03:5]
[error] 14/04/25 23:09:58 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://spark@cm03:5]
[error] 14/04/25 23:09:58 INFO spark.SparkEnv: Registering
BlockManagerMaster
[error] 14/04/25 23:09:58 INFO storage.DiskBlockManager: Created local
directory at /tmp/spark-local-20140425230958-f550
[error] 14/04/25 23:09:58 INFO storage.MemoryStore: MemoryStore started with
capacity 1638.6 MB.
[error] 14/04/25 23:09:58 INFO network.ConnectionManager: Bound socket to
port 60395 with id = ConnectionManagerId(cm03,60395)
[error] 14/04/25 23:09:58 INFO storage.BlockManagerMaster: Trying to
register BlockManager
[error] 14/04/25 23:09:58 INFO
storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager
cm03:60395 with 1638.6 MB RAM
[error] 14/04/25 23:09:58 INFO storage.BlockManagerMaster: Registered
BlockManager
[error] 14/04/25 23:09:58 INFO spark.HttpServer: Starting HTTP Server
[error] 14/04/25 23:09:58 INFO server.Server: jetty-7.6.8.v20121106
[error] 14/04/25 23:09:58 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:55020
[error] 14/04/25 23:09:58 INFO broadcast.HttpBroadcast: Broadcast server
started at http://192.168.100.172:55020
[error] 14/04/25 23:09:58 INFO spark.SparkEnv: Registering MapOutputTracker
[error] 14/04/25 23:09:58 INFO spark.HttpFileServer: HTTP File server
directory is /tmp/spark-57df291f-b749-46f7-a1ed-98609e2ad9d8
[error] 14/04/25 23:09:58 INFO spark.HttpServer: Starting HTTP Server
[error] 14/04/25 23:09:58 INFO server.Server: jetty-7.6.8.v20121106
[error] 14/04/25 23:09:58 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:47145
[error] 14/04/25 23:09:58 INFO server.Server: jetty-7.6.8.v20121106
[error] 14/04/25 23:09:58 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/storage/rdd,null}
[error] 14/04/25 23:09:58 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/storage,null}
[error] 14/04/25 23:09:58 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/stages/stage,null}
[error] 14/04/25 23:09:58 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/stages/pool,null}
[error] 14/04/25 23:09:58 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/stages,null}
[error] 14/04/25 23:09:58 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/environment,null}
[error] 14/04/25 23:09:58 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/executors,null}
[error] 14/04/25 23:09:58 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/metrics/json,null}
[error] 14/04/25 23:09:58 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/static,null}
[error] 14/04/25 23:09:58 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/,null}
[error] 14/04/25 23:09:58 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
[error] 14/04/25 23:09:58 INFO ui.SparkUI: Started Spark Web UI at
http://cm03:4040
[error] 14/04/25 23:09:58 INFO spark.SparkContext: Added JAR
target/scala-2.10/simple-project_2.10-1.0.jar at
http://192.168.100.172:47145/jars/simple-project_2.10-1.0.jar with timestamp
1398434998858
[error] 14/04/25 23:09:59 INFO storage.MemoryStore: ensureFreeSpace(32856)
called with curMem=0, maxMem=1718196633
[error] 14/04/25 23:09:59 INFO storage.MemoryStore: Block broadcast_0 stored
as values to memory (estimated size 32.1 KB, free 1638.6 MB)
[error] 14/04/25 23:09:59 WARN util.NativeCodeLoader: Unable to load
native-hadoop library for your platform... using builtin-java classes where
applicable
[error] 14/04/25 23:09:59 WARN snappy.LoadSnappy: Snappy native library not
loaded
[error] 14/04/25 23:09:59 INFO mapred.FileInputFormat: Total input paths to
process : 1
[error] 14/04/25 23:09:59 INFO spark.SparkContext: Starting job: count at
test.scala:11
[error] 14/04/25 23:09:59 INFO scheduler.DAGScheduler: Got job 0 (count at
test.scala:11) with 2 output partitions (allowLocal=false)
[error] 14/04/25 23:09:59 INFO scheduler.DAGScheduler: Final stage: Stage 0
(count at test.scala:11)
[error] 14/04/25 23:09:59 INFO scheduler.DAGScheduler: Parents of final
stage: List()
[error] 14/04/25 23:09:59 INFO scheduler.DAGScheduler: Missing parents:
List()
[error] 14/04/25 23:09:59 INFO scheduler.DAGScheduler: Submitting Stage 0
(FilteredRDD[2] at filter at test.scala:11), which has no missing parents
[error] 14/04/25 23:09:59 INFO scheduler.DAGScheduler: Submitting 2 missing
tasks from Stage 0 (FilteredRDD[2] at filter at test.scala:11)
[error] 14/04/25 23:09:59 INFO scheduler.TaskSchedulerImpl: Adding task set
0.0 with 2 tasks
[error] 14/04/25 23:09:59 INFO scheduler.TaskSetManager: Starting task 0.0:0
as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
[error] 14/04/25 23:09:59 INFO scheduler.TaskSetM

Re: read file from hdfs

2014-04-25 Thread Christophe Préaud

You should use the values defined in the 'fs.defaultFS' property (in the
Hadoop core-site.xml file).
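
For example, a hedged sketch (the host and port below are purely illustrative,
and on older Hadoop 1.x clusters the property may be called fs.default.name
instead):

// If core-site.xml on the cluster contains
//   <property><name>fs.defaultFS</name><value>hdfs://namenode-host:8020</value></property>
// then the same host:port pair goes into the Spark path:
val lines = sc.textFile("hdfs://namenode-host:8020/user/matei/whatever.txt")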

Christophe.

On 25/04/2014 14:38, Joe L wrote:

I have just 2 two questions?

sc.textFile("hdfs://host:port/user/matei/whatever.txt")

Is host master node?
What port we should use?









Securing Spark's Network

2014-04-25 Thread Jacob Eisinger
 Howdy,

We tried running Spark 0.9.1 stand-alone inside docker containers distributed 
over multiple hosts. This is complicated due to Spark opening up ephemeral / 
dynamic ports for the workers and the CLI.  To ensure our docker solution 
doesn't break Spark in unexpected ways and maintains a secure cluster, I am 
interested in understanding more about Spark's network architecture. I'd 
appreciate it if you could you point us to any documentation!

A couple specific questions:

1. What are these ports being used for?
   Checking out the code / experiments, it looks like asynchronous communication
   for shuffling around results. Anything else?

2. How do you secure the network?
   Network administrators tend to secure and monitor the network at the port
   level. If these ports are dynamic and open randomly, firewalls are not easily
   configured and security alarms are raised. Is there a way to limit the range
   easily? (We did investigate setting the kernel parameter
   ip_local_reserved_ports, but this is broken [1] on some versions of Linux's
   cgroups.)

Thanks,
Jacob

[1] https://github.com/lxc/lxc/issues/97 

Jacob D. Eisinger
IBM Emerging Technologies
jeis...@us.ibm.com - (512) 286-6075

Re: Deploying a python code on a spark EC2 cluster

2014-04-25 Thread Shubhabrata
In order to check if there is any issue with the Python API, I ran a Scala
application provided in the examples. Still the same error:

./bin/run-example org.apache.spark.examples.SparkPi
spark://[Master-URL]:7077


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/mnt/work/spark-0.9.1/examples/target/scala-2.10/spark-examples-assembly-0.9.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/mnt/work/spark-0.9.1/assembly/target/scala-2.10/spark-assembly-0.9.1-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/04/25 17:07:10 INFO Utils: Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
14/04/25 17:07:10 WARN Utils: Your hostname, rd-hu resolves to a loopback
address: 127.0.1.1; using 192.168.122.1 instead (on interface virbr0)
14/04/25 17:07:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
another address
14/04/25 17:07:11 INFO Slf4jLogger: Slf4jLogger started
14/04/25 17:07:11 INFO Remoting: Starting remoting
14/04/25 17:07:11 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://spark@192.168.122.1:26278]
14/04/25 17:07:11 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://spark@192.168.122.1:26278]
14/04/25 17:07:11 INFO SparkEnv: Registering BlockManagerMaster
14/04/25 17:07:11 INFO DiskBlockManager: Created local directory at
/tmp/spark-local-20140425170711-d1da
14/04/25 17:07:11 INFO MemoryStore: MemoryStore started with capacity 16.0
GB.
14/04/25 17:07:11 INFO ConnectionManager: Bound socket to port 9788 with id
= ConnectionManagerId(192.168.122.1,9788)
14/04/25 17:07:11 INFO BlockManagerMaster: Trying to register BlockManager
14/04/25 17:07:11 INFO BlockManagerMasterActor$BlockManagerInfo: Registering
block manager 192.168.122.1:9788 with 16.0 GB RAM
14/04/25 17:07:11 INFO BlockManagerMaster: Registered BlockManager
14/04/25 17:07:11 INFO HttpServer: Starting HTTP Server
14/04/25 17:07:11 INFO HttpBroadcast: Broadcast server started at
http://192.168.122.1:58091
14/04/25 17:07:11 INFO SparkEnv: Registering MapOutputTracker
14/04/25 17:07:11 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-599577a4-5732-4949-a2e8-f59eb679e843
14/04/25 17:07:11 INFO HttpServer: Starting HTTP Server
14/04/25 17:07:12 WARN AbstractLifeCycle: FAILED
SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already
in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:444)
at sun.nio.ch.Net.bind(Net.java:436)
at
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at
org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
at
org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
at
org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
at
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at org.eclipse.jetty.server.Server.doStart(Server.java:286)
at
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at
org.apache.spark.ui.JettyUtils$$anonfun$1.apply$mcV$sp(JettyUtils.scala:118)
at org.apache.spark.ui.JettyUtils$$anonfun$1.apply(JettyUtils.scala:118)
at org.apache.spark.ui.JettyUtils$$anonfun$1.apply(JettyUtils.scala:118)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.ui.JettyUtils$.connect$1(JettyUtils.scala:118)
at 
org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:129)
at org.apache.spark.ui.SparkUI.bind(SparkUI.scala:57)
at org.apache.spark.SparkContext.(SparkContext.scala:159)
at org.apache.spark.SparkContext.(SparkContext.scala:100)
at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:31)
at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
14/04/25 17:07:12 WARN AbstractLifeCycle: FAILED
org.eclipse.jetty.server.Server@74f4b96: java.net.BindException: Address
already in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:444)
at sun.nio.ch.Net.bind(Net.java:436)
at
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at
org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
at
org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
at
org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConne

Re: what is the best way to do cartesian

2014-04-25 Thread Alex Boisvert
You might want to try the built-in RDD.cartesian() method.


On Thu, Apr 24, 2014 at 9:05 PM, Qin Wei  wrote:

> Hi All,
>
> I have a problem with the Item-Based Collaborative Filtering Recommendation
> Algorithms in spark.
> The basic flow is as below:
> RDD1 ==> (Item1, (User1, Score1))
>          (Item2, (User2, Score2))
>          (Item1, (User2, Score3))
>          (Item2, (User1, Score4))
>
> RDD1.groupByKey ==> RDD2
>          (Item1, ((User1, Score1), (User2, Score3)))
>          (Item2, ((User1, Score4), (User2, Score2)))
>
> The similarity of Vector  ((User1,   Score1),   (User2,   Score3)) and
> ((User1,   Score4),   (User2,   Score2)) is the similarity of Item1 and
> Item2.
>
> In my situation, RDD2 contains 20 million records and my Spark program is
> extremely slow; the source code is below:
> val conf = new
> SparkConf().setMaster("spark://211.151.121.184:7077").setAppName("Score
> Calcu Total").set("spark.executor.memory",
> "20g").setJars(Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
> val sc = new SparkContext(conf)
>
> val mongoRDD =
> sc.textFile(args(0).toString,
> 400)
> val jsonRDD = mongoRDD.map(arg => new
> JSONObject(arg))
>
> val newRDD = jsonRDD.map(arg => {
> var score =
> haha(arg.get("a").asInstanceOf[JSONObject])
>
> // set score to 0.5 for testing
> arg.put("score", 0.5)
> arg
> })
>
> val resourceScoresRDD = newRDD.map(arg =>
> (arg.get("rid").toString.toLong, (arg.get("zid").toString,
> arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey().cache()
> val resourceScores =
> resourceScoresRDD.collect()
> val bcResourceScores =
> sc.broadcast(resourceScores)
>
> val simRDD =
> resourceScoresRDD.mapPartitions({iter =>
> val m = bcResourceScores.value
> for{ (r1, v1) <- iter
>(r2, v2) <- m
>if r1 > r2
> } yield (r1, r2, cosSimilarity(v1,
> v2))}, true).filter(arg => arg._3 > 0.1)
>
> println(simRDD.count)
>
> And I saw this in Spark Web UI:
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204018.png
> >
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204001.png
> >
>
> My standalone cluster has 3 worker node (16 core and 32G RAM),and the
> workload of the machine in my cluster is heavy when the spark program is
> running.
>
> Is there any better way to do the algorithm?
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-do-cartesian-tp4807.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: what is the best way to do cartesian

2014-04-25 Thread Eugen Cepoi
Depending on the size of the RDD you could also do a collect + broadcast and
then compute the product in a map function over the other RDD. If this is
the same RDD you might also want to cache it. This pattern worked quite
well for me.
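
A minimal sketch of that pattern, assuming rddA and rddB are the two inputs
and that rddB is small enough to be collected on the driver (names and the
yielded pair are illustrative):

val small = rddB.collect()          // only safe if rddB fits in driver memory
val bcSmall = sc.broadcast(small)

val product = rddA.mapPartitions { iter =>
  val other = bcSmall.value
  for {
    a <- iter                       // stream through the large side
    b <- other                      // pair it with every broadcast element
  } yield (a, b)                    // or compute a similarity score here instead
}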
On Apr 25, 2014 18:33, "Alex Boisvert"  wrote:

> You might want to try the built-in RDD.cartesian() method.
>
>
> On Thu, Apr 24, 2014 at 9:05 PM, Qin Wei  wrote:
>
>> Hi All,
>>
>> I have a problem with the Item-Based Collaborative Filtering
>> Recommendation
>> Algorithms in spark.
>> The basic flow is as below:
>> RDD1 ==> (Item1, (User1, Score1))
>>          (Item2, (User2, Score2))
>>          (Item1, (User2, Score3))
>>          (Item2, (User1, Score4))
>>
>> RDD1.groupByKey ==> RDD2
>>          (Item1, ((User1, Score1), (User2, Score3)))
>>          (Item2, ((User1, Score4), (User2, Score2)))
>>
>> The similarity of Vector  ((User1,   Score1),   (User2,   Score3)) and
>> ((User1,   Score4),   (User2,   Score2)) is the similarity of Item1 and
>> Item2.
>>
>> In my situation, RDD2 contains 20 million records and my Spark program is
>> extremely slow; the source code is below:
>> val conf = new
>> SparkConf().setMaster("spark://211.151.121.184:7077").setAppName("Score
>> Calcu Total").set("spark.executor.memory",
>> "20g").setJars(Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
>> val sc = new SparkContext(conf)
>>
>> val mongoRDD =
>> sc.textFile(args(0).toString,
>> 400)
>> val jsonRDD = mongoRDD.map(arg => new
>> JSONObject(arg))
>>
>> val newRDD = jsonRDD.map(arg => {
>> var score =
>> haha(arg.get("a").asInstanceOf[JSONObject])
>>
>> // set score to 0.5 for testing
>> arg.put("score", 0.5)
>> arg
>> })
>>
>> val resourceScoresRDD = newRDD.map(arg =>
>> (arg.get("rid").toString.toLong, (arg.get("zid").toString,
>> arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey().cache()
>> val resourceScores =
>> resourceScoresRDD.collect()
>> val bcResourceScores =
>> sc.broadcast(resourceScores)
>>
>> val simRDD =
>> resourceScoresRDD.mapPartitions({iter =>
>> val m = bcResourceScores.value
>> for{ (r1, v1) <- iter
>>(r2, v2) <- m
>>if r1 > r2
>> } yield (r1, r2, cosSimilarity(v1,
>> v2))}, true).filter(arg => arg._3 > 0.1)
>>
>> println(simRDD.count)
>>
>> And I saw this in Spark Web UI:
>> <
>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204018.png
>> >
>> <
>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204001.png
>> >
>>
>> My standalone cluster has 3 worker node (16 core and 32G RAM),and the
>> workload of the machine in my cluster is heavy when the spark program is
>> running.
>>
>> Is there any better way to do the algorithm?
>>
>> Thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-do-cartesian-tp4807.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>


Re: Pig on Spark

2014-04-25 Thread Michael Armbrust
On Fri, Apr 25, 2014 at 6:30 AM, Mark Baker  wrote:

> I've only had a quick look at Pig, but it seems that a declarative
> layer on top of Spark couldn't be anything other than a big win, as it
> allows developers to declare *what* they want, permitting the compiler
> to determine how best poke at the RDD API to implement it.
>

Having Pig too would certainly be a win, but Spark SQL is
also a declarative layer on top of Spark.  Since the optimization is
lazy, you can chain multiple SQL statements in a row and still optimize
them holistically (similar to a Pig job).  Alpha version coming soon to a
Spark 1.0 release near you!

Spark SQL also lets you drop back into functional Scala when that is more
natural for a particular task.
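
A minimal sketch of what that looks like, based on the alpha programming guide
linked earlier in this thread (the case class, table name, and query are
illustrative, and the API may still shift before the 1.0 release):

import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)

val sqlContext = new SQLContext(sc)
import sqlContext._   // brings in sql(...) and the RDD-to-table implicits

// Turn an RDD of case classes into a table and query it declaratively...
val people = sc.textFile("people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.registerAsTable("people")
val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// ...then drop back into functional Scala on the result.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)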


Spark & Shark 0.9.1 on ec2 with Hadoop 2 error

2014-04-25 Thread jesseerdmann
I've run into a problem trying to launch a cluster using the provided ec2
python script with --hadoop-major-version 2.  The launch completes correctly
with the exception of an Exception getting thrown for Tachyon 7 (I've
included it at the end of the message, but that is not the focus and seems
unrelated to my issue.)

When I log in and try to run shark-withinfo, I get the following exception
and I'm not sure where to go from here. 

Exception in thread "main" java.lang.RuntimeException:
org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:278)
at shark.SharkCliDriver$.main(SharkCliDriver.scala:128)
at shark.SharkCliDriver.main(SharkCliDriver.scala)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException
at
org.apache.hadoop.hive.ql.metadata.HiveUtils.getAuthenticator(HiveUtils.java:368)
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:270)
... 2 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException
at
org.apache.hadoop.hive.ql.security.HadoopDefaultAuthenticator.setConf(HadoopDefaultAuthenticator.java:53)
at 
org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at
org.apache.hadoop.hive.ql.metadata.HiveUtils.getAuthenticator(HiveUtils.java:365)
... 3 more
Caused by: java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException
at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131)
at org.apache.hadoop.security.Groups.(Groups.java:64)
at
org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
at
org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
at
org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
at
org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
at
org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
at
org.apache.hadoop.hive.shims.HadoopShimsSecure.getUGIForConf(HadoopShimsSecure.java:491)
at
org.apache.hadoop.hive.ql.security.HadoopDefaultAuthenticator.setConf(HadoopDefaultAuthenticator.java:51)
... 6 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:129)
... 15 more
Caused by: java.lang.UnsatisfiedLinkError:
org.apache.hadoop.security.JniBasedUnixGroupsMapping.anchorNative()V
at 
org.apache.hadoop.security.JniBasedUnixGroupsMapping.anchorNative(Native
Method)
at
org.apache.hadoop.security.JniBasedUnixGroupsMapping.(JniBasedUnixGroupsMapping.java:49)
at
org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback.(JniBasedUnixGroupsMappingWithFallback.java:38)
... 20 more





For completeness, the Tachyon exception during cluster launch:

Exception in thread "main" java.lang.RuntimeException:
org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot
communicate with client version 4
at tachyon.util.CommonUtils.runtimeException(CommonUtils.java:246)
at tachyon.UnderFileSystemHdfs.(UnderFileSystemHdfs.java:73)
at tachyon.UnderFileSystemHdfs.getClient(UnderFileSystemHdfs.java:53)
at tachyon.UnderFileSystem.get(UnderFileSystem.java:53)
at tachyon.Format.main(Format.java:54)
Caused by: org.apache.hadoop.ipc.RemoteException: Server IPC version 7
cannot communicate with client version 4
at org.apache.hadoop.ipc.Client.call(Client.java:1070)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
at com.sun.proxy.$Proxy1.getProtocolVersion(Unknown Source)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
at 
org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:238)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:203)
 

Re: Securing Spark's Network

2014-04-25 Thread Akhil Das
Hi Jacob,

This post might give you a brief idea about the ports being used

https://groups.google.com/forum/#!topic/spark-users/PN0WoJiB0TA





On Fri, Apr 25, 2014 at 8:53 PM, Jacob Eisinger  wrote:

> Howdy,
>
> We tried running Spark 0.9.1 stand-alone inside docker containers
> distributed over multiple hosts. This is complicated due to Spark opening
> up ephemeral / dynamic ports for the workers and the CLI.  To ensure our
> docker solution doesn't break Spark in unexpected ways and maintains a
> secure cluster, I am interested in understanding more about Spark's network
> architecture. I'd appreciate it if you could you point us to any
> documentation!
>
> A couple specific questions:
>
>1. What are these ports being used for?
>Checking out the code / experiments, it looks like asynchronous
>communication for shuffling around results. Anything else?
>2. How do you secure the network?
>Network administrators tend to secure and monitor the network at the
>port level. If these ports are dynamic and open randomly, firewalls are not
>easily configured and security alarms are raised. Is there a way to limit
>the range easily? (We did investigate setting the kernel parameter
>ip_local_reserved_ports, but this is broken [1] on some versions of Linux's
>cgroups.)
>
>
> Thanks,
> Jacob
>
> [1] https://github.com/lxc/lxc/issues/97
>
> Jacob D. Eisinger
> IBM Emerging Technologies
> jeis...@us.ibm.com - (512) 286-6075


Strange lookup behavior. Possible bug?

2014-04-25 Thread Yadid Ayzenberg

Hi All,

I'm running a lookup on a JavaPairRDD.
When running on a local machine the lookup is successful. However, when
running on a standalone cluster with the exact same dataset, one of the
tasks never ends (it is constantly in RUNNING status).
When viewing the worker log, it seems that the task has finished
successfully:


14/04/25 13:40:38 INFO BlockManager: Found block rdd_2_0 locally
14/04/25 13:40:38 INFO Executor: Serialized size of result for 2 is 10896794
14/04/25 13:40:38 INFO Executor: Sending result for 2 directly to driver
14/04/25 13:40:38 INFO Executor: Finished task ID 2

But it seems the driver is not aware of this, and hangs indefinitely.

If I execute a count prior to the lookup, I get the correct number,
which suggests that the cluster is operating as expected.


The exact same scenario works with a different type of key (Tuple2):
JavaPairRDD.


Any ideas on how to debug this problem ?

Thanks,

Yadid



Re: Spark & Shark 0.9.1 on ec2 with Hadoop 2 error

2014-04-25 Thread Akhil Das
Hi

I have also faced the same problem with Shark 0.9.1, and I fixed it by sbt
clean/packaging Shark with the right Hadoop version. You may execute the
following commands to get it done.

cd shark; export SHARK_HADOOP_VERSION=$(/root/ephemeral-hdfs/bin/hadoop version | head -n1 | cut -d" " -f 2-); sbt/sbt clean

cd shark; export SHARK_HADOOP_VERSION=$(/root/ephemeral-hdfs/bin/hadoop version | head -n1 | cut -d" " -f 2-); sbt/sbt package




On Fri, Apr 25, 2014 at 11:16 PM, jesseerdmann  wrote:

> I've run into a problem trying to launch a cluster using the provided ec2
> python script with --hadoop-major-version 2.  The launch completes
> correctly
> with the exception of an Exception getting thrown for Tachyon 7 (I've
> included it at the end of the message, but that is not the focus and seems
> unrelated to my issue.)
>
> When I log in and try to run shark-withinfo, I get the following exception
> and I'm not sure where to go from here.
>
> Exception in thread "main" java.lang.RuntimeException:
> org.apache.hadoop.hive.ql.metadata.HiveException:
> java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.reflect.InvocationTargetException
> at
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:278)
> at shark.SharkCliDriver$.main(SharkCliDriver.scala:128)
> at shark.SharkCliDriver.main(SharkCliDriver.scala)
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException:
> java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.reflect.InvocationTargetException
> at
>
> org.apache.hadoop.hive.ql.metadata.HiveUtils.getAuthenticator(HiveUtils.java:368)
> at
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:270)
> ... 2 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.reflect.InvocationTargetException
> at
>
> org.apache.hadoop.hive.ql.security.HadoopDefaultAuthenticator.setConf(HadoopDefaultAuthenticator.java:53)
> at
> org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
> at
>
> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
> at
>
> org.apache.hadoop.hive.ql.metadata.HiveUtils.getAuthenticator(HiveUtils.java:365)
> ... 3 more
> Caused by: java.lang.RuntimeException:
> java.lang.reflect.InvocationTargetException
> at
>
> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131)
> at org.apache.hadoop.security.Groups.(Groups.java:64)
> at
>
> org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
> at
>
> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
> at
>
> org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
> at
>
> org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
> at
>
> org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
> at
>
> org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
> at
>
> org.apache.hadoop.hive.shims.HadoopShimsSecure.getUGIForConf(HadoopShimsSecure.java:491)
> at
>
> org.apache.hadoop.hive.ql.security.HadoopDefaultAuthenticator.setConf(HadoopDefaultAuthenticator.java:51)
> ... 6 more
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at
>
> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:129)
> ... 15 more
> Caused by: java.lang.UnsatisfiedLinkError:
> org.apache.hadoop.security.JniBasedUnixGroupsMapping.anchorNative()V
> at
> org.apache.hadoop.security.JniBasedUnixGroupsMapping.anchorNative(Native
> Method)
> at
>
> org.apache.hadoop.security.JniBasedUnixGroupsMapping.(JniBasedUnixGroupsMapping.java:49)
> at
>
> org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback.(JniBasedUnixGroupsMappingWithFallback.java:38)
> ... 20 more
>
>
>
>
>
> For completeness, the Tachyon exception during cluster launch:
>
> Exception in thread "main" java.lang.RuntimeException:
> org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot
> communicate with client version 4
> at tachyon.util.CommonUtils.runtimeException(CommonUtils.java:246)
> at tachyon.UnderFileSystemHdfs.(UnderFileSystemHdfs.java:73)
> at
> tachyon.UnderFileSystemHdfs.getClient(UnderFileSystemHdfs.java:53)
> at tachyon.Unde

help

2014-04-25 Thread Joe L
I need someone's help, please. I am getting the following error:

[error] 14/04/26 03:09:47 INFO cluster.SparkDeploySchedulerBackend: Executor
app-20140426030946-0004/8 removed: class java.io.IOException: Cannot run
program "/home/exobrain/install/spark-0.9.1/bin/compute-classpath.sh" (in
directory "."): error=13





Re: JMX with Spark

2014-04-25 Thread Paul Schooss
Hello Folks,

Sorry for the delay, these emails got missed due to the volume.

Here is my metrics.conf


root@jobs-ab-hdn4:~# cat /opt/klout/spark/conf/metrics.conf
#  syntax: [instance].sink|source.[name].[options]=[value]

#  This file configures Spark's internal metrics system. The metrics system
is
#  divided into instances which correspond to internal components.
#  Each instance can be configured to report its metrics to one or more
sinks.
#  Accepted values for [instance] are "master", "worker", "executor",
"driver",
#  and "applications". A wild card "*" can be used as an instance name, in
#  which case all instances will inherit the supplied property.
#
#  Within an instance, a "source" specifies a particular set of grouped
metrics.
#  there are two kinds of sources:
#1. Spark internal sources, like MasterSource, WorkerSource, etc, which
will
#collect a Spark component's internal state. Each instance is paired
with a
#Spark source that is added automatically.
#2. Common sources, like JvmSource, which will collect low level state.
#These can be added through configuration options and are then loaded
#using reflection.
#
#  A "sink" specifies where metrics are delivered to. Each instance can be
#  assigned one or more sinks.
#
#  The sink|source field specifies whether the property relates to a sink or
#  source.
#
#  The [name] field specifies the name of source or sink.
#
#  The [options] field is the specific property of this source or sink. The
#  source or sink is responsible for parsing this property.
#
#  Notes:
#1. To add a new sink, set the "class" option to a fully qualified class
#name (see examples below).
#2. Some sinks involve a polling period. The minimum allowed polling
period
#is 1 second.
#3. Wild card properties can be overridden by more specific properties.
#For example, master.sink.console.period takes precedence over
#*.sink.console.period.
#4. A metrics specific configuration
#"spark.metrics.conf=${SPARK_HOME}/conf/metrics.properties" should be
#added to Java properties using -Dspark.metrics.conf=xxx if you want to
#customize metrics system. You can also put the file in
${SPARK_HOME}/conf
#and it will be loaded automatically.
#5. MetricsServlet is added by default as a sink in master, worker and
client
#driver, you can send http request "/metrics/json" to get a snapshot of
all the
#registered metrics in json format. For master, requests
"/metrics/master/json" and
#"/metrics/applications/json" can be sent seperately to get metrics
snapshot of
#instance master and applications. MetricsServlet may not be configured
by self.
#

## List of available sinks and their properties.

# org.apache.spark.metrics.sink.ConsoleSink
#   Name:   Default:   Description:
#   period  10 Poll period
#   unitsecondsUnits of poll period

# org.apache.spark.metrics.sink.CSVSink
#   Name: Default:   Description:
#   period10 Poll period
#   unit  secondsUnits of poll period
#   directory /tmp   Where to store CSV files

# org.apache.spark.metrics.sink.GangliaSink
#   Name: Default:   Description:
#   host  NONE   Hostname or multicast group of Ganglia server
#   port  NONE   Port of Ganglia server(s)
#   period10 Poll period
#   unit  secondsUnits of poll period
#   ttl   1  TTL of messages sent by Ganglia
#   mode  multicast  Ganglia network mode ('unicast' or 'mulitcast')

#org.apache.spark.metrics.sink.JmxSink

# org.apache.spark.metrics.sink.MetricsServlet
#   Name: Default:   Description:
#   path  VARIES*Path prefix from the web server root
#   samplefalse  Whether to show entire set of samples for
histograms ('false' or 'true')
#
# * Default path is /metrics/json for all instances except the master. The
master has two paths:
# /metrics/aplications/json # App information
# /metrics/master/json  # Master information

# org.apache.spark.metrics.sink.GraphiteSink
#   Name: Default:  Description:
#   host  NONE  Hostname of Graphite server
#   port  NONE  Port of Graphite server
#   period10Poll period
#   unit  seconds   Units of poll period
#   prefixEMPTY STRING  Prefix to prepend to metric name

## Examples
# Enable JmxSink for all instances by class name
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink

# Enable ConsoleSink for all instances by class name
#*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink

# Polling period for ConsoleSink
#*.sink.console.period=10

#*.sink.console.unit=seconds

# Master instance overlap polling period
#master.sink.console.period=15

#master.sink.console.unit=seconds

# Enable CsvSink for all instances
#*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink

# Polling period for CsvSink
#*.sink.csv.period=1

#*.sink.csv.unit=minutes

# Polling directory for CsvSink

Re: Pig on Spark

2014-04-25 Thread Bharath Mundlapudi
>> I've only had a quick look at Pig, but it seems that a declarative
>> layer on top of Spark couldn't be anything other than a big win, as it
>> allows developers to declare *what* they want, permitting the compiler
>> to determine how best poke at the RDD API to implement it.

The devil is in the details - allowing developers to declare *what* they
want seems impractical in a declarative world, since we are bound by the
DSL constructs. The workaround, or rather hack, is to use UDFs to get full
language constructs. Some problems are hard; you will have to twist your mind
to solve them in a restrictive way. At that point, we wish we had
complete language power.

Having been in the Big Data world for a short time (7 years), I have seen
enough problems with Hive/Pig. All I am providing here is a thought to spark
the Spark community into thinking beyond declarative constructs.

I am sure there is a place for Pig and Hive.

-Bharath




On Fri, Apr 25, 2014 at 10:21 AM, Michael Armbrust
wrote:

> On Fri, Apr 25, 2014 at 6:30 AM, Mark Baker  wrote:
>
>> I've only had a quick look at Pig, but it seems that a declarative
>> layer on top of Spark couldn't be anything other than a big win, as it
>> allows developers to declare *what* they want, permitting the compiler
>> to determine how best poke at the RDD API to implement it.
>>
>
> Having Pig too would certainly be a win, but Spark SQL is
> also a declarative layer on top of Spark.  Since the optimization is
> lazy, you can chain multiple SQL statements in a row and still optimize
> them holistically (similar to a pig job).  Alpha version coming soon to a
> Spark 1.0 release near you!
>
> Spark SQL also lets to drop back into functional Scala when that is more
> natural for a particular task.
>


Re: help

2014-04-25 Thread Jey Kottalam
Try taking a look at the stderr logs of the executor
"app-20140426030946-0004/8". This should be in the $SPARK_HOME/work
directory of the corresponding machine.

Hope that helps,
-Jey

On Fri, Apr 25, 2014 at 11:17 AM, Joe L  wrote:
> I need someone's help please I am getting the following error.
>
> [error] 14/04/26 03:09:47 INFO cluster.SparkDeploySchedulerBackend: Executor
> app-20140426030946-0004/8 removed: class java.io.IOException: Cannot run
> program "/home/exobrain/install/spark-0.9.1/bin/compute-classpath.sh" (in
> directory "."): error=13
>
>
>


Re: Spark and HBase

2014-04-25 Thread Josh Mahonin
Phoenix generally presents itself as an endpoint using JDBC, which in my
testing seems to play nicely using JdbcRDD.

However, a few days ago a patch was made against Phoenix to implement
support via PIG using a custom Hadoop InputFormat, which means now it has
Spark support too.

Here's a code snippet that sets up an RDD for a specific query:

--
val phoenixConf = new PhoenixPigConfiguration(new Configuration())
phoenixConf.setSelectStatement("SELECT EVENTTYPE,EVENTTIME FROM EVENTS WHERE EVENTTYPE = 'some_type'")
phoenixConf.setSelectColumns("EVENTTYPE,EVENTTIME")
phoenixConf.configure("servername", "EVENTS", 100L)

val phoenixRDD = sc.newAPIHadoopRDD(
  phoenixConf.getConfiguration(),
  classOf[PhoenixInputFormat],
  classOf[NullWritable],
  classOf[PhoenixRecord])
--

I'm still very new to Spark and even less experienced with Phoenix, but I'm
hoping there's an advantage over the JdbcRDD in terms of partitioning. The
JdbcRDD seems to implement partitioning based on a user-defined query
predicate, but I think Phoenix's InputFormat is able to figure out the
splits, which Spark is able to leverage. I don't really know how to verify
whether this is the case though, so if anyone else is looking into this,
I'd love to hear their thoughts.
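
For comparison, a hedged sketch of the JdbcRDD side of that trade-off: the
caller supplies the partitioning predicate through the two '?' placeholders
and the bounds (the connection URL, ID column, and bounds below are
illustrative, not taken from the setup above):

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val jdbcRDD = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:phoenix:servername"),
  "SELECT EVENTTYPE, EVENTTIME FROM EVENTS WHERE ID >= ? AND ID <= ?",
  1L, 1000000L, 10,   // lower bound, upper bound, number of partitions
  rs => (rs.getString(1), rs.getString(2)))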

Josh


On Tue, Apr 8, 2014 at 1:00 PM, Nicholas Chammas  wrote:

> Just took a quick look at the overview 
> here and
> the quick start guide 
> here
> .
>
> It looks like Apache Phoenix aims to provide flexible SQL access to data,
> both for transactional and analytic purposes, and at interactive speeds.
>
> Nick
>
>
> On Tue, Apr 8, 2014 at 12:38 PM, Bin Wang  wrote:
>
>> First, I have not tried it myself. However, what I have heard it has some
>> basic SQL features so you can query you HBase table like query content on
>> HDFS using Hive.
>> So it is not "query a simple column", I believe you can do joins and
>> other SQL queries. Maybe you can wrap up an EMR cluster with Hbase
>> preconfigured and give it a try.
>>
>> Sorry cannot provide more detailed explanation and help.
>>
>>
>>
>> On Tue, Apr 8, 2014 at 10:17 AM, Flavio Pompermaier > > wrote:
>>
>>> Thanks for the quick reply Bin. Phenix is something I'm going to try for
>>> sure but is seems somehow useless if I can use Spark.
>>> Probably, as you said, since Phoenix use a dedicated data structure
>>> within each HBase Table has a more effective memory usage but if I need to
>>> deserialize data stored in a HBase cell I still have to read in memory that
>>> object and thus I need Spark. From what I understood Phoenix is good if I
>>> have to query a simple column of HBase but things get really complicated if
>>> I have to add an index for each column in my table and I store complex
>>> object within the cells. Is it correct?
>>>
>>> Best,
>>> Flavio
>>>
>>>
>>>
>>>
>>> On Tue, Apr 8, 2014 at 6:05 PM, Bin Wang  wrote:
>>>
 Hi Flavio,

 I happened to attend, actually attending the 2014 Apache Conf, I heard
 a project called "Apache Phoenix", which fully leverage HBase and suppose
 to be 1000x faster than Hive. And it is not memory bounded, in which case
 sets up a limit for Spark. It is still in the incubating group and the
 "stats" functions spark has already implemented are still on the roadmap. I
 am not sure whether it will be good but might be something interesting to
 check out.

 /usr/bin


 On Tue, Apr 8, 2014 at 9:57 AM, Flavio Pompermaier <
 pomperma...@okkam.it> wrote:

> Hi to everybody,
>
>  in these days I looked a bit at the recent evolution of the big data
> stacks and it seems that HBase is somehow fading away in favour of
> Spark+HDFS. Am I correct?
> Do you think that Spark and HBase should work together or not?
>
> Best regards,
> Flavio
>

>>
>


Re: help

2014-04-25 Thread Joe L
Hi, thank you for your reply, but I could not find it. It says no such
file or directory.





Build times for Spark

2014-04-25 Thread Williams, Ken
I've cloned the github repo and I'm building Spark on a pretty beefy machine 
(24 CPUs, 78GB of RAM) and it takes a pretty long time.

For instance, today I did a 'git pull' for the first time in a week or two, and 
then doing 'sbt/sbt assembly' took 43 minutes of wallclock time (88 minutes of 
CPU time).  After that, I did 'SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true 
sbt/sbt assembly' and that took 25 minutes wallclock, 73 minutes CPU.

Is that typical?  Or does that indicate some setup problem in my environment?

--
Ken Williams, Senior Research Scientist
WindLogics
http://windlogics.com






Re: Pig on Spark

2014-04-25 Thread Mayur Rustagi
One core segment that frequently asks for systems like Pig & Hive is
analysts who want to deal with data. The key place I see Pig fitting in is
getting non-developers to deal with data at scale & freeing up developers to
deal with code and UDFs rather than managing day-to-day dataflow changes &
updates.
A byproduct of this is that big data computation is made available to folks
beyond those who know what maven & sbt are :)


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Sat, Apr 26, 2014 at 12:04 AM, Bharath Mundlapudi
wrote:

> >> I've only had a quick look at Pig, but it seems that a declarative
> >> layer on top of Spark couldn't be anything other than a big win, as it
> >> allows developers to declare *what* they want, permitting the compiler
> >> to determine how best poke at the RDD API to implement it.
>
> The devil is in the details - allowing developers to declare *what* they
> want - seems not practical in a declarative world since we are bound by the
> DSL constructs. The work around or rather hack is to have UDFs to have full
> language constructs. Some problems are hard, you will have twist your mind
> to solve in a restrictive way. At that time, we think, we wish we have
> complete language power.
>
> Being in Big Data world for short time (7 years), seen enough problems
> with Hive/Pig. All I am providing here is a thought to spark the Spark
> community to think beyond declarative constructs.
>
> I am sure there is a place for Pig and Hive.
>
> -Bharath
>
>
>
>
> On Fri, Apr 25, 2014 at 10:21 AM, Michael Armbrust  > wrote:
>
>> On Fri, Apr 25, 2014 at 6:30 AM, Mark Baker  wrote:
>>
>>> I've only had a quick look at Pig, but it seems that a declarative
>>> layer on top of Spark couldn't be anything other than a big win, as it
>>> allows developers to declare *what* they want, permitting the compiler
> >>> to determine how best to poke at the RDD API to implement it.
>>>
>>
> >> Having Pig too would certainly be a win, but Spark SQL is
> >> also a declarative layer on top of Spark.  Since the optimization is
> >> lazy, you can chain multiple SQL statements in a row and still optimize
> >> them holistically (similar to a pig job).  Alpha version coming soon to a
> >> Spark 1.0 release near you!
>>
> >> Spark SQL also lets you drop back into functional Scala when that is more
> >> natural for a particular task.
>>
>
>
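For reference, a rough sketch of the chained-SQL usage described in the quoted reply. This is only an illustration, assuming the 1.0-era alpha API names (SQLContext, createSchemaRDD, registerAsTable); the table, columns, path and `sc` are hypothetical stand-ins, not anything from the original messages.

--
import org.apache.spark.sql.SQLContext

case class Rating(userId: Int, movie: String, score: Double)

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD   // implicit conversion RDD[case class] -> SchemaRDD

// build a table from an ordinary RDD of case classes (hypothetical input path/format)
val ratings = sc.textFile("hdfs:///ratings.tsv")
  .map(_.split("\t"))
  .map(r => Rating(r(0).toInt, r(1), r(2).toDouble))
ratings.registerAsTable("ratings")

// two chained statements: the intermediate result is itself a SchemaRDD,
// so nothing runs until an action, and the plan is optimized as a whole
val good = sqlContext.sql("SELECT movie, score FROM ratings WHERE score > 3")
good.registerAsTable("good_ratings")
val top = sqlContext.sql("SELECT movie FROM good_ratings WHERE score > 4.5")

// ...and you can drop back into plain Scala transformations at any point
top.map(row => row(0).toString).distinct().collect()
--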


Re: Build times for Spark

2014-04-25 Thread DB Tsai
Are you using an SSD? We found that the bottleneck is not computation but
disk IO. During assembly, sbt moves lots of class files and jars and
packages them into a single flat jar. I can do the assembly on my MacBook in
10 mins, while before upgrading to an SSD it took 30~40 mins.


Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Fri, Apr 25, 2014 at 12:53 PM, Williams, Ken  wrote:

>  I’ve cloned the github repo and I’m building Spark on a pretty beefy
> machine (24 CPUs, 78GB of RAM) and it takes a pretty long time.
>
>
>
> For instance, today I did a ‘git pull’ for the first time in a week or
> two, and then doing ‘sbt/sbt assembly’ took 43 minutes of wallclock time
> (88 minutes of CPU time).  After that, I did ‘SPARK_HADOOP_VERSION=2.2.0
> SPARK_YARN=true sbt/sbt assembly’ and that took 25 minutes wallclock, 73
> minutes CPU.
>
>
>
> Is that typical?  Or does that indicate some setup problem in my
> environment?
>
>
>
> --
>
> Ken Williams, Senior Research Scientist
>
> *Wind**Logics*
>
> http://windlogics.com
>
>
>
> --
>
> CONFIDENTIALITY NOTICE: This e-mail message is for the sole use of the
> intended recipient(s) and may contain confidential and privileged
> information. Any unauthorized review, use, disclosure or distribution of
> any kind is strictly prohibited. If you are not the intended recipient,
> please contact the sender via reply e-mail and destroy all copies of the
> original message. Thank you.
>


Re: Build times for Spark

2014-04-25 Thread Josh Rosen
Did you configure SBT to use the extra memory?


On Fri, Apr 25, 2014 at 12:53 PM, Williams, Ken  wrote:

>  I’ve cloned the github repo and I’m building Spark on a pretty beefy
> machine (24 CPUs, 78GB of RAM) and it takes a pretty long time.
>
>
>
> For instance, today I did a ‘git pull’ for the first time in a week or
> two, and then doing ‘sbt/sbt assembly’ took 43 minutes of wallclock time
> (88 minutes of CPU time).  After that, I did ‘SPARK_HADOOP_VERSION=2.2.0
> SPARK_YARN=true sbt/sbt assembly’ and that took 25 minutes wallclock, 73
> minutes CPU.
>
>
>
> Is that typical?  Or does that indicate some setup problem in my
> environment?
>
>
>
> --
>
> Ken Williams, Senior Research Scientist
>
> *Wind**Logics*
>
> http://windlogics.com
>
>
>
> --
>
> CONFIDENTIALITY NOTICE: This e-mail message is for the sole use of the
> intended recipient(s) and may contain confidential and privileged
> information. Any unauthorized review, use, disclosure or distribution of
> any kind is strictly prohibited. If you are not the intended recipient,
> please contact the sender via reply e-mail and destroy all copies of the
> original message. Thank you.
>


Scala Spark / Shark: How to access existing Hive tables in Hortonworks?

2014-04-25 Thread Darq Moth
I am trying to find some docs / a description of the approach on this subject,
please help. I have Hadoop 2.2.0 from Hortonworks installed with some
existing Hive tables I need to query. Hive SQL runs extremely and
unreasonably slowly on a single node and on the cluster as well. I hope Shark
will work faster.

From the Spark/Shark docs I cannot figure out how to make Shark work with
existing Hive tables. Any ideas on how to achieve this? Thanks!


Re: Scala Spark / Shark: How to access existing Hive tables in Hortonworks?

2014-04-25 Thread Mayur Rustagi
You have to configure Shark to access the Hortonworks Hive metastore
(HCatalog?). You will then start seeing the tables in the Shark shell and can run
queries as normal; Shark will leverage Spark to process your queries.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Sat, Apr 26, 2014 at 2:00 AM, Darq Moth  wrote:

> I am trying to find some docs / description of the approach on the
> subject, please help. I have Hadoop 2.2.0 from Hortonworks installed with
> some existing Hive tables I need to query. Hive SQL runs extremely and
> unreasonably slowly on a single node and on the cluster as well. I hope Shark will
> work faster.
>
> From Spark/Shark docs I can not figure out how to make Shark work with
> existing Hive tables. Any ideas how to achieve this? Thanks!
>


RE: Build times for Spark

2014-04-25 Thread Williams, Ken
No, I haven’t done any config for SBT.  Is there somewhere you might be able to 
point me toward for how to do that?

-Ken

From: Josh Rosen [mailto:rosenvi...@gmail.com]
Sent: Friday, April 25, 2014 3:27 PM
To: user@spark.apache.org
Subject: Re: Build times for Spark

Did you configure SBT to use the extra memory?

On Fri, Apr 25, 2014 at 12:53 PM, Williams, Ken 
mailto:ken.willi...@windlogics.com>> wrote:
I’ve cloned the github repo and I’m building Spark on a pretty beefy machine 
(24 CPUs, 78GB of RAM) and it takes a pretty long time.

For instance, today I did a ‘git pull’ for the first time in a week or two, and 
then doing ‘sbt/sbt assembly’ took 43 minutes of wallclock time (88 minutes of 
CPU time).  After that, I did ‘SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true 
sbt/sbt assembly’ and that took 25 minutes wallclock, 73 minutes CPU.

Is that typical?  Or does that indicate some setup problem in my environment?

--
Ken Williams, Senior Research Scientist
WindLogics
http://windlogics.com




CONFIDENTIALITY NOTICE: This e-mail message is for the sole use of the intended 
recipient(s) and may contain confidential and privileged information. Any 
unauthorized review, use, disclosure or distribution of any kind is strictly 
prohibited. If you are not the intended recipient, please contact the sender 
via reply e-mail and destroy all copies of the original message. Thank you.



Re: Build times for Spark

2014-04-25 Thread Akhil Das
You can always increase the sbt memory by setting

export JAVA_OPTS="-Xmx10g"





Thanks
Best Regards


On Sat, Apr 26, 2014 at 2:17 AM, Williams, Ken
wrote:

>  No, I haven't done any config for SBT.  Is there somewhere you might be
> able to point me toward for how to do that?
>
>
>
> -Ken
>
>
>
> *From:* Josh Rosen [mailto:rosenvi...@gmail.com]
> *Sent:* Friday, April 25, 2014 3:27 PM
> *To:* user@spark.apache.org
> *Subject:* Re: Build times for Spark
>
>
>
> Did you configure SBT to use the extra memory?
>
>
>
> On Fri, Apr 25, 2014 at 12:53 PM, Williams, Ken <
> ken.willi...@windlogics.com> wrote:
>
> I've cloned the github repo and I'm building Spark on a pretty beefy
> machine (24 CPUs, 78GB of RAM) and it takes a pretty long time.
>
>
>
> For instance, today I did a 'git pull' for the first time in a week or
> two, and then doing 'sbt/sbt assembly' took 43 minutes of wallclock time
> (88 minutes of CPU time).  After that, I did 'SPARK_HADOOP_VERSION=2.2.0
> SPARK_YARN=true sbt/sbt assembly' and that took 25 minutes wallclock, 73
> minutes CPU.
>
>
>
> Is that typical?  Or does that indicate some setup problem in my
> environment?
>
>
>
> --
>
> Ken Williams, Senior Research Scientist
>
> *WindLogics*
>
> http://windlogics.com
>
>
>
>
>  --
>
>
> CONFIDENTIALITY NOTICE: This e-mail message is for the sole use of the
> intended recipient(s) and may contain confidential and privileged
> information. Any unauthorized review, use, disclosure or distribution of
> any kind is strictly prohibited. If you are not the intended recipient,
> please contact the sender via reply e-mail and destroy all copies of the
> original message. Thank you.
>
>
>


Re: Scala Spark / Shark: How to access existing Hive tables in Hortonworks?

2014-04-25 Thread Darq Moth
Thanks!
For now I use JDBC from Scala to get data from Hive.  In Hive I have a
simple table with 20 rows in the following format:

user_id, movie_title, rating, date

I do 3 nested select requests (see the sketch below):
1) select distinct user_id
2) for each user_id:
       select distinct movie_title   // select all movies that user saw
3) for each movie_title:
       select distinct user_id       // select all users who saw this movie

On a local Hive table with 20 rows these nested queries take 26 min!
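A rough sketch of that loop over JDBC follows. The driver class, connection URL and table name are assumptions (HiveServer2 on an HDP 2.x cluster); the point is that every executeQuery is a separate round trip, and in Hive each of those typically launches its own job, which is why even a tiny table pays minutes of overhead.

--
import java.sql.DriverManager

// hypothetical driver/URL/table; adjust for your cluster
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://hive-host:10000/default")

def distinctCol(query: String): Seq[String] = {
  val st = conn.createStatement()
  val rs = st.executeQuery(query)
  val buf = scala.collection.mutable.ArrayBuffer[String]()
  while (rs.next()) buf += rs.getString(1)
  st.close()
  buf
}

// 1) all users, 2) per user the movies they saw, 3) per movie the users who saw it
for (u <- distinctCol("SELECT DISTINCT user_id FROM ratings")) {
  for (m <- distinctCol(s"SELECT DISTINCT movie_title FROM ratings WHERE user_id = '$u'")) {
    val viewers = distinctCol(s"SELECT DISTINCT user_id FROM ratings WHERE movie_title = '$m'")
    // ... use viewers ...
  }
}
conn.close()
--

Collapsing the three levels into a single grouped or self-joined query (or running it through Shark) avoids most of the per-query job overhead.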

Questions:
1) Will Shark optimize nested select requests, or will it just run the same
selects over JDBC?
2) What wire protocol will Shark use to communicate with the remote Hive server?


On Sat, Apr 26, 2014 at 12:35 AM, Mayur Rustagi wrote:

> You have to configure shark to access the Hortonworks hive metastore
> (hcatalog?) & you will start seeing the tables in shark shell & can run
> queries like normal & shark will leverage spark for processing your queries.
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Sat, Apr 26, 2014 at 2:00 AM, Darq Moth  wrote:
>
>> I am trying to find some docs / description of the approach on the
>> subject, please help. I have Hadoop 2.2.0 from Hortonworks installed with
>> some existing Hive tables I need to query. Hive SQL runs extremely and
>> unreasonably slowly on a single node and on the cluster as well. I hope Shark will
>> work faster.
>>
>> From Spark/Shark docs I can not figure out how to make Shark work with
>> existing Hive tables. Any ideas how to achieve this? Thanks!
>>
>
>


Re: Securing Spark's Network

2014-04-25 Thread Jacob Eisinger

Howdy Akhil,

Thanks - that did help!  And, it made me think about how the EC2 scripts
work [1] to set up security.  From my understanding of EC2 security groups
[2], this just sets up external access, right?  (This has no effect on
internal communication between the instances, right?)

I am still confused as to why I am seeing the workers open up new ports for
each job.

Jacob

[1] https://github.com/apache/spark/blob/master/ec2/spark_ec2.py#L230
[2]
http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-network-security.html#default-security-group
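For what it's worth, a minimal sketch of the ports an application could pin down at the time, assuming the 0.9-era property names spark.driver.port and spark.ui.port; the executors' data-transfer ports were still chosen dynamically per job, which matches the new worker ports described above. The master host and port values are placeholders.

--
import org.apache.spark.{SparkConf, SparkContext}

// Only a few ports are fixed or configurable here; worker/executor side
// ports (shuffle, block manager) are still picked at random per job.
val conf = new SparkConf()
  .setMaster("spark://master-host:7077") // standalone master RPC port (fixed)
  .setAppName("port-pinning-sketch")
  .set("spark.driver.port", "7001")      // driver RPC port, random if unset
  .set("spark.ui.port", "4040")          // driver web UI port
val sc = new SparkContext(conf)
--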

Jacob D. Eisinger
IBM Emerging Technologies
jeis...@us.ibm.com - (512) 286-6075



From:   Akhil Das 
To: user@spark.apache.org
Date:   04/25/2014 12:51 PM
Subject:Re: Securing Spark's Network
Sent by:ak...@mobipulse.in



Hi Jacob,

This post might give you a brief idea about the ports being used

https://groups.google.com/forum/#!topic/spark-users/PN0WoJiB0TA





On Fri, Apr 25, 2014 at 8:53 PM, Jacob Eisinger  wrote:
  Howdy,

  We tried running Spark 0.9.1 stand-alone inside docker containers
  distributed over multiple hosts. This is complicated due to Spark opening
  up ephemeral / dynamic ports for the workers and the CLI.  To ensure our
  docker solution doesn't break Spark in unexpected ways and maintains a
  secure cluster, I am interested in understanding more about Spark's
  network architecture. I'd appreciate it if you could you point us to any
  documentation!

  A couple specific questions:
 1. What are these ports being used for?
Checking out the code / experiments, it looks like asynchronous
communication for shuffling around results. Anything else?
 2. How do you secure the network?
Network administrators tend to secure and monitor the network at
the port level. If these ports are dynamic and open randomly,
firewalls are not easily configured and security alarms are raised.
Is there a way to limit the range easily? (We did investigate
setting the kernel parameter ip_local_reserved_ports, but this is
broken [1] on some versions of Linux's cgroups.)

  Thanks,
  Jacob

  [1] https://github.com/lxc/lxc/issues/97

  Jacob D. Eisinger
  IBM Emerging Technologies
  jeis...@us.ibm.com - (512) 286-6075


Re: Build times for Spark

2014-04-25 Thread Shivaram Venkataraman
Are you by any chance building this on NFS ? As far as I know the build is
severely bottlenecked by filesystem calls during assembly (each class file
in each dependency gets a fstat call or something like that).  That is
partly why building from say a local ext4 filesystem or a SSD is much
faster irrespective of memory / CPU.

Thanks
Shivaram


On Fri, Apr 25, 2014 at 2:09 PM, Akhil Das wrote:

> You can always increase the sbt memory by setting
>
> export JAVA_OPTS="-Xmx10g"
>
>
>
>
> Thanks
> Best Regards
>
>
> On Sat, Apr 26, 2014 at 2:17 AM, Williams, Ken <
> ken.willi...@windlogics.com> wrote:
>
>>  No, I haven't done any config for SBT.  Is there somewhere you might be
>> able to point me toward for how to do that?
>>
>>
>>
>> -Ken
>>
>>
>>
>> *From:* Josh Rosen [mailto:rosenvi...@gmail.com]
>> *Sent:* Friday, April 25, 2014 3:27 PM
>> *To:* user@spark.apache.org
>> *Subject:* Re: Build times for Spark
>>
>>
>>
>> Did you configure SBT to use the extra memory?
>>
>>
>>
>> On Fri, Apr 25, 2014 at 12:53 PM, Williams, Ken <
>> ken.willi...@windlogics.com> wrote:
>>
>> I've cloned the github repo and I'm building Spark on a pretty beefy
>> machine (24 CPUs, 78GB of RAM) and it takes a pretty long time.
>>
>>
>>
>> For instance, today I did a 'git pull' for the first time in a week or
>> two, and then doing 'sbt/sbt assembly' took 43 minutes of wallclock time
>> (88 minutes of CPU time).  After that, I did 'SPARK_HADOOP_VERSION=2.2.0
>> SPARK_YARN=true sbt/sbt assembly' and that took 25 minutes wallclock, 73
>> minutes CPU.
>>
>>
>>
>> Is that typical?  Or does that indicate some setup problem in my
>> environment?
>>
>>
>>
>> --
>>
>> Ken Williams, Senior Research Scientist
>>
>> *WindLogics*
>>
>> http://windlogics.com
>>
>>
>>
>>
>>  --
>>
>>
>> CONFIDENTIALITY NOTICE: This e-mail message is for the sole use of the
>> intended recipient(s) and may contain confidential and privileged
>> information. Any unauthorized review, use, disclosure or distribution of
>> any kind is strictly prohibited. If you are not the intended recipient,
>> please contact the sender via reply e-mail and destroy all copies of the
>> original message. Thank you.
>>
>>
>>
>
>


RE: Build times for Spark

2014-04-25 Thread Williams, Ken
I am indeed, but it's a pretty fast NFS.  I don't have any SSD I can use, but I 
could try to use local disk to see what happens.

For me, a large portion of the time seems to be spent on lines like "Resolving 
org.fusesource.jansi#jansi;1.4 ..." or similar.  Is this going out to find 
Maven resources?  Any way to tell it to just use my local ~/.m2 repository 
instead when the resource already exists there?  Sometimes I even get sporadic 
errors like this:

  [info] Resolving org.apache.hadoop#hadoop-yarn;2.2.0 ...
  [error] SERVER ERROR: Bad Gateway 
url=http://repo.maven.apache.org/maven2/org/apache/hadoop/hadoop-yarn-server/2.2.0/hadoop-yarn-server-2.2.0.jar


-Ken

From: Shivaram Venkataraman [mailto:shiva...@eecs.berkeley.edu]
Sent: Friday, April 25, 2014 4:31 PM
To: user@spark.apache.org
Subject: Re: Build times for Spark

Are you by any chance building this on NFS ? As far as I know the build is 
severely bottlenecked by filesystem calls during assembly (each class file in 
each dependency gets a fstat call or something like that).  That is partly why 
building from say a local ext4 filesystem or a SSD is much faster irrespective 
of memory / CPU.

Thanks
Shivaram

On Fri, Apr 25, 2014 at 2:09 PM, Akhil Das 
mailto:ak...@sigmoidanalytics.com>> wrote:
You can always increase the sbt memory by setting

export JAVA_OPTS="-Xmx10g"



Thanks
Best Regards

On Sat, Apr 26, 2014 at 2:17 AM, Williams, Ken 
mailto:ken.willi...@windlogics.com>> wrote:
No, I haven't done any config for SBT.  Is there somewhere you might be able to 
point me toward for how to do that?

-Ken

From: Josh Rosen [mailto:rosenvi...@gmail.com]
Sent: Friday, April 25, 2014 3:27 PM
To: user@spark.apache.org
Subject: Re: Build times for Spark

Did you configure SBT to use the extra memory?

On Fri, Apr 25, 2014 at 12:53 PM, Williams, Ken 
mailto:ken.willi...@windlogics.com>> wrote:
I've cloned the github repo and I'm building Spark on a pretty beefy machine 
(24 CPUs, 78GB of RAM) and it takes a pretty long time.

For instance, today I did a 'git pull' for the first time in a week or two, and 
then doing 'sbt/sbt assembly' took 43 minutes of wallclock time (88 minutes of 
CPU time).  After that, I did 'SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true 
sbt/sbt assembly' and that took 25 minutes wallclock, 73 minutes CPU.

Is that typical?  Or does that indicate some setup problem in my environment?

--
Ken Williams, Senior Research Scientist
WindLogics
http://windlogics.com




CONFIDENTIALITY NOTICE: This e-mail message is for the sole use of the intended 
recipient(s) and may contain confidential and privileged information. Any 
unauthorized review, use, disclosure or distribution of any kind is strictly 
prohibited. If you are not the intended recipient, please contact the sender 
via reply e-mail and destroy all copies of the original message. Thank you.





Re: Build times for Spark

2014-04-25 Thread Shivaram Venkataraman
AFAIK the resolver does pick up things from your local ~/.m2 -- note that
as ~/.m2 is on NFS, that adds to the amount of filesystem traffic.

Shivaram


On Fri, Apr 25, 2014 at 2:57 PM, Williams, Ken
wrote:

>  I am indeed, but it's a pretty fast NFS.  I don't have any SSD I can
> use, but I could try to use local disk to see what happens.
>
>
>
> For me, a large portion of the time seems to be spent on lines like
> "Resolving org.fusesource.jansi#jansi;1.4 ..." or similar .  Is this going
> out to find Maven resources?  Any way to tell it to just use my local ~/.m2
> repository instead when the resource already exists there?  Sometimes I
> even get sporadic errors like this:
>
>
>
>   [info] Resolving org.apache.hadoop#hadoop-yarn;2.2.0 ...
>
>   [error] SERVER ERROR: Bad Gateway url=
> http://repo.maven.apache.org/maven2/org/apache/hadoop/hadoop-yarn-server/2.2.0/hadoop-yarn-server-2.2.0.jar
>
>
>
>
>
> -Ken
>
>
>
> *From:* Shivaram Venkataraman [mailto:shiva...@eecs.berkeley.edu]
> *Sent:* Friday, April 25, 2014 4:31 PM
>
> *To:* user@spark.apache.org
> *Subject:* Re: Build times for Spark
>
>
>
> Are you by any chance building this on NFS ? As far as I know the build is
> severely bottlenecked by filesystem calls during assembly (each class file
> in each dependency gets a fstat call or something like that).  That is
> partly why building from say a local ext4 filesystem or a SSD is much
> faster irrespective of memory / CPU.
>
>
>
> Thanks
>
> Shivaram
>
>
>
> On Fri, Apr 25, 2014 at 2:09 PM, Akhil Das 
> wrote:
>
> You can always increase the sbt memory by setting
>
> export JAVA_OPTS="-Xmx10g"
>
>
>
>
>   Thanks
>
> Best Regards
>
>
>
> On Sat, Apr 26, 2014 at 2:17 AM, Williams, Ken <
> ken.willi...@windlogics.com> wrote:
>
> No, I haven't done any config for SBT.  Is there somewhere you might be
> able to point me toward for how to do that?
>
>
>
> -Ken
>
>
>
> *From:* Josh Rosen [mailto:rosenvi...@gmail.com]
> *Sent:* Friday, April 25, 2014 3:27 PM
> *To:* user@spark.apache.org
> *Subject:* Re: Build times for Spark
>
>
>
> Did you configure SBT to use the extra memory?
>
>
>
> On Fri, Apr 25, 2014 at 12:53 PM, Williams, Ken <
> ken.willi...@windlogics.com> wrote:
>
> I've cloned the github repo and I'm building Spark on a pretty beefy
> machine (24 CPUs, 78GB of RAM) and it takes a pretty long time.
>
>
>
> For instance, today I did a 'git pull' for the first time in a week or
> two, and then doing 'sbt/sbt assembly' took 43 minutes of wallclock time
> (88 minutes of CPU time).  After that, I did 'SPARK_HADOOP_VERSION=2.2.0
> SPARK_YARN=true sbt/sbt assembly' and that took 25 minutes wallclock, 73
> minutes CPU.
>
>
>
> Is that typical?  Or does that indicate some setup problem in my
> environment?
>
>
>
> --
>
> Ken Williams, Senior Research Scientist
>
> *WindLogics*
>
> http://windlogics.com
>
>
>
>
>  --
>
>
> CONFIDENTIALITY NOTICE: This e-mail message is for the sole use of the
> intended recipient(s) and may contain confidential and privileged
> information. Any unauthorized review, use, disclosure or distribution of
> any kind is strictly prohibited. If you are not the intended recipient,
> please contact the sender via reply e-mail and destroy all copies of the
> original message. Thank you.
>
>
>
>
>
>
>


Re: Spark and HBase

2014-04-25 Thread Nicholas Chammas
Josh, is there a specific use pattern you think is served well by Phoenix +
Spark? Just curious.


On Fri, Apr 25, 2014 at 3:17 PM, Josh Mahonin  wrote:

> Phoenix generally presents itself as an endpoint using JDBC, which in my
> testing seems to play nicely using JdbcRDD.
>
> However, a few days ago a patch was made against Phoenix to implement
> support via PIG using a custom Hadoop InputFormat, which means now it has
> Spark support too.
>
> Here's a code snippet that sets up an RDD for a specific query:
>
> --
> val phoenixConf = new PhoenixPigConfiguration(new Configuration())
> phoenixConf.setSelectStatement("SELECT EVENTTYPE,EVENTTIME FROM EVENTS
> WHERE EVENTTYPE = 'some_type'")
> phoenixConf.setSelectColumns("EVENTTYPE,EVENTTIME")
> phoenixConf.configure("servername", "EVENTS", 100L)
>
> val phoenixRDD = sc.newAPIHadoopRDD(
> phoenixConf.getConfiguration(),
> classOf[PhoenixInputFormat],
>   classOf[NullWritable],
>   classOf[PhoenixRecord])
> --
>
> I'm still very new at Spark and even less experienced with Phoenix, but
> I'm hoping there's an advantage over the JdbcRDD in terms of partitioning.
> The JdbcRDD seems to implement partitioning based on a query predicate that
> is user defined, but I think Phoenix's InputFormat is able to figure out
> the splits which Spark is able to leverage. I don't really know how to
> verify if this is the case or not though, so if anyone else is looking into
> this, I'd love to hear their thoughts.
>
> Josh
>
>
> On Tue, Apr 8, 2014 at 1:00 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> Just took a quick look at the overview 
>> here and
>> the quick start guide 
>> here
>> .
>>
>> It looks like Apache Phoenix aims to provide flexible SQL access to data,
>> both for transactional and analytic purposes, and at interactive speeds.
>>
>> Nick
>>
>>
>> On Tue, Apr 8, 2014 at 12:38 PM, Bin Wang  wrote:
>>
>>> First, I have not tried it myself. However, from what I have heard it has
>>> some basic SQL features, so you can query your HBase table like you query content
>>> on HDFS using Hive.
>>> So it is not "query a simple column", I believe you can do joins and
>>> other SQL queries. Maybe you can wrap up an EMR cluster with Hbase
>>> preconfigured and give it a try.
>>>
>>> Sorry cannot provide more detailed explanation and help.
>>>
>>>
>>>
>>> On Tue, Apr 8, 2014 at 10:17 AM, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
 Thanks for the quick reply Bin. Phoenix is something I'm going to try
 for sure, but it seems somehow useless if I can use Spark.
 Probably, as you said, since Phoenix use a dedicated data structure
 within each HBase Table has a more effective memory usage but if I need to
 deserialize data stored in a HBase cell I still have to read in memory that
 object and thus I need Spark. From what I understood Phoenix is good if I
 have to query a simple column of HBase but things get really complicated if
 I have to add an index for each column in my table and I store complex
 object within the cells. Is it correct?

 Best,
 Flavio




 On Tue, Apr 8, 2014 at 6:05 PM, Bin Wang  wrote:

> Hi Flavio,
>
> I happened to attend, actually attending the 2014 Apache Conf, I heard
> a project called "Apache Phoenix", which fully leverage HBase and suppose
> to be 1000x faster than Hive. And it is not memory bounded, in which case
> sets up a limit for Spark. It is still in the incubating group and the
> "stats" functions spark has already implemented are still on the roadmap. 
> I
> am not sure whether it will be good but might be something interesting to
> check out.
>
> /usr/bin
>
>
> On Tue, Apr 8, 2014 at 9:57 AM, Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
>
>> Hi to everybody,
>>
>>  in these days I looked a bit at the recent evolution of the big
>> data stacks and it seems that HBase is somehow fading away in favour of
>> Spark+HDFS. Am I correct?
>> Do you think that Spark and HBase should work together or not?
>>
>> Best regards,
>> Flavio
>>
>
>>>
>>
>


Re: Strange lookup behavior. Possible bug?

2014-04-25 Thread Yadid Ayzenberg

Some additional information - maybe this rings a bell with someone:

I suspect this happens when the lookup returns more than one value.
For 0 and 1 values, the function behaves as you would expect.

Anyone?
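For anyone trying to reproduce this, a minimal sketch of the pattern being described; the key and value types below are hypothetical stand-ins for the real ones, and `sc` is an existing SparkContext.

--
// The report above: lookup() hangs on a standalone cluster only when the key
// maps to more than one value, while count() on the same RDD works fine.
val pairs = sc.parallelize(Seq(
  ("k1", 1), ("k1", 2),   // two values for k1 -> the problematic case
  ("k2", 3)               // single value -> behaves as expected
)).cache()

pairs.count()       // returns 3 on the cluster, as expected
pairs.lookup("k2")  // Seq(3)
pairs.lookup("k1")  // expected Seq(1, 2); reportedly never returns on the cluster
--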



On 4/25/14, 1:55 PM, Yadid Ayzenberg wrote:

Hi All,

I'm running a lookup on a JavaPairRDD.
When running on a local machine, the lookup is successful. However, 
when running on a standalone cluster with the exact same dataset, one of 
the tasks never ends (constantly in RUNNING status).
When viewing the worker log, it seems that the task has finished 
successfully:


14/04/25 13:40:38 INFO BlockManager: Found block rdd_2_0 locally
14/04/25 13:40:38 INFO Executor: Serialized size of result for 2 is 
10896794

14/04/25 13:40:38 INFO Executor: Sending result for 2 directly to driver
14/04/25 13:40:38 INFO Executor: Finished task ID 2

But it seems the driver is not aware of this, and hangs indefinitely.

If I execute a count prior to the lookup - I get the correct number 
which suggests that the cluster is operating as expected.


The exact same scenario works with a different type of key (Tuple2): 
JavaPairRDD.


Any ideas on how to debug this problem ?

Thanks,

Yadid





Re: help

2014-04-25 Thread Jey Kottalam
Sorry, but I don't know where Cloudera puts the executor log files.
Maybe their docs give the correct path?

On Fri, Apr 25, 2014 at 12:32 PM, Joe L  wrote:
> hi thank you for your reply but I could not find it. it says that no such
> file or directory
>
>
> 
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/help-tp4841p4848.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Running out of memory Naive Bayes

2014-04-25 Thread John King
I've been trying to use the Naive Bayes classifier. Each example in the
dataset has about 2 million features, only about 20-50 of which are
non-zero, so the vectors are very sparse. I keep running out of memory
though, even for about 1000 examples on 30 GB of RAM, while the entire dataset
is 4 million examples. I would also like to note that I'm using the
sparse vector class.
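For context, a minimal sketch of the setup being described, assuming the 1.0-era MLlib API (Vectors.sparse, LabeledPoint, NaiveBayes.train); the "label idx:value" text format, the input path and `sc` are hypothetical.

--
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val numFeatures = 2000000  // ~2M features, only 20-50 non-zero per example

// hypothetical input format: "label idx1:v1 idx2:v2 ..."
val data = sc.textFile("hdfs:///path/to/examples").map { line =>
  val parts = line.split(" ")
  val label = parts.head.toDouble
  val kvs = parts.tail.map { kv =>
    val Array(i, v) = kv.split(":")
    (i.toInt, v.toDouble)
  }
  LabeledPoint(label, Vectors.sparse(numFeatures, kvs.map(_._1), kvs.map(_._2)))
}

val model = NaiveBayes.train(data, lambda = 1.0)
--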


Question about Transforming huge files from Local to HDFS

2014-04-25 Thread PengWeiPRC
Hi there,

I am sorry to bother you, but I have encountered a problem transforming
large files (hundreds of gigabytes per file) from the local file system to HDFS in
Parquet file format using Spark. The problem can be described as follows.

1) When I tried to read a huge file from the local file system and used Avro + Parquet to
transform it into Parquet format and store it to HDFS using the API
"saveAsNewAPIHadoopFile", the JVM ran out of memory, because the file
is too large to fit in memory.

2) When I tried to read a fraction at a time and write it to HDFS in Parquet
format using the API "saveAsNewAPIHadoopFile", I found that each loop
generated its own directory with a list of files; that is, each write was
treated as an independent output, which is not what I want and
would cause problems when I try to process the data later.

So, for a huge file which cannot be entirely held in memory, is there any
way to transform it into Parquet format and write all the output files to HDFS in
the same directory, so that they are treated as a single dataset?

In addition, does anybody know how, in Spark, to get the directory structure
of HDFS, or how to read a directory recursively so as to read all the files
in that directory and its sub-directories? That would also be a workaround for
this problem.

I hope that someone can help me fix this. I would really appreciate it.
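On the directory question, a rough sketch of two ways to treat a tree of chunked outputs as one input: Hadoop glob patterns in the input path, or walking the tree with the FileSystem API. The paths are hypothetical and `sc` is an existing SparkContext.

--
import org.apache.hadoop.fs.{FileSystem, Path}

// 1) Glob patterns: textFile / newAPIHadoopFile accept Hadoop globs, so chunked
//    outputs written under a common parent can be read back as one dataset.
val everything = sc.textFile("hdfs://namenode/data/parquet-out/*/part-*")

// 2) Recursive listing with the FileSystem API, if you need the tree itself.
val fs = FileSystem.get(sc.hadoopConfiguration)
def listRecursively(p: Path): Seq[Path] = {
  val (dirs, files) = fs.listStatus(p).partition(_.isDir)
  files.map(_.getPath).toSeq ++ dirs.flatMap(d => listRecursively(d.getPath))
}
val allFiles = listRecursively(new Path("hdfs://namenode/data/parquet-out"))
--

For the first question, one common workaround is to write each chunk into its own subdirectory under a single parent (e.g. .../parquet-out/chunk-00, chunk-01, ...) and then point downstream jobs at the parent with a glob as above, so the set of part files behaves like one dataset.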



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Question-about-Transforming-huge-files-from-Local-to-HDFS-tp4867.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: parallelize for a large Seq is extreamly slow.

2014-04-25 Thread Earthson
I've tried to set a larger buffer, but reduceByKey seems to have failed. Need
help :)

14/04/26 12:31:12 INFO cluster.CoarseGrainedSchedulerBackend: Shutting down
all executors
14/04/26 12:31:12 INFO cluster.CoarseGrainedSchedulerBackend: Asking each
executor to shut down
14/04/26 12:31:12 INFO scheduler.DAGScheduler: Failed to run countByKey at
filter_2.scala:35
14/04/26 12:31:12 INFO yarn.ApplicationMaster: finishApplicationMaster with
FAILED
Exception in thread "Thread-3"
org.apache.hadoop.yarn.exceptions.YarnException: Application doesn't exist
in cache appattempt_1398305021882_0069_01
at 
org.apache.hadoop.yarn.ipc.RPCUtil.getRemoteException(RPCUtil.java:45)
at
org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.finishApplicationMaster(ApplicationMasterService.java:294)
at
org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.finishApplicationMaster(ApplicationMasterProtocolPBServiceImpl.java:75)
at
org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:97)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
at 
org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
at
org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:101)
at
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.finishApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:94)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at $Proxy12.finishApplicationMaster(Unknown Source)
at
org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.unregisterApplicationMaster(AMRMClientImpl.java:311)
at
org.apache.spark.deploy.yarn.ApplicationMaster.finishApplicationMaster(ApplicationMaster.scala:320)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:165)
Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.yarn.exceptions.YarnException):
Application doesn't exist in cache appattempt_1398305021882_0069_01
at 
org.apache.hadoop.yarn.ipc.RPCUtil.getRemoteException(RPCUtil.java:45)
at
org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.finishApplicationMaster(ApplicationMasterService.java:294)
at
org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.finishApplicationMaster(ApplicationMasterProtocolPBServiceImpl.java:75)
at
org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:97)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)

at org.apache.hadoop.ipc.Client.call(Client.java:1347)
at org.apache.hadoop.ipc.Client.call(Client.java:1300)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at $Proxy11.finishApplicationMaster(Unknown Source)
at
o

Re: parallelize for a large Seq is extreamly slow.

2014-04-25 Thread Earthson
This error came up just because I killed my app :(

Is there something wrong? The reduceByKey operation is extremely slow (slower than
with the default serializer).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/parallelize-for-a-large-Seq-is-extreamly-slow-tp4801p4869.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: parallelize for a large Seq is extreamly slow.

2014-04-25 Thread Earthson
Using reduceByKey(_+_).countByKey instead of countByKey seems to be fast.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/parallelize-for-a-large-Seq-is-extreamly-slow-tp4801p4870.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: parallelize for a large Seq is extreamly slow.

2014-04-25 Thread Earthson
parallelize is still so slow. 



package com.semi.nlp

import org.apache.spark._
import SparkContext._
import scala.io.Source
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Map[String,Int]])
        kryo.register(classOf[Map[String,Long]])
        kryo.register(classOf[Seq[(String,Long)]])
        kryo.register(classOf[Seq[(String,Int)]])
    }
}

object WFilter2 {
    def initspark(name: String) = {
        val conf = new SparkConf()
            .setMaster("yarn-standalone")
            .setAppName(name)
            .setSparkHome(System.getenv("SPARK_HOME"))
            .setJars(SparkContext.jarOfClass(this.getClass))
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            //.set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.kryoserializer.buffer.mb", "256")
            .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
            .set("spark.cores.max", "30")
        new SparkContext(conf)
    }

    def main(args: Array[String]) {
        val spark = initspark("word filter mapping")
        val stopset = spark broadcast Source.fromURL(this.getClass.getResource("/stoplist.txt")).getLines.map(_.trim).toSet
        val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
        val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).countByKey
        val df_map = spark broadcast file.flatMap(x => Set(x.split("\t"):_*).toBuffer).map((_,1)).reduceByKey(_+_).countByKey
        val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer:_*)
        def w_filter(w: String) = if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset.value contains w)) false else true
        val mapped = file.map(_.split("\t").filter(w_filter).map(w => word_mapping.value(w)).mkString("\t"))
        spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
        mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
        spark.stop()
    }
}



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/parallelize-for-a-large-Seq-is-extreamly-slow-tp4801p4871.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.