since spark can not parallelize/serialize functions, how to distribute algorithms on the same data?

2016-03-28 Thread charles li
Use case: I have a dataset and want to run different algorithms on it, and
fetch the results.

To do this, I think I should distribute my algorithms and run them on the
dataset at the same time. Am I right?

But it seems that Spark cannot parallelize/serialize algorithms/functions,
so how can I do this?


*here is the test code*:


def test():
    pass

function_list = [test] * 10

sc.parallelize([test] * 10).take(1)



*error message: *
Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.

: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2
in stage 9.0 failed 4 times, most recent failure: Lost task 2.3 in stage
9.0 (TID 105, sh-demo-hadoop-07):
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):

  File
"/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py",
line 111, in main

process()

  File
"/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py",
line 106, in process

serializer.dump_stream(func(split_index, iterator), outfile)

  File
"/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
line 263, in dump_stream

vs = list(itertools.islice(iterator, batch))

  File "/datayes/spark_process/spark-1.6.0-bin-cdh4/python/pyspark/rdd.py",
line 1293, in takeUpToNumLeft

  File
"/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
line 139, in load_stream

yield self._read_with_length(stream)

  File
"/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
line 164, in _read_with_length

return self.loads(obj)

  File
"/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
line 422, in loads

return pickle.loads(obj)

AttributeError: 'module' object has no attribute 'test'


at
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)

at
org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:207)

at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)

at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)

at org.apache.spark.scheduler.Task.run(Task.scala:89)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)


what's interesting is that *when I run sc.parallelize([test] * 10).collect(),
it works fine*, returning a list of 10 function objects:

[<function test at 0x...>, <function test at 0x...>, ... (10 entries in total)]




-- 
--
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Re: since spark can not parallelize/serialize functions, how to distribute algorithms on the same data?

2016-03-28 Thread Holden Karau
You probably want to look at the map transformation, and the many other
transformations defined on RDDs. The function you pass in to map is
serialized and the computation is distributed.
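
For instance, a minimal sketch of that approach for the original use case
(everything here is illustrative: the algorithm bodies are placeholders and
the data is a dummy list). The trick is to keep the functions inside the
closure you pass to map, rather than putting the function objects themselves
into the RDD:

def algo_count(xs):
    # placeholder "algorithm": number of records
    return len(xs)

def algo_sum(xs):
    # placeholder "algorithm": sum of records
    return sum(xs)

algorithms = {"count": algo_count, "sum": algo_sum}

data_b = sc.broadcast(list(range(0, 100)))   # stand-in for the real dataset

results = (sc.parallelize(list(algorithms.keys()))
             .map(lambda name: (name, algorithms[name](data_b.value)))
             .collect())
print(results)

As far as I understand, the closure (and the functions it references) is
serialized with PySpark's cloudpickle-based closure serializer, which is why
this works while parallelizing the function objects as data does not.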

On Monday, March 28, 2016, charles li  wrote:

>
> use case: have a dataset, and want to use different algorithms on that,
> and fetch the result.
>
> for making this, I think I should distribute my algorithms, and run these
> algorithms on the dataset at the same time, am I right?
>
> but it seems that spark can not parallelize/serialize
> algorithms/functions, then how to make it?
>
>
> *here is the test code*:
>
>
> 
> def test():
> pass
> function_list = [test] * 10
>
> sc.parallelize([test] * 10).take(1)
>
> 
>
>
> *error message: *
> Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.runJob.
>
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 2 in stage 9.0 failed 4 times, most recent failure: Lost task 2.3 in stage
> 9.0 (TID 105, sh-demo-hadoop-07):
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last):
>
>   File
> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py",
> line 111, in main
>
> process()
>
>   File
> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py",
> line 106, in process
>
> serializer.dump_stream(func(split_index, iterator), outfile)
>
>   File
> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
> line 263, in dump_stream
>
> vs = list(itertools.islice(iterator, batch))
>
>   File
> "/datayes/spark_process/spark-1.6.0-bin-cdh4/python/pyspark/rdd.py", line
> 1293, in takeUpToNumLeft
>
>   File
> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
> line 139, in load_stream
>
> yield self._read_with_length(stream)
>
>   File
> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
> line 164, in _read_with_length
>
> return self.loads(obj)
>
>   File
> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
> line 422, in loads
>
> return pickle.loads(obj)
>
> AttributeError: 'module' object has no attribute 'test'
>
>
> at
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
>
> at
> org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:207)
>
> at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
>
> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
> what's interesting is that* when I run sc.parallelize([test] *
> 10).collect() , it works fine*, returns :
>
> [<function test at 0x...>, <function test at 0x...>, ... (10 entries in total)]
>
>
>
>
> --
> --
> a spark lover, a quant, a developer and a good man.
>
> http://github.com/litaotao
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: Reading Back a Cached RDD

2016-03-28 Thread aka.fe2s
Nick, what is your use-case?
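
In the meantime, a common workaround is to skip persist() for cross-session
reuse and simply write the dataset to durable storage, then read it back in
the new session. A minimal PySpark sketch (the paths are made up; the Scala
equivalents would be saveAsObjectFile/objectFile, or Parquet for DataFrames):

# Session 1: materialize the RDD
rdd.saveAsPickleFile("hdfs:///tmp/my_rdd")

# Session 2: a brand-new SparkContext reads it back
rdd2 = sc.pickleFile("hdfs:///tmp/my_rdd")

# For DataFrames, Parquet keeps the schema as well:
# df.write.parquet("hdfs:///tmp/my_df")
# df2 = sqlContext.read.parquet("hdfs:///tmp/my_df")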


On Thu, Mar 24, 2016 at 11:55 PM, Marco Colombo  wrote:

> You can persist off-heap, for example with Tachyon, now called Alluxio.
> Take a look at off-heap persistence.
>
> Regards
>
>
> On Thursday, 24 March 2016, Holden Karau  wrote:
>
>> Even checkpoint() is maybe not exactly what you want, since if reference
>> tracking is turned on it will get cleaned up once the original RDD is out
>> of scope and GC is triggered.
>> If you want to share persisted RDDs right now one way to do this is
>> sharing the same spark context (using something like the spark job server
>> or IBM Spark Kernel).
>>
>> On Thu, Mar 24, 2016 at 11:28 AM, Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> Isn’t persist() only for reusing an RDD within an active application?
>>> Maybe checkpoint() is what you’re looking for instead?
>>> ​
>>>
>>> On Thu, Mar 24, 2016 at 2:02 PM Afshartous, Nick <
>>> nafshart...@turbine.com> wrote:
>>>

 Hi,


 After calling RDD.persist(), is it then possible to come back later and
 access the persisted RDD?

 Let's say for instance coming back and starting a new Spark shell
 session.  How would one access the persisted RDD in the new shell session ?


 Thanks,

 --

Nick

>>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>
>
> --
> Ing. Marco Colombo
>



-- 
--
Oleksiy Dyagilev


StackOverflow in updateStateByKey

2016-03-28 Thread Vikash Pareek
Hi,

In my use case I need to maintain history data for a key. For this I am
using updateStateByKey, in which the state is maintained as a mutable Scala
collection (ArrayBuffer). Each element in the ArrayBuffer is an incoming record.
The Spark version is 1.6.

As the number of elements (records) in the ArrayBuffer for a key increases, I
am getting a StackOverflowError:
16/03/28 07:31:55 ERROR scheduler.JobScheduler: Error running job streaming
job 1459150304000 ms.2
java.lang.StackOverflowError
at
scala.collection.immutable.StringOps.stripSuffix(StringOps.scala:31)
at org.apache.spark.Logging$class.logName(Logging.scala:44)
at org.apache.spark.rdd.RDD.logName(RDD.scala:74)
at org.apache.spark.Logging$class.log(Logging.scala:51)
at org.apache.spark.rdd.RDD.log(RDD.scala:74)
at org.apache.spark.Logging$class.logDebug(Logging.scala:62)
at org.apache.spark.rdd.RDD.logDebug(RDD.scala:74)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$getDependencies$1.apply(CoGroupedRDD.scala:104)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$getDependencies$1.apply(CoGroupedRDD.scala:99)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.rdd.CoGroupedRDD.getDependencies(CoGroupedRDD.scala:99)
at
org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:226)
at
org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:224)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.dependencies(RDD.scala:224)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$getPartitions$1$$anonfun$apply$mcVI$sp$1.apply(CoGroupedRDD.scala:117)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$getPartitions$1$$anonfun$apply$mcVI$sp$1.apply(CoGroupedRDD.scala:115)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$getPartitions$1.apply$mcVI$sp(CoGroupedRDD.scala:115)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at
org.apache.spark.rdd.CoGroupedRDD.getPartitions(CoGroupedRDD.scala:113)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$getPartitions$1$$anonfun$apply$mcVI$sp$1.apply(CoGroupedRDD.scala:121)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$getPartitions$1$$anonfun$apply$mcVI$sp$1.apply(CoGroupedRDD.scala:115)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

Following is the code snippet:

def updateState(rows: Seq[ArrayBuffer[Row]], state: Option[ArrayBuffer[Row]]) = {
  val prevState = state.getOrElse[ArrayBuffer[Row]](ArrayBuffer[Row]())

  val newState = ArrayBuffer.empty[Row]
  newState ++= prevState
  for (r <- rows) {
    newState += r(0)
  }
  Some(newState)
}

val pairedFaultStream = getPairedStream(faultStream, sqlContext)
val workingStream =
  pairedFaultStream.updateStateByKey[ArrayBuffer[Row]](updateState _).map(_._2)

I have tried the following approaches:
1. truncating the lineage by caching and checkpointing the RDDs of *workingStream*
2. using Kryo serialization

Any suggestion will be appreciated.

- Thanks
Vikash 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/StackOverflow-in-updateStateByK

Re: Custom RDD in spark, cannot find custom method

2016-03-28 Thread Ted Yu
You can run Zinc to speed up the build of Spark. 

Cheers

> On Mar 27, 2016, at 10:15 PM, Tenghuan He  wrote:
> 
> Hi Ted
> I changed
> def customable(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
> to
> def customable(partitioner: Partitioner): MyRDD[K, V] = self.withScope {
> 
> after rebuilding the whole spark project (since it takes a long time, I didn't 
> do as you told at first), it also works.
> Thanks
> 
>> On Mon, Mar 28, 2016 at 11:01 AM, Tenghuan He  wrote:
>> Thanks very much Ted
>> 
>> I added MyRDD.scala to the Spark source code and rebuilt the whole Spark 
>> project; using myrdd.asInstanceOf[MyRDD] doesn't work. It seems that MyRDD 
>> is not exposed to the spark-shell.
>> 
>> Finally I wrote a separate Spark application and added MyRDD.scala to that 
>> project; then the custom method can be called in the main function and it 
>> works.
>> I misunderstood the usage of a custom RDD: the custom RDD does not have to be 
>> written into the Spark project like UnionRDD or CoGroupedRDD; just add it to 
>> your own project.
>> 
>>> On Mon, Mar 28, 2016 at 4:28 AM, Ted Yu  wrote:
>>> My interpretation is that variable myrdd is of type RDD to REPL, though it 
>>> was an instance of MyRDD.
>>> 
>>> Using asInstanceOf in spark-shell should allow you to call your custom 
>>> method.
>>> 
>>> Here is declaration of RDD:
>>> 
>>> abstract class RDD[T: ClassTag](
>>> 
>>> You can extend RDD and include your custom logic in the subclass.
>>> 
 On Sun, Mar 27, 2016 at 10:14 AM, Tenghuan He  wrote:
 ​Thanks Ted,
 
 but I have a doubt: as the code above (line 4) in the spark-shell 
 shows, myrdd is already a MyRDD. Does that not make sense?
 
 1 scala> val part = new org.apache.spark.HashPartitioner(10)
 2 scala> val baseRDD = sc.parallelize(1 to 10).map(x => (x, 
 "hello")).partitionBy(part).cache()
 3 scala> val myrdd = baseRDD.customable(part)  // here customable is a 
 method added to the abstract RDD to create MyRDD
 4 myrdd: org.apache.spark.rdd.RDD[(Int, String)] = MyRDD[3] at customable 
 at
 5 :28
 6 scala> myrdd.customMethod(bulk)
 7 error: value customMethod is not a member of 
 org.apache.spark.rdd.RDD[(Int, String)]
 
> On Mon, Mar 28, 2016 at 12:50 AM, Ted Yu  wrote:
> bq.   def customable(partitioner: Partitioner): RDD[(K, V)] = 
> self.withScope {
> 
> In above, you declare return type as RDD. While you actually intended to 
> declare MyRDD as the return type.
> Or, you can cast myrdd as MyRDD in spark-shell.
> 
> BTW I don't think it is good practice to add custom method to base RDD.
> 
>> On Sun, Mar 27, 2016 at 9:44 AM, Tenghuan He  
>> wrote:
>> Hi Ted,
>> 
>> The codes are running in spark-shell
>> 
>> scala> val part = new org.apache.spark.HashPartitioner(10)
>> scala> val baseRDD = sc.parallelize(1 to 10).map(x => (x, 
>> "hello")).partitionBy(part).cache()
>> scala> val myrdd = baseRDD.customable(part)  // here customable is a 
>> method added to the abstract RDD to create MyRDD
>> myrdd: org.apache.spark.rdd.RDD[(Int, String)] = MyRDD[3] at customable 
>> at
>> :28
>> scala> myrdd.customMethod(bulk)
>> error: value customMethod is not a member of 
>> org.apache.spark.rdd.RDD[(Int, String)]
>> 
>> and the customable method in PairRDDFunctions.scala is 
>> 
>>   def customable(partitioner: Partitioner): RDD[(K, V)] = self.withScope 
>> {
>> new MyRDD[K, V](self, partitioner)
>>   }
>> 
>> Thanks:)
>> 
>>> On Mon, Mar 28, 2016 at 12:28 AM, Ted Yu  wrote:
>>> Can you show the full stack trace (or top 10 lines) and the snippet 
>>> using your MyRDD ?
>>> 
>>> Thanks
>>> 
 On Sun, Mar 27, 2016 at 9:22 AM, Tenghuan He  
 wrote:
 ​Hi everyone,
 
 I am creating a custom RDD which extends RDD and adds a custom 
 method; however, the custom method cannot be found.
 The custom RDD looks like the following:
 
 class MyRDD[K, V](
 var base: RDD[(K, V)],
 part: Partitioner
   ) extends RDD[(K, V)](base.context, Nil) {
 
  def customMethod(bulk: ArrayBuffer[(K, (V, Int))]): MyRDD[K, V] = {
   // ... custom code here
   }
 
   override def compute(split: Partition, context: TaskContext): 
 Iterator[(K, V)] = {
  // ... custom code here
   }
 
   override protected def getPartitions: Array[Partition] = {
   // ... custom code here
   }
 
   override protected def getDependencies: Seq[Dependency[_]] = {
   // ... custom code here
   }
 }​
 
 In spark-shell, it turns out that the overridden methods work well, 
 but when calling myrdd.customMethod(bulk), it throws 

Re: Unit testing framework for Spark Jobs?

2016-03-28 Thread Steve Loughran
This is a good summary. Have you thought of publishing it at a URL 
for others to refer to?

> On 18 Mar 2016, at 07:05, Lars Albertsson  wrote:
> 
> I would recommend against writing unit tests for Spark programs, and
> instead focus on integration tests of jobs or pipelines of several
> jobs. You can still use a unit test framework to execute them. Perhaps
> this is what you meant.
> 
> You can use any of the popular unit test frameworks to drive your
> tests, e.g. JUnit, Scalatest, Specs2. I prefer Scalatest, since it
> gives you choice of TDD vs BDD, and it is also well integrated with
> IntelliJ.
> 
> I would also recommend against using testing frameworks tied to a
> processing technology, such as Spark Testing Base. Although it does
> seem well crafted, and makes it easy to get started with testing,
> there are drawbacks:
> 
> 1. I/O routines are not tested. Bundled test frameworks typically do
> not materialise datasets on storage, but pass them directly in memory.
> (I have not verified this for Spark Testing Base, but it looks so.)
> I/O routines are therefore not exercised, and they often hide bugs,
> e.g. related to serialisation.
> 
> 2. You create a strong coupling between processing technology and your
> tests. If you decide to change processing technology (which can happen
> soon in this fast paced world...), you need to rewrite your tests.
> Therefore, during a migration process, the tests cannot detect bugs
> introduced in migration, and help you migrate fast.
> 
> I recommend that you instead materialise input datasets on local disk,
> run your Spark job, which writes output datasets to local disk, read
> output from disk, and verify the results. You can still use Spark
> routines to read and write input and output datasets. A Spark context
> is expensive to create, so for speed, I would recommend reusing the
> Spark context between input generation, running the job, and reading
> output.
> 
> This is easy to set up, so you don't need a dedicated framework for
> it. Just put your common boilerplate in a shared test trait or base
> class.
> 
> In the future, when you want to replace your Spark job with something
> shinier, you can still use the old tests, and only replace the part
> that runs your job, giving you some protection from regression bugs.
> 
> 
> Testing Spark Streaming applications is a different beast, and you can
> probably not reuse much from your batch testing.
> 
> For testing streaming applications, I recommend that you run your
> application inside a unit test framework, e.g, Scalatest, and have the
> test setup create a fixture that includes your input and output
> components. For example, if your streaming application consumes from
> Kafka and updates tables in Cassandra, spin up single node instances
> of Kafka and Cassandra on your local machine, and connect your
> application to them. Then feed input to a Kafka topic, and wait for
> the result to appear in Cassandra.
> 
> With this setup, your application still runs in Scalatest, the tests
> run without custom setup in maven/sbt/gradle, and you can easily run
> and debug inside IntelliJ.
> 
> Docker is suitable for spinning up external components. If you use
> Kafka, the Docker image spotify/kafka is useful, since it bundles
> Zookeeper.
> 
> When waiting for output to appear, don't sleep for a long time and
> then check, since it will slow down your tests. Instead enter a loop
> where you poll for the results and sleep for a few milliseconds in
> between, with a long timeout (~30s) before the test fails with a
> timeout.

org.scalatest.concurrent.Eventually is your friend there

  eventually(stdTimeout, stdInterval) {
    listRestAPIApplications(connector, webUI, true) should contain(expectedAppId)
  }

It has good exponential backoff, for fast initial success without using too 
much CPU later, and is simple to use.

If it has weaknesses in my tests, they are:

1. It will retry on all exceptions, rather than only on failed assertions. If there's a bug in 
the test code then it manifests as a timeout. (I think I could play with 
Suite.anExceptionThatShouldCauseAnAbort() here.)
2. Its timeout action is simply to rethrow the fault; I'd like to exec a closure 
to grab more diagnostics.
3. It doesn't support a fail-fast exception which your code can raise to 
indicate that the desired state is never going to be reached, so the test 
should fail fast. Here a new exception and another entry in 
anExceptionThatShouldCauseAnAbort() may be the answer. I should sit down and 
play with that some more.


> 
> This poll and sleep strategy both makes tests quick in successful
> cases, but still robust to occasional delays. The strategy does not
> work if you want to test for absence, e.g. ensure that a particular
> message if filtered. You can work around it by adding another message
> afterwards and polling for its effect before testing for absence of
> the first. Be aware that messages can be processed out of ord

Unable to Limit UI to localhost interface

2016-03-28 Thread David O'Gwynn
Greetings to all,

I've searched around the mailing list, but it would seem that (nearly?)
everyone has the opposite problem from mine. I made a stab at looking in the
source for an answer, but I figured I might as well see if anyone else has
run into the same problem as I have.

I'm trying to limit my Master/Worker UI to run only on localhost. As it
stands, I have the following two environment variables set in my
spark-env.sh:

SPARK_LOCAL_IP=127.0.0.1
SPARK_MASTER_IP=127.0.0.1

and my slaves file contains one line: 127.0.0.1

The problem is that when I run "start-all.sh", I can nmap my box's public
interface and get the following:

PORT STATE SERVICE
22/tcp   open  ssh
8080/tcp open  http-proxy
8081/tcp open  blackice-icecap

Furthermore, I can go to my box's public IP at port 8080 in my browser and
get the master node's UI. The UI even reports that the URL/REST URLs to be
127.0.0.1:

Spark Master at spark://127.0.0.1:7077
URL: spark://127.0.0.1:7077
REST URL: spark://127.0.0.1:6066 (cluster mode)

I'd rather not have spark available in any way to the outside world without
an explicit SSH tunnel.

There are variables to do with setting the Web UI port, but I'm not
concerned with the port, only the network interface to which the Web UI
binds.

Any help would be greatly appreciated.


Re: since spark can not parallelize/serialize functions, how to distribute algorithms on the same data?

2016-03-28 Thread Sujit Pal
Hi Charles,

I tried this with dummied-out functions which just sum transformations of a
list of integers; maybe they could be replaced by the algorithms in your case.
The idea is to call them through a "god" function that takes an additional
type parameter and delegates out to the appropriate function. Here's my
code, maybe it helps...

def f0(xs):
>   return len(xs)
> def f1(xs):
>   return sum(xs)
> def f2(xs):
>   return sum([x**2 for x in xs])
> def f_god(n, xs):
>   if n == 1:
> return f1(xs)
>   elif n == 2:
> return f2(xs)
>   else:
> return f0(xs)
>
> xs = [x for x in range(0, 5)]
> xs_b = sc.broadcast(xs)
> ns = sc.parallelize([x for x in range(0, 3)])
> results = ns.map(lambda n: f_god(n, xs_b.value))
> print results.take(10)


gives me:

[5, 10, 30]
-sujit


On Mon, Mar 28, 2016 at 12:59 AM, Holden Karau  wrote:

> You probably want to look at the map transformation, and the many more
> defined on RDDs. The function you pass in to map is serialized and the
> computation is distributed.
>
>
> On Monday, March 28, 2016, charles li  wrote:
>
>>
>> use case: have a dataset, and want to use different algorithms on that,
>> and fetch the result.
>>
>> for making this, I think I should distribute my algorithms, and run these
>> algorithms on the dataset at the same time, am I right?
>>
>> but it seems that spark can not parallelize/serialize
>> algorithms/functions, then how to make it?
>>
>>
>> *here is the test code*:
>>
>>
>> 
>> def test():
>> pass
>> function_list = [test] * 10
>>
>> sc.parallelize([test] * 10).take(1)
>>
>> 
>>
>>
>> *error message: *
>> Py4JJavaError: An error occurred while calling
>> z:org.apache.spark.api.python.PythonRDD.runJob.
>>
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 2 in stage 9.0 failed 4 times, most recent failure: Lost task 2.3 in stage
>> 9.0 (TID 105, sh-demo-hadoop-07):
>> org.apache.spark.api.python.PythonException: Traceback (most recent call
>> last):
>>
>>   File
>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py",
>> line 111, in main
>>
>> process()
>>
>>   File
>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py",
>> line 106, in process
>>
>> serializer.dump_stream(func(split_index, iterator), outfile)
>>
>>   File
>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>> line 263, in dump_stream
>>
>> vs = list(itertools.islice(iterator, batch))
>>
>>   File
>> "/datayes/spark_process/spark-1.6.0-bin-cdh4/python/pyspark/rdd.py", line
>> 1293, in takeUpToNumLeft
>>
>>   File
>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>> line 139, in load_stream
>>
>> yield self._read_with_length(stream)
>>
>>   File
>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>> line 164, in _read_with_length
>>
>> return self.loads(obj)
>>
>>   File
>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>> line 422, in loads
>>
>> return pickle.loads(obj)
>>
>> AttributeError: 'module' object has no attribute 'test'
>>
>>
>> at
>> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
>>
>> at
>> org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:207)
>>
>> at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
>>
>> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
>>
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>> what's interesting is that* when I run sc.parallelize([test] *
>> 10).collect() , it works fine*, returns :
>>
>> [<function test at 0x...>, <function test at 0x...>, ... (10 entries in total)]
>>
>>
>>
>>
>> --
>> --
>> a spark lover, a quant, a developer and a good man.
>>
>> http://github.com/litaotao
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
>


Re: --packages configuration equivalent item name?

2016-03-28 Thread Andy Davidson
Hi Russell

I use Jupyter python notebooks a lot. Here is how I start the server

set -x   # turn debugging on
#set +x  # turn debugging off

# https://github.com/databricks/spark-csv
# http://spark-packages.org/package/datastax/spark-cassandra-connector
# https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md
# https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md#pyspark-with-data-frames

# packages are ',' separated with no white space
extraPkgs="--packages com.databricks:spark-csv_2.11:1.3.0,datastax:spark-cassandra-connector:1.6.0-M1-s_2.10"

export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=python3

IPYTHON_OPTS=notebook $SPARK_ROOT/bin/pyspark $extraPkgs --conf \
  spark.cassandra.connection.host=ec2-54-153-102-232.us-west-1.compute.amazonaws.com $*
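
As far as I know, the --packages flag corresponds to the spark.jars.packages
property (worth double-checking against the spark-submit docs for your Spark
version), so instead of typing it every time it should be possible to put
something like this in conf/spark-defaults.conf:

spark.jars.packages  com.databricks:spark-csv_2.10:1.4.0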



From:  Russell Jurney 
Date:  Sunday, March 27, 2016 at 7:22 PM
To:  "user @spark" 
Subject:  --packages configuration equivalent item name?

> I run PySpark with CSV support like so: IPYTHON=1 pyspark --packages
> com.databricks:spark-csv_2.10:1.4.0
> 
> I don't want to type this --packages argument each time. Is there a config
> item for --packages? I can't find one in the reference at
> http://spark.apache.org/docs/latest/configuration.html
> 
> If there is no way to do this, please let me know so I can make a JIRA for
> this feature.
> 
> Thanks!
> -- 
> Russell Jurney twitter.com/rjurney 
> russell.jur...@gmail.com relato.io 




Aggregate subsequenty x row values together.

2016-03-28 Thread sujeet jog
Hi,

I have an RDD like this:

[ 12, 45 ]
[ 14, 50 ]
[ 10, 35 ]
[ 11, 50 ]

I want to aggregate the values of the first two rows into one row, and
subsequently the next two rows into another single row...

I don't have a key to aggregate on for using some of the PySpark aggregate
functions, so how can I achieve this?


Re: Aggregate subsequenty x row values together.

2016-03-28 Thread Alexander Krasnukhin
So, why not make a fake key and aggregate on it?

On Mon, Mar 28, 2016 at 6:21 PM, sujeet jog  wrote:

> Hi,
>
> I have a RDD  like this .
>
> [ 12, 45 ]
> [ 14, 50 ]
> [ 10, 35 ]
> [ 11, 50 ]
>
> i want to aggreate values of first two rows into 1 row and subsequenty the
> next two rows into another single row...
>
> i don't have a key to aggregate for using some of the aggregate pyspark
> functions, how to achieve it ?
>
>
>


-- 
Regards,
Alexander


Re: Aggregate subsequenty x row values together.

2016-03-28 Thread Ted Yu
Can you describe your use case a bit more ?

Since the row keys are not sorted in your example, there is a chance that
you get nondeterministic results when you aggregate on groups of two
successive rows.

Thanks

On Mon, Mar 28, 2016 at 9:21 AM, sujeet jog  wrote:

> Hi,
>
> I have a RDD  like this .
>
> [ 12, 45 ]
> [ 14, 50 ]
> [ 10, 35 ]
> [ 11, 50 ]
>
> i want to aggreate values of first two rows into 1 row and subsequenty the
> next two rows into another single row...
>
> i don't have a key to aggregate for using some of the aggregate pyspark
> functions, how to achieve it ?
>
>
>


Re: Aggregate subsequenty x row values together.

2016-03-28 Thread sujeet jog
Hi Ted,

There is no row key per se, and I actually do not want to sort; I want to
aggregate each group of x subsequent rows together as a mean value while
maintaining the order of the row entries.

For example:
Input RDD:
[ 12, 45 ]
[ 14, 50 ]
[ 10, 35 ]
[ 11, 50 ]

Expected output RDD (the below is an aggregation by mean over each group of
2 subsequent rows):

[13, 47.5]
[10.5, 42.5]


@Alexander: Yes, introducing a dummy key seems to be one of the ways; can
you please post a snippet if possible on how to achieve this...
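
In case it clarifies what I am after, here is my untested sketch of the
dummy-key idea (zipWithIndex preserves the RDD order, and index // 2 gives
each pair of consecutive rows the same key; the values are just the example
data above):

rdd = sc.parallelize([[12, 45], [14, 50], [10, 35], [11, 50]])

# attach a position to every row, then key each row by position // 2
keyed = rdd.zipWithIndex().map(lambda x: (x[1] // 2, x[0]))

# group each pair of rows and take the column-wise mean, keeping the order
means = (keyed.groupByKey()
              .mapValues(lambda rows: [sum(col) / float(len(col)) for col in zip(*rows)])
              .sortByKey()
              .values())

print(means.collect())   # hoping for [[13.0, 47.5], [10.5, 42.5]]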


On Mon, Mar 28, 2016 at 10:30 PM, Ted Yu  wrote:

> Can you describe your use case a bit more ?
>
> Since the row keys are not sorted in your example, there is a chance that
> you get indeterministic results when you aggregate on groups of two
> successive rows.
>
> Thanks
>
> On Mon, Mar 28, 2016 at 9:21 AM, sujeet jog  wrote:
>
>> Hi,
>>
>> I have a RDD  like this .
>>
>> [ 12, 45 ]
>> [ 14, 50 ]
>> [ 10, 35 ]
>> [ 11, 50 ]
>>
>> i want to aggreate values of first two rows into 1 row and subsequenty
>> the next two rows into another single row...
>>
>> i don't have a key to aggregate for using some of the aggregate pyspark
>> functions, how to achieve it ?
>>
>>
>>
>


RE: Non-classification neural networks

2016-03-28 Thread Ulanov, Alexander
Hi Jim,

It is possible to use raw artificial neural networks by means of 
FeedForwardTrainer. It is [ml] package private, so your code should be in that 
package too. 
Basically, you need to do the same as it is done in 
MultilayerPerceptronClassifier but without encoding the output as one-hot: 
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala#L168
Here is the sketch of the code if your outputs are continuous and are within 
[0, 1] interval:
val topology = FeedForwardTopology.multiLayerPerceptron(layers, useSoftMax = false)
val trainer = new FeedForwardTrainer(topology, layers(0), layers.last)
val mlpModel = trainer.train(data)

For outputs that are in the range of (-infinity, +infinity), a new 
loss function is needed. It is implemented but not yet merged into Spark, 
unfortunately:
https://github.com/avulanov/spark/tree/autoencoder-mlp

Best regards, Alexander

-Original Message-
From: Jim Carroll [mailto:jimfcarr...@gmail.com] 
Sent: Sunday, March 27, 2016 6:25 AM
To: user@spark.apache.org
Subject: Non-classification neural networks

Hello all,

We were using the old "Artificial Neural Network" :
https://github.com/apache/spark/pull/1290

This code appears to have been incorporated in 1.5.2 but it's only exposed 
publicly via the MultilayerPerceptronClassifier. Is there a way to use the old 
feedforward/backprop non-classification functionality? It appears to be buried 
in private classes and it's not obvious to me if the 
MultilayerPerceptronClassifier can be used without the classification. The doc 
says "Number of outputs has to be equal to the total number of labels." 

What if the output is continuous and you want to simply do prediction?

Thanks
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Non-classification-neural-networks-tp26604.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSumsByteArray error in nelwy build Hbase

2016-03-28 Thread Ted Yu
Dropping dev@

Can you provide a bit more information ?

release of hbase
release of hadoop

I assume you're running on Linux.
Any change in Linux setup before the exception showed up ?

On Mon, Mar 28, 2016 at 10:30 AM, beeshma r  wrote:

> Hi
> I am testing with a newly built HBase. Initially a table was created and I
> was able to insert data in standalone mode. But suddenly I am getting an
> error like the log below:
>
> http://pastebin.com/e6HW0zbu
>
>
> This is my hbase-site.xml:
>
> <configuration>
>   <property>
>     <name>hbase.rootdir</name>
>     <value>file:///home/beeshma/Hbase_9556/Build_hbase/root</value>
>   </property>
>   <property>
>     <name>hbase.zookeeper.property.dataDir</name>
>     <value>/home/beeshma/Hbase_9556/Build_hbase/zk</value>
>   </property>
> </configuration>
>
> I haven't changed any of the other settings. Can anyone suggest what the
> issue could be?
>
>
>
> Cheers
> Beesh
>


NoSuchElementException in ChiSqSelector fit method (version 1.6.0)

2016-03-28 Thread jcason
I'm running into an error that's not making a lot of sense to me, and
couldn't find sufficient info on the web to answer it myself. BTW, you can
reply at Stack Overflow too:
http://stackoverflow.com/questions/36254005/nosuchelementexception-in-chisqselector-fit-method-version-1-6-0

I've written code to generate a list of (String, ArrayBuffer[String]) pairs
and then use HashingTF to convert the features column to vectors (bc it's
for NLP research on parsing where I end up with a whole lot of unique
features; long story). Then I convert the string labels using StringIndexer.
I get the "key not found" error when running ChiSqSelector.fit on the
training data. The stack trace points to a hashmap lookup in ChiSqTest for
labels. This struck me as strange, because I could sort of reason that
perhaps I was using it wrong and had not somehow accounted for unseen labels
-- except this was the fit method on training data. 

Anyway, here's the interesting bit of my code followed by the important part
of the stack trace. Any help would be very much appreciated!!


val parSdp = sc.parallelize(sdp.take(10)) // it dies on a small amount
of data
val insts: RDD[(String, ArrayBuffer[String])] =
parSdp.flatMap(x=> TrainTest.transformGraphSpark(x))

val indexer = new StringIndexer()
.setInputCol("labels")
.setOutputCol("labelIndex")

val instDF = sqlContext.createDataFrame(insts)
.toDF("labels","feats")
val hash = new HashingTF()
.setInputCol("feats")
.setOutputCol("hashedFeats")
.setNumFeatures(100)
val readyDF = hash.transform(indexer
.fit(instDF)
.transform(instDF))

val selector = new ChiSqSelector()
.setNumTopFeatures(100)
.setFeaturesCol("hashedFeats")
.setLabelCol("labelIndex")
.setOutputCol("selectedFeatures")

val Array(training, dev,test) = readyDF.randomSplit(Array(0.8,0.1,0.1),
seed = 12345)

val chisq = selector.fit(training)

And the stack trace:

java.util.NoSuchElementException: key not found: 23.0   

at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at
org.apache.spark.mllib.stat.test.ChiSqTest$$anonfun$chiSquaredFeatures$4$$anonfun$apply$4.apply(ChiSqTest.scala:131)
at
org.apache.spark.mllib.stat.test.ChiSqTest$$anonfun$chiSquaredFeatures$4$$anonfun$apply$4.apply(ChiSqTest.scala:129)
at
scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
at
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
at
org.apache.spark.mllib.stat.test.ChiSqTest$$anonfun$chiSquaredFeatures$4.apply(ChiSqTest.scala:129)
at
org.apache.spark.mllib.stat.test.ChiSqTest$$anonfun$chiSquaredFeatures$4.apply(ChiSqTest.scala:125)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.mllib.stat.test.ChiSqTest$.chiSquaredFeatures(ChiSqTest.scala:125)
at
org.apache.spark.mllib.stat.Statistics$.chiSqTest(Statistics.scala:176)
at
org.apache.spark.mllib.feature.ChiSqSelector.fit(ChiSqSelector.scala:193)
at
org.apache.spark.ml.feature.ChiSqSelector.fit(ChiSqSelector.scala:86)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:89)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:122)
... etc etc

I also realized that by changing the size of sdp.take larger (to 100) above
I get a different error:

java.lang.IllegalArgumentException: Chi-squared statistic undefined for
input matrix due to0 sum in column [4].
at
org.apache.spark.mllib.stat.test.ChiSqTest$.chiSquaredMatrix(ChiSqTest.scala:229)
at
org.apache.spark.mllib.stat.test.ChiSqTest$$anonfun$chiSquaredFeatures$4.apply(ChiSqTest.scala:134)
at
org.apache.spark.mllib.stat.test.ChiSqTest$$anonfun$chiSquaredFeatures$4.apply(ChiSqTest.scala:125)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
at
scala.

Re: This works to filter transactions older than certain months

2016-03-28 Thread Timur Shenkao
bq. CSV data is stored in an underlying table in Hive (actually created and
populated as an ORC table by Spark)

How is it possible?

On Mon, Mar 28, 2016 at 1:50 AM, Mich Talebzadeh 
wrote:

> Hi,
>
> A while back I was looking for functional programming to filter out
> transactions older > n months etc.
>
> This turned out to be pretty easy.
>
> I get today's day as follows
>
> var today = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(),
> '-MM-dd') ").collect.apply(0).getString(0)
>
>
> CSV data is stored in an underlying table in Hive (actually created and
> populated as an ORC table by Spark)
>
> HiveContext.sql("use accounts")
> var n = HiveContext.table("nw_10124772")
>
> scala> n.printSchema
> root
>  |-- transactiondate: date (nullable = true)
>  |-- transactiontype: string (nullable = true)
>  |-- description: string (nullable = true)
>  |-- value: double (nullable = true)
>  |-- balance: double (nullable = true)
>  |-- accountname: string (nullable = true)
>  |-- accountnumber: integer (nullable = true)
>
> //
> // Check for historical transactions > 60 months old
> //
> var old: Int = 60
>
> val rs = n.filter(add_months(col("transactiondate"),old) <
> lit(today)).select(lit(today),
> col("transactiondate"),add_months(col("transactiondate"),old)).collect.foreach(println)
>
> [2016-03-27,2011-03-22,2016-03-22]
> [2016-03-27,2011-03-22,2016-03-22]
> [2016-03-27,2011-03-22,2016-03-22]
> [2016-03-27,2011-03-22,2016-03-22]
> [2016-03-27,2011-03-23,2016-03-23]
> [2016-03-27,2011-03-23,2016-03-23]
>
>
> Which seems to work. Any other suggestions will be appreciated.
>
> Thanks
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


DataFrame --> JSON objects, instead of un-named array of fields

2016-03-28 Thread Russell Jurney
In PySpark, given a DataFrame, I am attempting to save it as JSON
Lines/ndjson. I run this code:

json_lines = on_time_dataframe.map(lambda x: json.dumps(x))
json_lines.saveAsTextFile('../data/On_Time_On_Time_Performance_2015.jsonl')


This results in simple arrays of fields, instead of JSON objects:

[2015, 1, 1, 1, 4, "2015-01-01", "AA", 19805, "AA", "N787AA", 1, 12478,
1247802, 31703, "JFK", "New York, NY", "NY", 36, "New York", 22, 12892,
1289203, 32575, "LAX", "Los Angeles, CA", "CA", 6, "California", 91, 900,
855, -5.0, 0.0, 0.0, -1, "0900-0959", 17.0, 912, 1230, 7.0, 1230, 1237,
7.0, 7.0, 0.0, 0, "1200-1259", 0.0, "", 0.0, 390.0, 402.0, 378.0, 1.0,
2475.0, 10, null, null, null, null, null, null, null, null, 0, null, null,
null, null, "", null, null, null, null, null, null, "", "", null, null,
null, null, null, null, "", "", null, null, null, null, null, "", "", "",
"", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""]

What I actually want is JSON objects, with a field name for each field:

{"year": "2015", "month": 1, ...}


How can I achieve this in PySpark?

Thanks!
-- 
Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io


Re: DataFrame --> JSON objects, instead of un-named array of fields

2016-03-28 Thread Russell Jurney
To answer my own question, DataFrame.toJSON() does this, so there is no
need to map and json.dump():

on_time_dataframe.toJSON().saveAsTextFile('../data/On_Time_On_Time_Performance_2015.jsonl')


Thanks!
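
For completeness, the original map-based approach also works if each Row is
first converted to a dict (a sketch; it assumes all column values are
JSON-serializable, which holds for the ints/floats/strings/nulls above, and
the output path is just an example):

import json

json_lines = on_time_dataframe.map(lambda row: json.dumps(row.asDict()))
json_lines.saveAsTextFile('../data/On_Time_On_Time_Performance_2015_dicts.jsonl')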

On Mon, Mar 28, 2016 at 12:54 PM, Russell Jurney 
wrote:

> In PySpark, given a DataFrame, I am attempting to save it as JSON
> Lines/ndjson. I run this code:
>
> json_lines = on_time_dataframe.map(lambda x: json.dumps(x))
> json_lines.saveAsTextFile('../data/On_Time_On_Time_Performance_2015.jsonl')
>
>
> This results in simple arrays of fields, instead of JSON objects:
>
> [2015, 1, 1, 1, 4, "2015-01-01", "AA", 19805, "AA", "N787AA", 1, 12478,
> 1247802, 31703, "JFK", "New York, NY", "NY", 36, "New York", 22, 12892,
> 1289203, 32575, "LAX", "Los Angeles, CA", "CA", 6, "California", 91, 900,
> 855, -5.0, 0.0, 0.0, -1, "0900-0959", 17.0, 912, 1230, 7.0, 1230, 1237,
> 7.0, 7.0, 0.0, 0, "1200-1259", 0.0, "", 0.0, 390.0, 402.0, 378.0, 1.0,
> 2475.0, 10, null, null, null, null, null, null, null, null, 0, null, null,
> null, null, "", null, null, null, null, null, null, "", "", null, null,
> null, null, null, null, "", "", null, null, null, null, null, "", "", "",
> "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""]
>
> What I actually want is JSON objects, with a field name for each field:
>
> {"year": "2015", "month": 1, ...}
>
>
> How can I achieve this in PySpark?
>
> Thanks!
> --
> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>



-- 
Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io


println not appearing in libraries when running job using spark-submit --master local

2016-03-28 Thread kpeng1
Hi All,

I am currently trying to debug a spark application written in scala.  I have
a main method: 
  def main(args: Array[String]) {
...
 SocialUtil.triggerAndWait(triggerUrl)
...

The SocialUtil object is included in a separate jar.  I launched the
spark-submit command using --jars passing the SocialUtil jar.  Inside the
triggerAndWait function I have a println statement that is the first thing
in the method, but it doesn't seem to be coming out.  All println that
happen inside the main function directly are appearing though.  I was
wondering if anyone knows what is going on in this situation and how I can
go about making the println in the SocialUtil object appear.

Thanks,

KP




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/println-not-appearing-in-libraries-when-running-job-using-spark-submit-master-local-tp26617.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: println not appearing in libraries when running job using spark-submit --master local

2016-03-28 Thread Ted Yu
Can you describe what gets triggered by triggerAndWait ?

Cheers

On Mon, Mar 28, 2016 at 1:39 PM, kpeng1  wrote:

> Hi All,
>
> I am currently trying to debug a spark application written in scala.  I
> have
> a main method:
>   def main(args: Array[String]) {
> ...
>  SocialUtil.triggerAndWait(triggerUrl)
> ...
>
> The SocialUtil object is included in a seperate jar.  I launched the
> spark-submit command using --jars passing the SocialUtil jar.  Inside the
> triggerAndWait function I have a println statement that is the first thing
> in the method, but it doesn't seem to be coming out.  All println that
> happen inside the main function directly are appearing though.  I was
> wondering if anyone knows what is going on in this situation and how I can
> go about making the println in the SocialUtil object appear.
>
> Thanks,
>
> KP
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/println-not-appearing-in-libraries-when-running-job-using-spark-submit-master-local-tp26617.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: This works to filter transactions older than certain months

2016-03-28 Thread Mich Talebzadeh
Snippet.

import org.apache.spark.sql.functions._
import java.sql.{Date, Timestamp}
val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

val df =
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header",
"true").load("hdfs://rhes564:9000/data/stg/accounts/nw/xxx")
case class Accounts( TransactionDate: String, TransactionType: String,
Description: String, Value: Double, Balance: Double, AccountName: String,
AccountNumber : String)
// Map the columns to names
//
val a = df.filter(col("Date") > "").map(p =>
Accounts(p(0).toString,p(1).toString,p(2).toString,p(3).toString.toDouble,p(4).toString.toDouble,p(5).toString,p(6).toString))
//
// Create a Spark temporary table
//
a.toDF.registerTempTable("tmp")
//
// Need to create and populate target ORC table nw_10124772 in database
accounts in Hive
//
sql("use accounts")
//
// Drop and create table nw_10124772
//
sql("DROP TABLE IF EXISTS accounts.nw_10124772")
var sqltext : String = ""
sqltext = """
CREATE TABLE accounts.nw_10124772 (
  TransactionDate   DATE
, TransactionType   String
, Description       String
, Value             Double
, Balance           Double
, AccountName       String
, AccountNumber     Int
)
COMMENT 'from csv file from excel sheet'
STORED AS ORC
TBLPROPERTIES ( "orc.compress"="ZLIB" )
"""
sql(sqltext)
//
// Put data in Hive table. Clean up is already done
//
sqltext = """
INSERT INTO TABLE accounts.nw_10124772
SELECT

TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/yyyy'),'yyyy-MM-dd'))
AS TransactionDate
, TransactionType
, Description
, Value
, Balance
, AccountName
, AccountNumber
FROM tmp
"""
sql(sqltext)


HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 28 March 2016 at 20:50, Timur Shenkao  wrote:

> bq. CSV data is stored in an underlying table in Hive (actually created
> and populated as an ORC table by Spark)
>
> How is it possible?
>
> On Mon, Mar 28, 2016 at 1:50 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> A while back I was looking for functional programming to filter out
>> transactions older > n months etc.
>>
>> This turned out to be pretty easy.
>>
>> I get today's day as follows
>>
>> var today = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(),
>> '-MM-dd') ").collect.apply(0).getString(0)
>>
>>
>> CSV data is stored in an underlying table in Hive (actually created and
>> populated as an ORC table by Spark)
>>
>> HiveContext.sql("use accounts")
>> var n = HiveContext.table("nw_10124772")
>>
>> scala> n.printSchema
>> root
>>  |-- transactiondate: date (nullable = true)
>>  |-- transactiontype: string (nullable = true)
>>  |-- description: string (nullable = true)
>>  |-- value: double (nullable = true)
>>  |-- balance: double (nullable = true)
>>  |-- accountname: string (nullable = true)
>>  |-- accountnumber: integer (nullable = true)
>>
>> //
>> // Check for historical transactions > 60 months old
>> //
>> var old: Int = 60
>>
>> val rs = n.filter(add_months(col("transactiondate"),old) <
>> lit(today)).select(lit(today),
>> col("transactiondate"),add_months(col("transactiondate"),old)).collect.foreach(println)
>>
>> [2016-03-27,2011-03-22,2016-03-22]
>> [2016-03-27,2011-03-22,2016-03-22]
>> [2016-03-27,2011-03-22,2016-03-22]
>> [2016-03-27,2011-03-22,2016-03-22]
>> [2016-03-27,2011-03-23,2016-03-23]
>> [2016-03-27,2011-03-23,2016-03-23]
>>
>>
>> Which seems to work. Any other suggestions will be appreciated.
>>
>> Thanks
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>
>


Re: println not appearing in libraries when running job using spark-submit --master local

2016-03-28 Thread Kevin Peng
Ted,

What triggerAndWait does is perform a REST call to a specified URL and then
wait until a field in the JSON status message returned by that URL says
complete.  The issue is that I put a println at the very top of the method
and it doesn't get printed out. I know that println isn't causing an issue,
because there is an exception that I throw further down the line and that
exception is what I am currently getting, but none of the printlns along the
way are showing:


  def triggerAndWait(url: String, pollInterval: Int = 1000 * 30,
      timeOut: Int = 1000 * 60 * 60, connectTimeout: Int = 3,
      readTimeout: Int = 3, requestMethod: String = "GET"): Boolean = {

    println("Entering triggerAndWait function - url: " + url +
      " pollInterval: " + pollInterval.toString() + " timeOut: " +
      timeOut.toString() + " connectionTimeout: " +
      connectTimeout.toString() + " readTimeout: " + readTimeout.toString() +
      " requestMethod: " + requestMethod)

    .


Thanks,


KP

On Mon, Mar 28, 2016 at 1:52 PM, Ted Yu  wrote:

> Can you describe what gets triggered by triggerAndWait ?
>
> Cheers
>
> On Mon, Mar 28, 2016 at 1:39 PM, kpeng1  wrote:
>
>> Hi All,
>>
>> I am currently trying to debug a spark application written in scala.  I
>> have
>> a main method:
>>   def main(args: Array[String]) {
>> ...
>>  SocialUtil.triggerAndWait(triggerUrl)
>> ...
>>
>> The SocialUtil object is included in a seperate jar.  I launched the
>> spark-submit command using --jars passing the SocialUtil jar.  Inside the
>> triggerAndWait function I have a println statement that is the first thing
>> in the method, but it doesn't seem to be coming out.  All println that
>> happen inside the main function directly are appearing though.  I was
>> wondering if anyone knows what is going on in this situation and how I can
>> go about making the println in the SocialUtil object appear.
>>
>> Thanks,
>>
>> KP
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/println-not-appearing-in-libraries-when-running-job-using-spark-submit-master-local-tp26617.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Dynamically adding/removing slaves through start-slave.sh and stop-slave.sh

2016-03-28 Thread Sung Hwan Chung
Hello,

I found that I could dynamically add/remove new workers to a running
standalone Spark cluster by simply triggering:

start-slave.sh (SPARK_MASTER_ADDR)

and

stop-slave.sh

E.g., I could instantiate a new AWS instance and just add it to a running
cluster without needing to add it to slaves file and restarting the whole
cluster.
It seems that there's no need for me to stop a running cluster.

Is this a valid way of dynamically resizing a spark cluster (as of now, I'm
not concerned about HDFS)? Or will there be certain unforeseen problems if
nodes are added/removed this way?


Re: Dynamically adding/removing slaves through start-slave.sh and stop-slave.sh

2016-03-28 Thread Mich Talebzadeh
Have you added the slave host name to $SPARK_HOME/conf?

Then you can use start-slaves.sh or stop-slaves.sh for all instances

The assumption is that the slave boxes have $SPARK_HOME installed in the same
directory as it is installed on the master.

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 28 March 2016 at 22:06, Sung Hwan Chung  wrote:

> Hello,
>
> I found that I could dynamically add/remove new workers to a running
> standalone Spark cluster by simply triggering:
>
> start-slave.sh (SPARK_MASTER_ADDR)
>
> and
>
> stop-slave.sh
>
> E.g., I could instantiate a new AWS instance and just add it to a running
> cluster without needing to add it to slaves file and restarting the whole
> cluster.
> It seems that there's no need for me to stop a running cluster.
>
> Is this a valid way of dynamically resizing a spark cluster (as of now,
> I'm not concerned about HDFS)? Or will there be certain unforeseen problems
> if nodes are added/removed this way?
>


Re: Databricks fails to read the csv file with blank line at the file header

2016-03-28 Thread Ashok Kumar
Hello Mich
If you accommodate can you please share your approach to steps 1-3 above. 
Best regards 

On Sunday, 27 March 2016, 14:53, Mich Talebzadeh 
 wrote:
 

 Pretty simple; as usual it is a combination of ETL and ELT. 
Basically the CSV files are loaded into a staging directory on the host and
compressed before pushing into HDFS:

   - ETL --> Get rid of the header blank line on the CSV files
   - ETL --> Compress the CSV files
   - ETL --> Put the compressed CSV files into the HDFS staging directory
   - ELT --> Use the Databricks package to load the CSV files
   - ELT --> Use Spark FP to process the CSV data
   - ELT --> Register it as a temporary table
   - ELT --> Create an ORC table in a named database in compressed zlib2 format 
in the Hive database
   - ELT --> Insert/select from the temporary table into the Hive table

So the data is stored in an ORC table and one can do whatever analysis using 
Spark, Hive etc.


Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 
On 27 March 2016 at 03:05, Koert Kuipers  wrote:

To me this is expected behavior that I would not want fixed, but if you look at
the recent commits for spark-csv it has one that deals with this...
On Mar 26, 2016 21:25, "Mich Talebzadeh"  wrote:


Hi,
I have a standard csv file (saved as csv in HDFS) that has first line of blank
at the header, as follows
[blank line]
Date, Type, Description, Value, Balance, Account Name, Account Number
[blank line]
22/03/2011,SBT,"'FUNDS TRANSFER , FROM A/C 1790999",200.00,200.00,"'BROWN
AE","'638585-60125663",

When I read this file using the following standard
val df = 
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", 
"true").load("hdfs://rhes564:9000/data/stg/accounts/ac/")
it crashes.
java.util.NoSuchElementException
    at java.util.ArrayList$Itr.next(ArrayList.java:794)

 If I go and manually delete the first blank line it works OK
val df = 
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", 
"true").load("hdfs://rhes564:9000/data/stg/accounts/ac/")
df: org.apache.spark.sql.DataFrame = [Date: string,  Type: string,  
Description: string,  Value: double,  Balance: double,  Account Name: string,  
Account Number: string]

I can easily write a shell script to get rid of blank line. I was wondering if 
databricks does have a flag to get rid of the first blank line in csv file 
format?
P.S. If the file is stored as DOS text file, this problem goes away.
Thanks
Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 




  

Re: Dynamically adding/removing slaves through start-slave.sh and stop-slave.sh

2016-03-28 Thread Sung Hwan Chung
No I didn't add it to the conf/slaves file.

What I want to do is leverage auto-scale from AWS, without needing to stop
all the slaves (e.g. if a lot of slaves are idle, terminate those).

Also, the book-keeping is easier if I don't have to deal with some
centralized list of slave list that needs to be modified every time a node
is added/removed.


On Mon, Mar 28, 2016 at 9:20 PM, Mich Talebzadeh 
wrote:

> Have you added the slave host name to $SPARK_HOME/conf?
>
> Then you can use start-slaves.sh or stop-slaves.sh for all instances
>
> The assumption is that slave boxes have $SPARK_HOME installed in the same
> directory as $SPARK_HOME is installed in the master.
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 28 March 2016 at 22:06, Sung Hwan Chung 
> wrote:
>
>> Hello,
>>
>> I found that I could dynamically add/remove new workers to a running
>> standalone Spark cluster by simply triggering:
>>
>> start-slave.sh (SPARK_MASTER_ADDR)
>>
>> and
>>
>> stop-slave.sh
>>
>> E.g., I could instantiate a new AWS instance and just add it to a running
>> cluster without needing to add it to slaves file and restarting the whole
>> cluster.
>> It seems that there's no need for me to stop a running cluster.
>>
>> Is this a valid way of dynamically resizing a spark cluster (as of now,
>> I'm not concerned about HDFS)? Or will there be certain unforeseen problems
>> if nodes are added/removed this way?
>>
>
>


[Spark SQL] Unexpected Behaviour

2016-03-28 Thread Jerry Lam
Hi spark users and developers,

I'm using spark 1.5.1 (I have no choice because this is what we used). I
ran into some very unexpected behaviour when I did some join operations
lately. I cannot post my actual code here and the following code is not for
practical reasons but it should demonstrate the issue.

val base = sc.parallelize(( 0 to 49).map(i =>(i,0)) ++ (50 to
99).map((_,1))).toDF("id", "label")
val d1=base.where($"label" === 0)
val d2=base.where($"label" === 1)
d1.join(d2, d1("id") === d2("id"),
"left_outer").drop(d2("label")).select(d1("label"))


The above code will throw an exception saying the column label is not
found. Do you have a reason for throwing an exception when the column has
not been dropped for d1("label")?

Best Regards,

Jerry


Re: Databricks fails to read the csv file with blank line at the file header

2016-03-28 Thread Mich Talebzadeh
Pretty straight forward

#!/bin/ksh
DIR="hdfs://:9000/data/stg/accounts/nw/x"
#
## Remove the blank header line from the spreadsheets and compress them
#
echo `date` " ""===  Started Removing blank header line and Compressing
all csv FILEs"
for FILE in `ls *.csv`
do
  sed '1d' ${FILE} > ${FILE}.tmp
  mv -f ${FILE}.tmp ${FILE}
  /usr/bin/bzip2 ${FILE}
done
#
## Clear out hdfs staging directory
#
echo `date` " ""===  Started deleting old files from hdfs staging
directory ${DIR}"
hdfs dfs -rm -r ${DIR}/*.bz2
echo `date` " ""===  Started Putting bz2 fileS to hdfs staging
directory ${DIR}"
for FILE in `ls *.bz2`
do
  hdfs dfs -copyFromLocal ${FILE} ${DIR}
done
echo `date` " ""===  Checking that all files are moved to hdfs staging
directory"
hdfs dfs -ls ${DIR}
exit 0

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 28 March 2016 at 22:24, Ashok Kumar  wrote:

> Hello Mich
>
> If you accommodate can you please share your approach to steps 1-3 above.
>
> Best regards
>
>
> On Sunday, 27 March 2016, 14:53, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
> Pretty simple as usual it is a combination of ETL and ELT.
>
> Basically csv files are loaded into staging directory on host, compressed
> before pushing into hdfs
>
>
>1. ETL --> Get rid of the header blank line on the csv files
>2. ETL --> Compress the csv files
>    3. ETL --> Put the compressed CSV files into hdfs staging directory
>    4. ELT --> Use databricks to load the csv files
>    5. ELT --> Spark FP to process the csv data
>6. ELT --> register it as a temporary table
>7. ELT --> Create an ORC table in a named database in compressed zlib2
>format in Hive database
>8. ELT --> Insert/select from temporary table to Hive table
>
>
> So the data is stored in an ORC table and one can do whatever analysis
> using Spark, Hive etc
>
>
>
> Dr Mich Talebzadeh
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
> http://talebzadehmich.wordpress.com
>
>
> On 27 March 2016 at 03:05, Koert Kuipers  wrote:
>
> To me this is expected behavior that I would not want fixed, but if you
> look at the recent commits for spark-csv it has one that deals with this...
> On Mar 26, 2016 21:25, "Mich Talebzadeh" 
> wrote:
>
>
> Hi,
>
> I have a standard csv file (saved as csv in HDFS) that has first line of
> blank at the header
> as follows
>
> [blank line]
> Date, Type, Description, Value, Balance, Account Name, Account Number
> [blank line]
> 22/03/2011,SBT,"'FUNDS TRANSFER , FROM A/C 1790999",200.00,200.00,"'BROWN
> AE","'638585-60125663",
>
> When I read this file using the following standard
>
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header",
> "true").load("hdfs://rhes564:9000/data/stg/accounts/ac/")
>
> it crashes.
>
> java.util.NoSuchElementException
> at java.util.ArrayList$Itr.next(ArrayList.java:794)
>
>  If I go and manually delete the first blank line it works OK
>
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header",
> "true").load("hdfs://rhes564:9000/data/stg/accounts/ac/")
>
> df: org.apache.spark.sql.DataFrame = [Date: string,  Type: string,
> Description: string,  Value: double,  Balance: double,  Account Name:
> string,  Account Number: string]
>
> I can easily write a shell script to get rid of blank line. I was
> wondering if databricks does have a flag to get rid of the first blank line
> in csv file format?
>
> P.S. If the file is stored as DOS text file, this problem goes away.
>
> Thanks
>
> Dr Mich Talebzadeh
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
> http://talebzadehmich.wordpress.com
>
>
>
>
>
>
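
As an alternative to the shell-side cleanup, the same blank-line removal can be
done inside Spark before handing the files to spark-csv. A minimal sketch,
assuming a Spark 1.6 spark-shell with the spark-csv package on the classpath
(the rhes564 paths follow the thread; the ac_clean output directory is made up
for illustration):

// Drop blank lines up front so spark-csv only ever sees the header and the data rows
// (the data rows themselves are never blank here, so a global filter is safe)
val raw = sc.textFile("hdfs://rhes564:9000/data/stg/accounts/ac/")
val noBlanks = raw.filter(line => line.trim.nonEmpty)
noBlanks.saveAsTextFile("hdfs://rhes564:9000/data/stg/accounts/ac_clean/")

val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("hdfs://rhes564:9000/data/stg/accounts/ac_clean/")

This keeps the whole flow in Spark at the cost of writing an intermediate copy
to HDFS; the ksh script above avoids that extra copy.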


Re: [Spark SQL] Unexpected Behaviour

2016-03-28 Thread Mich Talebzadeh
Hi Jerry

What do you expect the outcome to be?

This is Spark 1.6.1

I see this without dropping d2!


scala> d1.join(d2, d1("id") === d2("id"),
"left_outer").select(d1("label")).collect
res15: Array[org.apache.spark.sql.Row] = Array([0], [0], [0], [0], [0],
[0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0],
[0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0],
[0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0])



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 28 March 2016 at 22:34, Jerry Lam  wrote:

> Hi spark users and developers,
>
> I'm using spark 1.5.1 (I have no choice because this is what we used). I
> ran into some very unexpected behaviour when I did some join operations
> lately. I cannot post my actual code here and the following code is not for
> practical reasons but it should demonstrate the issue.
>
> val base = sc.parallelize(( 0 to 49).map(i =>(i,0)) ++ (50 to
> 99).map((_,1))).toDF("id", "label")
> val d1=base.where($"label" === 0)
> val d2=base.where($"label" === 1)
> d1.join(d2, d1("id") === d2("id"),
> "left_outer").drop(d2("label")).select(d1("label"))
>
>
> The above code will throw an exception saying the column label is not
> found. Do you have a reason for throwing an exception when the column has
> not been dropped for d1("label")?
>
> Best Regards,
>
> Jerry
>


Re: Databricks fails to read the csv file with blank line at the file header

2016-03-28 Thread Ashok Kumar
Thanks a ton sir. Very helpful 

On Monday, 28 March 2016, 22:36, Mich Talebzadeh 
 wrote:
 

 Pretty straight forward
#!/bin/ksh
DIR="hdfs://:9000/data/stg/accounts/nw/x"
#
## Remove the blank header line from the spreadsheets and compress them
#
echo `date` " ""===  Started Removing blank header line and Compressing all 
csv FILEs"
for FILE in `ls *.csv`
do
  sed '1d' ${FILE} > ${FILE}.tmp
  mv -f ${FILE}.tmp ${FILE}
  /usr/bin/bzip2 ${FILE}
done
#
## Clear out hdfs staging directory
#
echo `date` " ""===  Started deleting old files from hdfs staging directory 
${DIR}"
hdfs dfs -rm -r ${DIR}/*.bz2
echo `date` " ""===  Started Putting bz2 fileS to hdfs staging directory 
${DIR}"
for FILE in `ls *.bz2`
do
  hdfs dfs -copyFromLocal ${FILE} ${DIR}
done
echo `date` " ""===  Checking that all files are moved to hdfs staging 
directory"
hdfs dfs -ls ${DIR}
exit 0
HTH

Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 
On 28 March 2016 at 22:24, Ashok Kumar  wrote:

Hello Mich
If you accommodate can you please share your approach to steps 1-3 above. 
Best regards 

On Sunday, 27 March 2016, 14:53, Mich Talebzadeh 
 wrote:
 

 Pretty simple as usual it is a combination of ETL and ELT. 
Basically csv files are loaded into staging directory on host, compressed 
before pushing into hdfs
   
   - ETL --> Get rid of the header blank line on the csv files
   - ETL --> Compress the csv files
   - ETL --> Put the compressed CSV files into hdfs staging directory
   - ELT --> Use databricks to load the csv files
   - ELT --> Spark FP to process the csv data
   - ELT --> register it as a temporary table 
   - ELT --> Create an ORC table in a named database in compressed zlib2 format 
in Hive database
   - ELT --> Insert/select from temporary table to Hive table

So the data is stored in an ORC table and one can do whatever analysis using 
Spark, Hive etc


Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 
On 27 March 2016 at 03:05, Koert Kuipers  wrote:

To me this is expected behavior that I would not want fixed, but if you look at
the recent commits for spark-csv it has one that deals with this...
On Mar 26, 2016 21:25, "Mich Talebzadeh"  wrote:


Hi,
I have a standard csv file (saved as csv in HDFS) that has first line of blank
at the header, as follows
[blank line]
Date, Type, Description, Value, Balance, Account Name, Account Number
[blank line]
22/03/2011,SBT,"'FUNDS TRANSFER , FROM A/C 1790999",200.00,200.00,"'BROWN
AE","'638585-60125663",

When I read this file using the following standard
val df = 
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", 
"true").load("hdfs://rhes564:9000/data/stg/accounts/ac/")
it crashes.
java.util.NoSuchElementException
    at java.util.ArrayList$Itr.next(ArrayList.java:794)

 If I go and manually delete the first blank line it works OK
val df = 
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", 
"true").load("hdfs://rhes564:9000/data/stg/accounts/ac/")
df: org.apache.spark.sql.DataFrame = [Date: string,  Type: string,  
Description: string,  Value: double,  Balance: double,  Account Name: string,  
Account Number: string]

I can easily write a shell script to get rid of blank line. I was wondering if 
databricks does have a flag to get rid of the first blank line in csv file 
format?
P.S. If the file is stored as DOS text file, this problem goes away.
Thanks
Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 




   



  

Re: Dynamically adding/removing slaves through start-slave.sh and stop-slave.sh

2016-03-28 Thread Sung Hwan Chung
It seems that the conf/slaves file is only for consumption by the following
scripts:

sbin/start-slaves.sh
sbin/stop-slaves.sh
sbin/start-all.sh
sbin/stop-all.sh

I.e., conf/slaves file doesn't affect a running cluster.

Is this true?


On Mon, Mar 28, 2016 at 9:31 PM, Sung Hwan Chung 
wrote:

> No I didn't add it to the conf/slaves file.
>
> What I want to do is leverage auto-scale from AWS, without needing to stop
> all the slaves (e.g. if a lot of slaves are idle, terminate those).
>
> Also, the book-keeping is easier if I don't have to deal with some
> centralized list of slave list that needs to be modified every time a node
> is added/removed.
>
>
> On Mon, Mar 28, 2016 at 9:20 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Have you added the slave host name to $SPARK_HOME/conf?
>>
>> Then you can use start-slaves.sh or stop-slaves.sh for all instances
>>
>> The assumption is that slave boxes have $SPARK_HOME installed in the same
>> directory as $SPARK_HOME is installed in the master.
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 28 March 2016 at 22:06, Sung Hwan Chung 
>> wrote:
>>
>>> Hello,
>>>
>>> I found that I could dynamically add/remove new workers to a running
>>> standalone Spark cluster by simply triggering:
>>>
>>> start-slave.sh (SPARK_MASTER_ADDR)
>>>
>>> and
>>>
>>> stop-slave.sh
>>>
>>> E.g., I could instantiate a new AWS instance and just add it to a
>>> running cluster without needing to add it to slaves file and restarting the
>>> whole cluster.
>>> It seems that there's no need for me to stop a running cluster.
>>>
>>> Is this a valid way of dynamically resizing a spark cluster (as of now,
>>> I'm not concerned about HDFS)? Or will there be certain unforeseen problems
>>> if nodes are added/removed this way?
>>>
>>
>>
>


Re: [Spark SQL] Unexpected Behaviour

2016-03-28 Thread Alexander Krasnukhin
You drop label column and later you try to select it. It won't find it, indeed.

--
Alexander
aka Six-Hat-Thinker

> On 28 Mar 2016, at 23:34, Jerry Lam  wrote:
> 
> Hi spark users and developers,
> 
> I'm using spark 1.5.1 (I have no choice because this is what we used). I ran 
> into some very unexpected behaviour when I did some join operations lately. I 
> cannot post my actual code here and the following code is not for practical 
> reasons but it should demonstrate the issue.
> 
> val base = sc.parallelize(( 0 to 49).map(i =>(i,0)) ++ (50 to 
> 99).map((_,1))).toDF("id", "label")
> val d1=base.where($"label" === 0)
> val d2=base.where($"label" === 1)
> d1.join(d2, d1("id") === d2("id"), 
> "left_outer").drop(d2("label")).select(d1("label"))
> 
> 
> The above code will throw an exception saying the column label is not found. 
> Do you have a reason for throwing an exception when the column has not been 
> dropped for d1("label")?
> 
> Best Regards,
> 
> Jerry 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: This works to filter transactions older than certain months

2016-03-28 Thread Mich Talebzadeh
Forgot to mention

Spark 1.6.1
Hive 2.0

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 28 March 2016 at 21:54, Mich Talebzadeh 
wrote:

> Snippet.
>
> import org.apache.spark.sql.functions._
> import java.sql.{Date, Timestamp}
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header",
> "true").load("hdfs://rhes564:9000/data/stg/accounts/nw/xxx")
> case class Accounts( TransactionDate: String, TransactionType: String,
> Description: String, Value: Double, Balance: Double, AccountName: String,
> AccountNumber : String)
> // Map the columns to names
> //
> val a = df.filter(col("Date") > "").map(p =>
> Accounts(p(0).toString,p(1).toString,p(2).toString,p(3).toString.toDouble,p(4).toString.toDouble,p(5).toString,p(6).toString))
> //
> // Create a Spark temporary table
> //
> a.toDF.registerTempTable("tmp")
> //
> // Need to create and populate target ORC table nw_10124772 in database
> accounts in Hive
> //
> sql("use accounts")
> //
> // Drop and create table nw_10124772
> //
> sql("DROP TABLE IF EXISTS accounts.nw_10124772")
> var sqltext : String = ""
> sqltext = """
> CREATE TABLE accounts.nw_10124772 (
> TransactionDate   DATE
> ,TransactionType   String
> ,Description   String
> ,Value Double
> ,Balance   Double
> ,AccountName   String
> ,AccountNumber Int
> )
> COMMENT 'from csv file from excel sheet'
> STORED AS ORC
> TBLPROPERTIES ( "orc.compress"="ZLIB" )
> """
> sql(sqltext)
> //
> // Put data in Hive table. Clean up is already done
> //
> sqltext = """
> INSERT INTO TABLE accounts.nw_10124772
> SELECT
>
> TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/yyyy'),'yyyy-MM-dd'))
> AS TransactionDate
> , TransactionType
> , Description
> , Value
> , Balance
> , AccountName
> , AccountNumber
> FROM tmp
> """
> sql(sqltext)
>
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 28 March 2016 at 20:50, Timur Shenkao  wrote:
>
>> bq. CSV data is stored in an underlying table in Hive (actually created
>> and populated as an ORC table by Spark)
>>
>> How is it possible?
>>
>> On Mon, Mar 28, 2016 at 1:50 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> A while back I was looking for functional programming to filter out
>>> transactions older > n months etc.
>>>
>>> This turned out to be pretty easy.
>>>
>>> I get today's day as follows
>>>
>>> var today = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(),
>>> 'yyyy-MM-dd') ").collect.apply(0).getString(0)
>>>
>>>
>>> CSV data is stored in an underlying table in Hive (actually created and
>>> populated as an ORC table by Spark)
>>>
>>> HiveContext.sql("use accounts")
>>> var n = HiveContext.table("nw_10124772")
>>>
>>> scala> n.printSchema
>>> root
>>>  |-- transactiondate: date (nullable = true)
>>>  |-- transactiontype: string (nullable = true)
>>>  |-- description: string (nullable = true)
>>>  |-- value: double (nullable = true)
>>>  |-- balance: double (nullable = true)
>>>  |-- accountname: string (nullable = true)
>>>  |-- accountnumber: integer (nullable = true)
>>>
>>> //
>>> // Check for historical transactions > 60 months old
>>> //
>>> var old: Int = 60
>>>
>>> val rs = n.filter(add_months(col("transactiondate"),old) <
>>> lit(today)).select(lit(today),
>>> col("transactiondate"),add_months(col("transactiondate"),old)).collect.foreach(println)
>>>
>>> [2016-03-27,2011-03-22,2016-03-22]
>>> [2016-03-27,2011-03-22,2016-03-22]
>>> [2016-03-27,2011-03-22,2016-03-22]
>>> [2016-03-27,2011-03-22,2016-03-22]
>>> [2016-03-27,2011-03-23,2016-03-23]
>>> [2016-03-27,2011-03-23,2016-03-23]
>>>
>>>
>>> Which seems to work. Any other suggestions will be appreciated.
>>>
>>> Thanks
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>
>>
>
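
Since the original post asked for other suggestions: the same filter can be
written without collecting today's date through a separate SQL query, by using
the built-in current_date() column function. A minimal sketch against the
schema above (Spark 1.6 spark-shell assumed; n is the DataFrame from
HiveContext.table, and the column aliases are made up for display only):

import org.apache.spark.sql.functions._
// keep transactions whose date plus 60 months is already behind today's date
val old = 60
n.filter(add_months(col("transactiondate"), old) < current_date())
 .select(current_date().as("today"),
         col("transactiondate"),
         add_months(col("transactiondate"), old).as("transactiondate_plus_60m"))
 .show(10)

This avoids the extra collect() round trip for the today variable and keeps the
comparison entirely inside the DataFrame plan.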


Re: Databricks fails to read the csv file with blank line at the file header

2016-03-28 Thread Hyukjin Kwon
Could I ask which version are you using?

It looks like the cause is the empty line right after the header (because that
case is not being checked in tests).

However, empty lines before the header or inside the data are being
tested.

https://raw.githubusercontent.com/databricks/spark-csv/master/src/test/resources/ages.csv

https://raw.githubusercontent.com/databricks/spark-csv/master/src/test/resources/cars.csv

So I think it might have to be able to read that case as well and this
might be an issue.

It can be simply done by ETL but I think the behaviour might have to be
consistent.

Maybe it would be better if this issue were opened and discussed?
On 29 Mar 2016 6:54 a.m., "Ashok Kumar" 
wrote:

> Thanks a ton sir. Very helpful
>
>
> On Monday, 28 March 2016, 22:36, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
> Pretty straight forward
>
> #!/bin/ksh
> DIR="hdfs://:9000/data/stg/accounts/nw/x"
> #
> ## Remove the blank header line from the spreadsheets and compress them
> #
> echo `date` " ""===  Started Removing blank header line and
> Compressing all csv FILEs"
> for FILE in `ls *.csv`
> do
>   sed '1d' ${FILE} > ${FILE}.tmp
>   mv -f ${FILE}.tmp ${FILE}
>   /usr/bin/bzip2 ${FILE}
> done
> #
> ## Clear out hdfs staging directory
> #
> echo `date` " ""===  Started deleting old files from hdfs staging
> directory ${DIR}"
> hdfs dfs -rm -r ${DIR}/*.bz2
> echo `date` " ""===  Started Putting bz2 fileS to hdfs staging
> directory ${DIR}"
> for FILE in `ls *.bz2`
> do
>   hdfs dfs -copyFromLocal ${FILE} ${DIR}
> done
> echo `date` " ""===  Checking that all files are moved to hdfs staging
> directory"
> hdfs dfs -ls ${DIR}
> exit 0
> HTH
>
>
> Dr Mich Talebzadeh
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
> http://talebzadehmich.wordpress.com
>
>
> On 28 March 2016 at 22:24, Ashok Kumar  wrote:
>
> Hello Mich
>
> If you accommodate can you please share your approach to steps 1-3 above.
>
> Best regards
>
>
> On Sunday, 27 March 2016, 14:53, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
> Pretty simple as usual it is a combination of ETL and ELT.
>
> Basically csv files are loaded into staging directory on host, compressed
> before pushing into hdfs
>
>
>1. ETL --> Get rid of the header blank line on the csv files
>2. ETL --> Compress the csv files
>    3. ETL --> Put the compressed CSV files into hdfs staging directory
>    4. ELT --> Use databricks to load the csv files
>    5. ELT --> Spark FP to process the csv data
>6. ELT --> register it as a temporary table
>7. ELT --> Create an ORC table in a named database in compressed zlib2
>format in Hive database
>8. ELT --> Insert/select from temporary table to Hive table
>
>
> So the data is stored in an ORC table and one can do whatever analysis
> using Spark, Hive etc
>
>
>
> Dr Mich Talebzadeh
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
> http://talebzadehmich.wordpress.com
>
>
> On 27 March 2016 at 03:05, Koert Kuipers  wrote:
>
> To me this is expected behavior that I would not want fixed, but if you
> look at the recent commits for spark-csv it has one that deals with this...
> On Mar 26, 2016 21:25, "Mich Talebzadeh" 
> wrote:
>
>
> Hi,
>
> I have a standard csv file (saved as csv in HDFS) that has first line of
> blank at the header
> as follows
>
> [blank line]
> Date, Type, Description, Value, Balance, Account Name, Account Number
> [blank line]
> 22/03/2011,SBT,"'FUNDS TRANSFER , FROM A/C 1790999",200.00,200.00,"'BROWN
> AE","'638585-60125663",
>
> When I read this file using the following standard
>
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header",
> "true").load("hdfs://rhes564:9000/data/stg/accounts/ac/")
>
> it crashes.
>
> java.util.NoSuchElementException
> at java.util.ArrayList$Itr.next(ArrayList.java:794)
>
>  If I go and manually delete the first blank line it works OK
>
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header",
> "true").load("hdfs://rhes564:9000/data/stg/accounts/ac/")
>
> df: org.apache.spark.sql.DataFrame = [Date: string,  Type: string,
> Description: string,  Value: double,  Balance: double,  Account Name:
> string,  Account Number: string]
>
> I can easily write a shell script to get rid of blank line. I was
> wondering if databricks does have a flag to get rid of the first blank line
> in csv file format?
>
> P.S. If the file is stored as DOS text file, this problem goes away.
>
> Thanks
>
> Dr Mich Talebzadeh
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> 

Re: Dynamically adding/removing slaves through start-slave.sh and stop-slave.sh

2016-03-28 Thread Mich Talebzadeh
start-all.sh starts the master and anything else in the slaves file
start-master.sh starts the master only.

I use start-slaves.sh for my purpose with added nodes to slaves file.

When you run start-slave.sh  you are creating another
worker  process on the master host. You can check the status on Spark GUI
on :8080. Depending on the ratio of Memory/core for the worker process, the
additional worker may or may not be used.



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 28 March 2016 at 22:58, Sung Hwan Chung  wrote:

> It seems that the conf/slaves file is only for consumption by the
> following scripts:
>
> sbin/start-slaves.sh
> sbin/stop-slaves.sh
> sbin/start-all.sh
> sbin/stop-all.sh
>
> I.e., conf/slaves file doesn't affect a running cluster.
>
> Is this true?
>
>
> On Mon, Mar 28, 2016 at 9:31 PM, Sung Hwan Chung  > wrote:
>
>> No I didn't add it to the conf/slaves file.
>>
>> What I want to do is leverage auto-scale from AWS, without needing to
>> stop all the slaves (e.g. if a lot of slaves are idle, terminate those).
>>
>> Also, the book-keeping is easier if I don't have to deal with some
>> centralized list of slave list that needs to be modified every time a node
>> is added/removed.
>>
>>
>> On Mon, Mar 28, 2016 at 9:20 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Have you added the slave host name to $SPARK_HOME/conf?
>>>
>>> Then you can use start-slaves.sh or stop-slaves.sh for all instances
>>>
>>> The assumption is that slave boxes have $SPARK_HOME installed in the
>>> same directory as $SPARK_HOME is installed in the master.
>>>
>>> HTH
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 28 March 2016 at 22:06, Sung Hwan Chung 
>>> wrote:
>>>
 Hello,

 I found that I could dynamically add/remove new workers to a running
 standalone Spark cluster by simply triggering:

 start-slave.sh (SPARK_MASTER_ADDR)

 and

 stop-slave.sh

 E.g., I could instantiate a new AWS instance and just add it to a
 running cluster without needing to add it to slaves file and restarting the
 whole cluster.
 It seems that there's no need for me to stop a running cluster.

 Is this a valid way of dynamically resizing a spark cluster (as of now,
 I'm not concerned about HDFS)? Or will there be certain unforeseen problems
 if nodes are added/removed this way?

>>>
>>>
>>
>


Re: Databricks fails to read the csv file with blank line at the file header

2016-03-28 Thread Mich Talebzadeh
Hi,

I am using Spark 1.6.1 with Hive 2.

I agree this may be a case to be resolved. I just happened to work around
it. That first blank line causes

val df = 
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header",
"true").load("hdfs://rhes564:9000/data/stg/accounts/ac/")

to crash.

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 28 March 2016 at 23:19, Hyukjin Kwon  wrote:

> Could I ask which version are you using?
>
> It looks like the cause is the empty line right after the header (because that
> case is not being checked in tests).
>
> However, empty lines before the header or inside the data are being
> tested.
>
>
> https://raw.githubusercontent.com/databricks/spark-csv/master/src/test/resources/ages.csv
>
>
> https://raw.githubusercontent.com/databricks/spark-csv/master/src/test/resources/cars.csv
>
> So I think it might have to be able to read that case as well and this
> might be an issue.
>
> It can be simply done by ETL but I think the behaviour might have to be
> consistent.
>
> Maybe it would be better if this issue were opened and discussed?
> On 29 Mar 2016 6:54 a.m., "Ashok Kumar" 
> wrote:
>
>> Thanks a ton sir. Very helpful
>>
>>
>> On Monday, 28 March 2016, 22:36, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>
>> Pretty straight forward
>>
>> #!/bin/ksh
>> DIR="hdfs://:9000/data/stg/accounts/nw/x"
>> #
>> ## Remove the blank header line from the spreadsheets and compress them
>> #
>> echo `date` " ""===  Started Removing blank header line and
>> Compressing all csv FILEs"
>> for FILE in `ls *.csv`
>> do
>>   sed '1d' ${FILE} > ${FILE}.tmp
>>   mv -f ${FILE}.tmp ${FILE}
>>   /usr/bin/bzip2 ${FILE}
>> done
>> #
>> ## Clear out hdfs staging directory
>> #
>> echo `date` " ""===  Started deleting old files from hdfs staging
>> directory ${DIR}"
>> hdfs dfs -rm -r ${DIR}/*.bz2
>> echo `date` " ""===  Started Putting bz2 fileS to hdfs staging
>> directory ${DIR}"
>> for FILE in `ls *.bz2`
>> do
>>   hdfs dfs -copyFromLocal ${FILE} ${DIR}
>> done
>> echo `date` " ""===  Checking that all files are moved to hdfs
>> staging directory"
>> hdfs dfs -ls ${DIR}
>> exit 0
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> On 28 March 2016 at 22:24, Ashok Kumar  wrote:
>>
>> Hello Mich
>>
>> If you accommodate can you please share your approach to steps 1-3 above.
>>
>> Best regards
>>
>>
>> On Sunday, 27 March 2016, 14:53, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>
>> Pretty simple as usual it is a combination of ETL and ELT.
>>
>> Basically csv files are loaded into staging directory on host, compressed
>> before pushing into hdfs
>>
>>
>>1. ETL --> Get rid of the header blank line on the csv files
>>2. ETL --> Compress the csv files
>>    3. ETL --> Put the compressed CSV files into hdfs staging directory
>>    4. ELT --> Use databricks to load the csv files
>>    5. ELT --> Spark FP to process the csv data
>>6. ELT --> register it as a temporary table
>>7. ELT --> Create an ORC table in a named database in compressed
>>zlib2 format in Hive database
>>8. ELT --> Insert/select from temporary table to Hive table
>>
>>
>> So the data is stored in an ORC table and one can do whatever analysis
>> using Spark, Hive etc
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> On 27 March 2016 at 03:05, Koert Kuipers  wrote:
>>
>> To me this is expected behavior that I would not want fixed, but if you
>> look at the recent commits for spark-csv it has one that deals with this...
>> On Mar 26, 2016 21:25, "Mich Talebzadeh" 
>> wrote:
>>
>>
>> Hi,
>>
>> I have a standard csv file (saved as csv in HDFS) that has first line of
>> blank at the header
>> as follows
>>
>> [blank line]
>> Date, Type, Description, Value, Balance, Account Name, Account Number
>> [blank line]
>> 22/03/2011,SBT,"'FUNDS TRANSFER , FROM A/C 1790999",200.00,200.00,"'BROWN
>> AE","'638585-60125663",
>>
>> When I read this file using the following standard
>>
>> val df =
>> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
>> "true").option("header",
>> "true").load("hdfs://rhes564:9000/data/stg/accounts/ac/")
>>
>> it crashes.
>>
>> java.util.NoSuchElementException
>> at java.util.ArrayList$Itr.next(ArrayList.java:794)
>>
>>  If I go and manually delete the first blank 

Re: [Spark SQL] Unexpected Behaviour

2016-03-28 Thread Sunitha Kambhampati
Hi Jerry, 

I think you are running into an issue similar to SPARK-14040
https://issues.apache.org/jira/browse/SPARK-14040 
  

One way to resolve it is to use alias.  

Here is an example that I tried on trunk and I do not see any exceptions.  

val d1=base.where($"label" === 0) as("d1")
val d2=base.where($"label" === 1).as("d2")

d1.join(d2, $"d1.id" === $"d2.id", 
"left_outer").drop($"d2.label").select($"d1.label")

Hope this helps some. 

Best regards,
Sunitha.

> On Mar 28, 2016, at 2:34 PM, Jerry Lam  wrote:
> 
> Hi spark users and developers,
> 
> I'm using spark 1.5.1 (I have no choice because this is what we used). I ran 
> into some very unexpected behaviour when I did some join operations lately. I 
> cannot post my actual code here and the following code is not for practical 
> reasons but it should demonstrate the issue.
> 
> val base = sc.parallelize(( 0 to 49).map(i =>(i,0)) ++ (50 to 
> 99).map((_,1))).toDF("id", "label")
> val d1=base.where($"label" === 0)
> val d2=base.where($"label" === 1)
> d1.join(d2, d1("id") === d2("id"), 
> "left_outer").drop(d2("label")).select(d1("label"))
> 
> 
> The above code will throw an exception saying the column label is not found. 
> Do you have a reason for throwing an exception when the column has not been 
> dropped for d1("label")?
> 
> Best Regards,
> 
> Jerry 
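
Another way to sidestep the ambiguity, for completeness, is to rename the label
columns before the join so that nothing has to be dropped afterwards. A minimal
sketch (spark-shell assumed, so sqlContext.implicits._ is already in scope; the
label1/label2 names are made up for illustration):

val base = sc.parallelize((0 to 49).map((_, 0)) ++ (50 to 99).map((_, 1))).toDF("id", "label")
val d1 = base.where($"label" === 0).withColumnRenamed("label", "label1")
val d2 = base.where($"label" === 1).withColumnRenamed("label", "label2")
// label1 is unambiguous after the join, so no drop() is needed
d1.join(d2, d1("id") === d2("id"), "left_outer").select($"label1").show(5)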



Re: Unable to Limit UI to localhost interface

2016-03-28 Thread Mich Talebzadeh
in your /etc/hosts, what do you have for localhost?

127.0.0.1 localhost.localdomain localhost

conf/slaves should have one entry in your case

cat slaves
# A Spark Worker will be started on each of the machines listed below.
localhost
...

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 28 March 2016 at 15:32, David O'Gwynn  wrote:

> Greetings to all,
>
> I've search around the mailing list, but it would seem that (nearly?)
> everyone has the opposite problem as mine. I made a stab at looking in the
> source for an answer, but I figured I might as well see if anyone else has
> run into the same problem as I.
>
> I'm trying to limit my Master/Worker UI to run only on localhost. As it
> stands, I have the following two environment variables set in my
> spark-env.sh:
>
> SPARK_LOCAL_IP=127.0.0.1
> SPARK_MASTER_IP=127.0.0.1
>
> and my slaves file contains one line: 127.0.0.1
>
> The problem is that when I run "start-all.sh", I can nmap my box's public
> interface and get the following:
>
> PORT STATE SERVICE
> 22/tcp   open  ssh
> 8080/tcp open  http-proxy
> 8081/tcp open  blackice-icecap
>
> Furthermore, I can go to my box's public IP at port 8080 in my browser and
> get the master node's UI. The UI even reports that the URL/REST URLs to be
> 127.0.0.1:
>
> Spark Master at spark://127.0.0.1:7077
> URL: spark://127.0.0.1:7077
> REST URL: spark://127.0.0.1:6066 (cluster mode)
>
> I'd rather not have spark available in any way to the outside world
> without an explicit SSH tunnel.
>
> There are variables to do with setting the Web UI port, but I'm not
> concerned with the port, only the network interface to which the Web UI
> binds.
>
> Any help would be greatly appreciated.
>
>


Re: Dynamically adding/removing slaves through start-slave.sh and stop-slave.sh

2016-03-28 Thread Sung Hwan Chung
Yea, that seems to be the case. It seems that dynamically resizing a
standalone Spark cluster is very simple.

Thanks!

On Mon, Mar 28, 2016 at 10:22 PM, Mich Talebzadeh  wrote:

> start-all start the master and anything else in slaves file
> start-master.sh starts the master only.
>
> I use start-slaves.sh for my purpose with added nodes to slaves file.
>
> When you run start-slave.sh  you are creating another
> worker  process on the master host. You can check the status on Spark GUI
> on :8080. Depending the ratio of Memory/core for worker process the
> additional worker may or may not be used.
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 28 March 2016 at 22:58, Sung Hwan Chung 
> wrote:
>
>> It seems that the conf/slaves file is only for consumption by the
>> following scripts:
>>
>> sbin/start-slaves.sh
>> sbin/stop-slaves.sh
>> sbin/start-all.sh
>> sbin/stop-all.sh
>>
>> I.e., conf/slaves file doesn't affect a running cluster.
>>
>> Is this true?
>>
>>
>> On Mon, Mar 28, 2016 at 9:31 PM, Sung Hwan Chung <
>> coded...@cs.stanford.edu> wrote:
>>
>>> No I didn't add it to the conf/slaves file.
>>>
>>> What I want to do is leverage auto-scale from AWS, without needing to
>>> stop all the slaves (e.g. if a lot of slaves are idle, terminate those).
>>>
>>> Also, the book-keeping is easier if I don't have to deal with some
>>> centralized list of slave list that needs to be modified every time a node
>>> is added/removed.
>>>
>>>
>>> On Mon, Mar 28, 2016 at 9:20 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 Have you added the slave host name to $SPARK_HOME/conf?

 Then you can use start-slaves.sh or stop-slaves.sh for all instances

 The assumption is that slave boxes have $SPARK_HOME installed in the
 same directory as $SPARK_HOME is installed in the master.

 HTH


 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 On 28 March 2016 at 22:06, Sung Hwan Chung 
 wrote:

> Hello,
>
> I found that I could dynamically add/remove new workers to a running
> standalone Spark cluster by simply triggering:
>
> start-slave.sh (SPARK_MASTER_ADDR)
>
> and
>
> stop-slave.sh
>
> E.g., I could instantiate a new AWS instance and just add it to a
> running cluster without needing to add it to slaves file and restarting 
> the
> whole cluster.
> It seems that there's no need for me to stop a running cluster.
>
> Is this a valid way of dynamically resizing a spark cluster (as of
> now, I'm not concerned about HDFS)? Or will there be certain unforeseen
> problems if nodes are added/removed this way?
>


>>>
>>
>


Re: Dynamically adding/removing slaves through start-slave.sh and stop-slave.sh

2016-03-28 Thread Mich Talebzadeh
The ACID test will come when you start two or more Spark processes
simultaneously. If you see queuing (i.e. second job waiting for the first
job to finish in Spark GUI) then you may not have enough resources for Yarn
to accommodate two jobs despite the additional worker process.



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 28 March 2016 at 23:30, Sung Hwan Chung  wrote:

> Yea, that seems to be the case. It seems that dynamically resizing a
> standalone Spark cluster is very simple.
>
> Thanks!
>
> On Mon, Mar 28, 2016 at 10:22 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> start-all start the master and anything else in slaves file
>> start-master.sh starts the master only.
>>
>> I use start-slaves.sh for my purpose with added nodes to slaves file.
>>
>> When you run start-slave.sh  you are creating another
>> worker  process on the master host. You can check the status on Spark GUI
>> on :8080. Depending the ratio of Memory/core for worker process the
>> additional worker may or may not be used.
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 28 March 2016 at 22:58, Sung Hwan Chung 
>> wrote:
>>
>>> It seems that the conf/slaves file is only for consumption by the
>>> following scripts:
>>>
>>> sbin/start-slaves.sh
>>> sbin/stop-slaves.sh
>>> sbin/start-all.sh
>>> sbin/stop-all.sh
>>>
>>> I.e., conf/slaves file doesn't affect a running cluster.
>>>
>>> Is this true?
>>>
>>>
>>> On Mon, Mar 28, 2016 at 9:31 PM, Sung Hwan Chung <
>>> coded...@cs.stanford.edu> wrote:
>>>
 No I didn't add it to the conf/slaves file.

 What I want to do is leverage auto-scale from AWS, without needing to
 stop all the slaves (e.g. if a lot of slaves are idle, terminate those).

 Also, the book-keeping is easier if I don't have to deal with some
 centralized list of slave list that needs to be modified every time a node
 is added/removed.


 On Mon, Mar 28, 2016 at 9:20 PM, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> Have you added the slave host name to $SPARK_HOME/conf?
>
> Then you can use start-slaves.sh or stop-slaves.sh for all instances
>
> The assumption is that slave boxes have $SPARK_HOME installed in the
> same directory as $SPARK_HOME is installed in the master.
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 28 March 2016 at 22:06, Sung Hwan Chung 
> wrote:
>
>> Hello,
>>
>> I found that I could dynamically add/remove new workers to a running
>> standalone Spark cluster by simply triggering:
>>
>> start-slave.sh (SPARK_MASTER_ADDR)
>>
>> and
>>
>> stop-slave.sh
>>
>> E.g., I could instantiate a new AWS instance and just add it to a
>> running cluster without needing to add it to slaves file and restarting 
>> the
>> whole cluster.
>> It seems that there's no need for me to stop a running cluster.
>>
>> Is this a valid way of dynamically resizing a spark cluster (as of
>> now, I'm not concerned about HDFS)? Or will there be certain unforeseen
>> problems if nodes are added/removed this way?
>>
>
>

>>>
>>
>


Re: Dynamically adding/removing slaves through start-slave.sh and stop-slave.sh

2016-03-28 Thread Sung Hwan Chung
You mean that once a job is in a waiting queue, it won't take advantage of
additional workers that happened to be added after the job was put into the
waiting queue?

That would be less than optimal. But it would be OK with us for now as long
as the additional workers will be taken advantage of by future-submitted
jobs.

On Mon, Mar 28, 2016 at 10:40 PM, Mich Talebzadeh  wrote:

> The ACID test will come when you start two or more Spark processes
> simultaneously. If you see queuing (i.e. second job waiting for the first
> job to finish in Spark GUI) then you may not have enough resources for Yarn
> to accommodate two jobs despite the additional worker process.
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 28 March 2016 at 23:30, Sung Hwan Chung 
> wrote:
>
>> Yea, that seems to be the case. It seems that dynamically resizing a
>> standalone Spark cluster is very simple.
>>
>> Thanks!
>>
>> On Mon, Mar 28, 2016 at 10:22 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> start-all start the master and anything else in slaves file
>>> start-master.sh starts the master only.
>>>
>>> I use start-slaves.sh for my purpose with added nodes to slaves file.
>>>
>>> When you run start-slave.sh  you are creating another
>>> worker  process on the master host. You can check the status on Spark GUI
>>> on :8080. Depending the ratio of Memory/core for worker process the
>>> additional worker may or may not be used.
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 28 March 2016 at 22:58, Sung Hwan Chung 
>>> wrote:
>>>
 It seems that the conf/slaves file is only for consumption by the
 following scripts:

 sbin/start-slaves.sh
 sbin/stop-slaves.sh
 sbin/start-all.sh
 sbin/stop-all.sh

 I.e., conf/slaves file doesn't affect a running cluster.

 Is this true?


 On Mon, Mar 28, 2016 at 9:31 PM, Sung Hwan Chung <
 coded...@cs.stanford.edu> wrote:

> No I didn't add it to the conf/slaves file.
>
> What I want to do is leverage auto-scale from AWS, without needing to
> stop all the slaves (e.g. if a lot of slaves are idle, terminate those).
>
> Also, the book-keeping is easier if I don't have to deal with some
> centralized list of slave list that needs to be modified every time a node
> is added/removed.
>
>
> On Mon, Mar 28, 2016 at 9:20 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Have you added the slave host name to $SPARK_HOME/conf?
>>
>> Then you can use start-slaves.sh or stop-slaves.sh for all instances
>>
>> The assumption is that slave boxes have $SPARK_HOME installed in the
>> same directory as $SPARK_HOME is installed in the master.
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 28 March 2016 at 22:06, Sung Hwan Chung 
>> wrote:
>>
>>> Hello,
>>>
>>> I found that I could dynamically add/remove new workers to a running
>>> standalone Spark cluster by simply triggering:
>>>
>>> start-slave.sh (SPARK_MASTER_ADDR)
>>>
>>> and
>>>
>>> stop-slave.sh
>>>
>>> E.g., I could instantiate a new AWS instance and just add it to a
>>> running cluster without needing to add it to slaves file and restarting 
>>> the
>>> whole cluster.
>>> It seems that there's no need for me to stop a running cluster.
>>>
>>> Is this a valid way of dynamically resizing a spark cluster (as of
>>> now, I'm not concerned about HDFS)? Or will there be certain unforeseen
>>> problems if nodes are added/removed this way?
>>>
>>
>>
>

>>>
>>
>


Strange ML pipeline errors from HashingTF using v1.6.1

2016-03-28 Thread Timothy Potter
I'm seeing the following error when trying to generate a prediction
from a very simple ML pipeline based model. I've verified that the raw
data sent to the tokenizer is valid (not null). It seems like this is
some sort of weird classpath or class loading type issue. Any help you
can provide in trying to troubleshoot this further would be
appreciated.

 Error in machine-learning, docId=20news-18828/alt.atheism/51176
scala.reflect.internal.Symbols$CyclicReference: illegal cyclic
reference involving package 
at scala.reflect.internal.Symbols$TypeSymbol.tpe(Symbols.scala:2768)
~[scala-reflect-2.10.5.jar:?]
at 
scala.reflect.internal.Mirrors$Roots$RootPackage$.(Mirrors.scala:268)
~[scala-reflect-2.10.5.jar:?]
at 
scala.reflect.internal.Mirrors$Roots.RootPackage$lzycompute(Mirrors.scala:267)
~[scala-reflect-2.10.5.jar:?]
at scala.reflect.internal.Mirrors$Roots.RootPackage(Mirrors.scala:267)
~[scala-reflect-2.10.5.jar:?]
at 
scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$$makeScalaPackage(JavaMirrors.scala:902)
~[scala-reflect-2.10.5.jar:?]
at 
scala.reflect.runtime.JavaMirrors$class.missingHook(JavaMirrors.scala:1299)
~[scala-reflect-2.10.5.jar:?]
at scala.reflect.runtime.JavaUniverse.missingHook(JavaUniverse.scala:12)
~[scala-reflect-2.10.5.jar:?]
at 
scala.reflect.internal.Mirrors$RootsBase.universeMissingHook(Mirrors.scala:77)
~[scala-reflect-2.10.5.jar:?]
at scala.reflect.internal.Mirrors$RootsBase.missingHook(Mirrors.scala:79)
~[scala-reflect-2.10.5.jar:?]
at 
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
~[scala-reflect-2.10.5.jar:?]
at 
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:40)
~[scala-reflect-2.10.5.jar:?]
at 
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:40)
~[scala-reflect-2.10.5.jar:?]
at 
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:40)
~[scala-reflect-2.10.5.jar:?]
at 
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:40)
~[scala-reflect-2.10.5.jar:?]
at 
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:40)
~[scala-reflect-2.10.5.jar:?]
at 
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61)
~[scala-reflect-2.10.5.jar:?]
at 
scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass(Mirrors.scala:72)
~[scala-reflect-2.10.5.jar:?]
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:119)
~[scala-reflect-2.10.5.jar:?]
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:21)
~[scala-reflect-2.10.5.jar:?]
at 
org.apache.spark.ml.feature.HashingTF$$typecreator1$1.apply(HashingTF.scala:66)
~[spark-mllib_2.10-1.6.1.jar:1.6.1]
at 
scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231)
~[scala-reflect-2.10.5.jar:?]
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231)
~[scala-reflect-2.10.5.jar:?]
at 
org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:654)
~[spark-catalyst_2.10-1.6.1.jar:1.6.1]
at 
org.apache.spark.sql.catalyst.ScalaReflection$.localTypeOf(ScalaReflection.scala:30)
~[spark-catalyst_2.10-1.6.1.jar:1.6.1]
at 
org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:642)
~[spark-catalyst_2.10-1.6.1.jar:1.6.1]
at 
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
~[spark-catalyst_2.10-1.6.1.jar:1.6.1]
at org.apache.spark.sql.functions$.udf(functions.scala:2576)
~[spark-sql_2.10-1.6.1.jar:1.6.1]
at org.apache.spark.ml.feature.HashingTF.transform(HashingTF.scala:66)
~[spark-mllib_2.10-1.6.1.jar:1.6.1]
at 
org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:297)
~[spark-mllib_2.10-1.6.1.jar:1.6.1]
at 
org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:297)
~[spark-mllib_2.10-1.6.1.jar:1.6.1]
at 
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
~[scala-library-2.10.5.jar:?]
at 
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
~[scala-library-2.10.5.jar:?]
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
~[scala-library-2.10.5.jar:?]
at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:297)
~[spark-mllib_2.10-1.6.1.jar:1.6.1]
at 
org.apache.spark.ml.tuning.CrossValidatorModel.transform(CrossValidator.scala:338)
~[spark-mllib_2.10-1.6.1.jar:1.6.1]


I've also seen similar errors such as:

java.lang.AssertionError: assertion failed: List(package linalg, package linalg)
at scala.reflect.internal.Symbols$Symbol.suchThat(Symbols.scala:1678)
~[scala-reflect-2.10.5.jar:?]
at 
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:44)
~[scala-reflect-2.10.5.jar:?]
at 
scala.reflect.internal.Mirrors$RootsBase.getModuleOrC
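
For comparison, a minimal sanity check adapted from the standard ML Pipeline
example (the tiny data set and column names below are illustrative, not from the
failing job). It exercises the same HashingTF.transform path in a plain Spark 1.6
spark-shell, which can help narrow the problem down to the embedding
application's classpath/classloader rather than HashingTF itself:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

// tiny labelled text corpus: (id, text, label)
val training = sqlContext.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol).setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.001)
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

val model = pipeline.fit(training)
// transform() goes through the same HashingTF UDF that fails in the trace above
model.transform(training).select("id", "text", "prediction").show()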

looking for an easy way to find the max value of a column in a data frame

2016-03-28 Thread Andy Davidson
I am using pyspark 1.6.1 and python3.


Given:

idDF2 = idDF.select(idDF.id, idDF.col.id )
idDF2.printSchema()
idDF2.show()
root
 |-- id: string (nullable = true)
 |-- col[id]: long (nullable = true)

+--+--+
|id|   col[id]|
+--+--+
|1008930924| 534494917|
|1008930924| 442237496|
|1008930924|  98069752|
|1008930924|2790311425|
|1008930924|3300869821|


I have to do a lot of work to get the max value

rows = idDF2.select("col[id]").describe().collect()
hack = [s for s in rows if s.summary == 'max']
print(hack)
print(hack[0].summary)
print(type(hack[0]))
print(hack[0].asDict()['col[id]'])
maxStr = hack[0].asDict()['col[id]']
ttt = int(maxStr)
numDimensions = 1 + ttt
print(numDimensions)

Is there an easier way?

Kind regards

Andy




Re: looking for an easy way to find the max value of a column in a data frame

2016-03-28 Thread Alexander Krasnukhin
e.g. select max value for column "foo":

from pyspark.sql.functions import max, col
df.select(max(col("foo"))).show()

On Tue, Mar 29, 2016 at 2:15 AM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> I am using pyspark 1.6.1 and python3.
>
>
> *Given:*
>
> idDF2 = idDF.select(idDF.id, idDF.col.id )
> idDF2.printSchema()
> idDF2.show()
>
> root
>  |-- id: string (nullable = true)
>  |-- col[id]: long (nullable = true)
>
> +--+--+
> |id|   col[id]|
> +--+--+
> |1008930924| 534494917|
> |1008930924| 442237496|
> |1008930924|  98069752|
> |1008930924|2790311425|
> |1008930924|3300869821|
>
>
>
> *I have to do a lot of work to get the max value*
>
>
> rows = idDF2.select("col[id]").describe().collect()
> hack = [s for s in rows if s.summary == 'max']
> print(hack)
> print(hack[0].summary)
> print(type(hack[0]))
> print(hack[0].asDict()['col[id]'])
> maxStr = hack[0].asDict()['col[id]']
> ttt = int(maxStr)
> numDimensions = 1 + ttt
> print(numDimensions)
>
>
> Is there an easier way?
>
>
> Kind regards
>
>
> Andy
>
>


-- 
Regards,
Alexander


PySpark saving to MongoDB: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)

2016-03-28 Thread Russell Jurney
I filed a JIRA  in the
mongo-hadoop project, but I'm curious if anyone else has seen this issue.
Anyone have any idea what to do? I can't save to Mongo from PySpark. A
contrived example works, but a dataframe does not.

I activate pymongo_spark and load a dataframe:

import pymongo
import pymongo_spark
# Important: activate pymongo_spark.
pymongo_spark.activate()

on_time_dataframe =
sqlContext.read.parquet('../data/on_time_performance.parquet')

Then I try saving to MongoDB in two ways:

on_time_dataframe.rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')

on_time_dataframe.rdd.saveAsNewAPIHadoopFile(
  path='file://unused',
  outputFormatClass='com.mongodb.hadoop.MongoOutputFormat',
  keyClass='org.apache.hadoop.io.Text',
  valueClass='org.apache.hadoop.io.MapWritable',
  conf={"mongo.output.uri":
"mongodb://localhost:27017/agile_data_science.on_time_performance"}
)


But I always get this error:

In [7]:
on_time_rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')

16/03/28 18:04:06 INFO mapred.FileInputFormat: Total input paths to process
: 1

16/03/28 18:04:06 INFO spark.SparkContext: Starting job: runJob at
PythonRDD.scala:393

16/03/28 18:04:06 INFO scheduler.DAGScheduler: Got job 2 (runJob at
PythonRDD.scala:393) with 1 output partitions

16/03/28 18:04:06 INFO scheduler.DAGScheduler: Final stage: ResultStage 2
(runJob at PythonRDD.scala:393)

16/03/28 18:04:06 INFO scheduler.DAGScheduler: Parents of final stage:
List()

16/03/28 18:04:06 INFO scheduler.DAGScheduler: Missing parents: List()

16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting ResultStage 2
(PythonRDD[13] at RDD at PythonRDD.scala:43), which has no missing parents

16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5 stored as
values in memory (estimated size 19.3 KB, free 249.2 KB)

16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5_piece0 stored
as bytes in memory (estimated size 9.7 KB, free 258.9 KB)

16/03/28 18:04:06 INFO storage.BlockManagerInfo: Added broadcast_5_piece0
in memory on localhost:59881 (size: 9.7 KB, free: 511.1 MB)

16/03/28 18:04:06 INFO spark.SparkContext: Created broadcast 5 from
broadcast at DAGScheduler.scala:1006

16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
from ResultStage 2 (PythonRDD[13] at RDD at PythonRDD.scala:43)

16/03/28 18:04:06 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0
with 1 tasks

16/03/28 18:04:06 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
2.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2666 bytes)

16/03/28 18:04:06 INFO executor.Executor: Running task 0.0 in stage 2.0
(TID 2)

16/03/28 18:04:06 INFO rdd.HadoopRDD: Input split:
file:/Users/rjurney/Software/Agile_Data_Code_2/data/On_Time_On_Time_Performance_2015.csv.gz:0+312456777

16/03/28 18:04:06 INFO compress.CodecPool: Got brand-new decompressor [.gz]

16/03/28 18:04:07 INFO python.PythonRunner: Times: total = 1310, boot =
1249, init = 58, finish = 3

16/03/28 18:04:07 INFO executor.Executor: Finished task 0.0 in stage 2.0
(TID 2). 4475 bytes result sent to driver

16/03/28 18:04:07 INFO scheduler.TaskSetManager: Finished task 0.0 in stage
2.0 (TID 2) in 1345 ms on localhost (1/1)

16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0,
whose tasks have all completed, from pool

16/03/28 18:04:07 INFO scheduler.DAGScheduler: ResultStage 2 (runJob at
PythonRDD.scala:393) finished in 1.346 s

16/03/28 18:04:07 INFO scheduler.DAGScheduler: Job 2 finished: runJob at
PythonRDD.scala:393, took 1.361003 s

16/03/28 18:04:07 INFO spark.SparkContext: Starting job: take at
SerDeUtil.scala:231

16/03/28 18:04:07 INFO scheduler.DAGScheduler: Got job 3 (take at
SerDeUtil.scala:231) with 1 output partitions

16/03/28 18:04:07 INFO scheduler.DAGScheduler: Final stage: ResultStage 3
(take at SerDeUtil.scala:231)

16/03/28 18:04:07 INFO scheduler.DAGScheduler: Parents of final stage:
List()

16/03/28 18:04:07 INFO scheduler.DAGScheduler: Missing parents: List()

16/03/28 18:04:07 INFO scheduler.DAGScheduler: Submitting ResultStage 3
(MapPartitionsRDD[15] at mapPartitions at SerDeUtil.scala:146), which has
no missing parents

16/03/28 18:04:07 INFO storage.MemoryStore: Block broadcast_6 stored as
values in memory (estimated size 19.6 KB, free 278.4 KB)

16/03/28 18:04:07 INFO storage.MemoryStore: Block broadcast_6_piece0 stored
as bytes in memory (estimated size 9.8 KB, free 288.2 KB)

16/03/28 18:04:07 INFO storage.BlockManagerInfo: Added broadcast_6_piece0
in memory on localhost:59881 (size: 9.8 KB, free: 511.1 MB)

16/03/28 18:04:07 INFO spark.SparkContext: Created broadcast 6 from
broadcast at DAGScheduler.scala:1006

16/03/28 18:04:07 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
from ResultStage 3 (MapPartitionsRDD[15] at mapPartitions at
SerDeUtil.scala:146)

16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Adding task set 3

Re: since spark can not parallelize/serialize functions, how to distribute algorithms on the same data?

2016-03-28 Thread charles li
Hi, Pal, thanks a lot, this can indeed help me.

On Mon, Mar 28, 2016 at 10:44 PM, Sujit Pal  wrote:

> Hi Charles,
>
> I tried this with dummied out functions which just sum transformations of
> a list of integers, maybe they could be replaced by algorithms in your
> case. The idea is to call them through a "god" function that takes an
> additional type parameter and delegates out to the appropriate function.
> Here's my code, maybe it helps...
>
> def f0(xs):
>>   return len(xs)
>> def f1(xs):
>>   return sum(xs)
>> def f2(xs):
>>   return sum([x**2 for x in xs])
>> def f_god(n, xs):
>>   if n == 1:
>> return f1(xs)
>>   elif n == 2:
>> return f2(xs)
>>   else:
>> return f0(xs)
>>
>> xs = [x for x in range(0, 5)]
>> xs_b = sc.broadcast(xs)
>> ns = sc.parallelize([x for x in range(0, 3)])
>> results = ns.map(lambda n: f_god(n, xs_b.value))
>> print results.take(10)
>
>
> gives me:
>
> [5, 10, 30]
> -sujit
>
>
> On Mon, Mar 28, 2016 at 12:59 AM, Holden Karau 
> wrote:
>
>> You probably want to look at the map transformation, and the many more
>> defined on RDDs. The function you pass in to map is serialized and the
>> computation is distributed.
>>
>>
>> On Monday, March 28, 2016, charles li  wrote:
>>
>>>
>>> use case: have a dataset, and want to use different algorithms on that,
>>> and fetch the result.
>>>
>>> for making this, I think I should distribute my algorithms, and run
>>> these algorithms on the dataset at the same time, am I right?
>>>
>>> but it seems that spark can not parallelize/serialize
>>> algorithms/functions, then how to make it?
>>>
>>>
>>> *here is the test code*:
>>>
>>>
>>> 
>>> def test():
>>> pass
>>> function_list = [test] * 10
>>>
>>> sc.parallelize([test] * 10).take(1)
>>>
>>> 
>>>
>>>
>>> *error message: *
>>> Py4JJavaError: An error occurred while calling
>>> z:org.apache.spark.api.python.PythonRDD.runJob.
>>>
>>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>>> Task 2 in stage 9.0 failed 4 times, most recent failure: Lost task 2.3 in
>>> stage 9.0 (TID 105, sh-demo-hadoop-07):
>>> org.apache.spark.api.python.PythonException: Traceback (most recent call
>>> last):
>>>
>>>   File
>>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py",
>>> line 111, in main
>>>
>>> process()
>>>
>>>   File
>>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py",
>>> line 106, in process
>>>
>>> serializer.dump_stream(func(split_index, iterator), outfile)
>>>
>>>   File
>>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>>> line 263, in dump_stream
>>>
>>> vs = list(itertools.islice(iterator, batch))
>>>
>>>   File
>>> "/datayes/spark_process/spark-1.6.0-bin-cdh4/python/pyspark/rdd.py", line
>>> 1293, in takeUpToNumLeft
>>>
>>>   File
>>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>>> line 139, in load_stream
>>>
>>> yield self._read_with_length(stream)
>>>
>>>   File
>>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>>> line 164, in _read_with_length
>>>
>>> return self.loads(obj)
>>>
>>>   File
>>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>>> line 422, in loads
>>>
>>> return pickle.loads(obj)
>>>
>>> AttributeError: 'module' object has no attribute 'test'
>>>
>>>
>>> at
>>> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
>>>
>>> at
>>> org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:207)
>>>
>>> at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
>>>
>>> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
>>>
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>>
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>>
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>
>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>> what's interesting is that* when I run sc.parallelize([test] *
>>> 10).collect() , it works fine*, returns :
>>>
>>> [,
>>>
>>>  ,
>>>
>>>  ,
>>>
>>>  ,
>>>
>>>  ,
>>>
>>>  ,
>>>
>>>  ,
>>>
>>>  ,
>>>
>>>  ,
>>>
>>>  ]
>>>
>>>
>>>
>>>
>>> --
>>> --
>>> a spark lover, a quant, a developer and a good man.
>>>
>>> http://github.com/litaotao
>>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>>
>


-- 
*-

pyspark unable to convert dataframe column to a vector: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

2016-03-28 Thread Andy Davidson
I am using pyspark spark-1.6.1-bin-hadoop2.6 and python3. I have a data
frame with a column that I need to convert to a sparse vector, and I get an exception.

Any idea what my bug is?

Kind regards

Andy


Py4JJavaError: An error occurred while calling
None.org.apache.spark.sql.hive.HiveContext.
: java.lang.RuntimeException: java.lang.RuntimeException: Unable to
instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
at 
org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:20
4)

Here is my python code fragment with a more complete stack trace

# load data set
from pyspark.sql import HiveContext #,SQLContext, Row

# window functions require HiveContext (spark 2.x will not require hive)
#sqlContext = SQLContext(sc)
hiveSqlContext = HiveContext(sc)

...

import numpy as np
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg import  VectorUDT

#sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# 3 = size
# [0, 2] = int indices
# [1.0, 3.0] = values


"""
root
 |-- id: string (nullable = true)
 |-- samples: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- id: long (nullable = false)
 |||-- rateStr: string (nullable = false)

"""

def toSparseVector(pojoList) :
indicies = []
for pojo in pojoList :
indicies.append(pojo.id)

l = np.ones(len(indicies))
v = Vectors.spark(numDimensions, indicies,  l)
return v

myUDF = udf(toSparseVector, VectorUDT())
features = df.withColumn(newColName, myUDF(df["samples"]))


Py4JJavaError Traceback (most recent call last)
 in ()
 30 #myUDF = udf(lambda pojoList: labelStr if (labelStr == "noise") else
"injury", StringType())
 31 
---> 32 myUDF = udf(toSparseVector, VectorUDT()) #
 33 features = df.withColumn(newColName, myUDF(df["follows"]))

/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/functi
ons.py in udf(f, returnType)
   1595 [Row(slen=5), Row(slen=3)]
   1596 """
-> 1597 return UserDefinedFunction(f, returnType)
   1598 
   1599 blacklist = ['map', 'since', 'ignore_unicode_prefix']

/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/functi
ons.py in __init__(self, func, returnType, name)
   1556 self.returnType = returnType
   1557 self._broadcast = None
-> 1558 self._judf = self._create_judf(name)
   1559 
   1560 def _create_judf(self, name):

/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/functi
ons.py in _create_judf(self, name)
   1567 pickled_command, broadcast_vars, env, includes =
_prepare_for_python_RDD(sc, command, self)
   1568 ctx = SQLContext.getOrCreate(sc)
-> 1569 jdt = ctx._ssql_ctx.parseDataType(self.returnType.json())
   1570 if name is None:
   1571 name = f.__name__ if hasattr(f, '__name__') else
f.__class__.__name__

/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/contex
t.py in _ssql_ctx(self)
681 try:
682 if not hasattr(self, '_scala_HiveContext'):
--> 683 self._scala_HiveContext = self._get_hive_ctx()
684 return self._scala_HiveContext
685 except Py4JError as e:

/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/contex
t.py in _get_hive_ctx(self)
690 
691 def _get_hive_ctx(self):
--> 692 return self._jvm.HiveContext(self._jsc.sc())
693 
694 def refreshTable(self, tableName):

/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.z
ip/py4j/java_gateway.py in __call__(self, *args)
   1062 answer = self._gateway_client.send_command(command)
   1063 return_value = get_return_value(
-> 1064 answer, self._gateway_client, None, self._fqn)
   1065 
   1066 for temp_arg in temp_args:

/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/utils.
py in deco(*a, **kw)
 43 def deco(*a, **kw):
 44 try:
---> 45 return f(*a, **kw)
 46 except py4j.protocol.Py4JJavaError as e:
 47 s = e.java_exception.toString()

/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/p
y4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client,
target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(

Py4JJavaError: An error occurred while calling
None.org.apache.spark.sql.hive.HiveContext.
: java.lang.RuntimeException: java.lang.RuntimeException: Unable to
instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
at 
org.apache.hadoop.hive.ql.session.Sess
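
For reference, here is a cleaned-up sketch of the conversion I am after
(assumptions: numDimensions is computed elsewhere, every sample contributes a
value of 1.0, and the names are illustrative only). While writing it out I also
noticed the snippet above calls Vectors.spark, which should presumably be
Vectors.sparse; the HiveContext error happens before that code ever runs,
though, so it is a separate problem.

from pyspark.sql.functions import udf
from pyspark.mllib.linalg import Vectors, VectorUDT

numDimensions = 1000  # assumption: really 1 + max(id), computed elsewhere

def toSparseVectorSketch(pojoList):
    # SparseVector wants its indices in increasing order; every present id gets a 1.0
    indices = sorted(pojo.id for pojo in pojoList)
    values = [1.0] * len(indices)
    return Vectors.sparse(numDimensions, indices, values)

toVectorUDF = udf(toSparseVectorSketch, VectorUDT())
features = df.withColumn("features", toVectorUDF(df["samples"]))

The HiveContext failure itself looks like an environment problem (for example,
another shell or notebook already holding the local Derby metastore lock)
rather than anything in the UDF code.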

Re: PySpark saving to MongoDB: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)

2016-03-28 Thread Russell Jurney
Ok, I'm also unable to save to Elasticsearch using a dataframe's RDD. This
seems related to DataFrames. Is there a way to convert a DataFrame's RDD to
a 'normal' RDD?


On Mon, Mar 28, 2016 at 6:20 PM, Russell Jurney 
wrote:

> I filed a JIRA  in the
> mongo-hadoop project, but I'm curious if anyone else has seen this issue.
> Anyone have any idea what to do? I can't save to Mongo from PySpark. A
> contrived example works, but a dataframe does not.
>
> I activate pymongo_spark and load a dataframe:
>
> import pymongo
> import pymongo_spark
> # Important: activate pymongo_spark.
> pymongo_spark.activate()
>
> on_time_dataframe =
> sqlContext.read.parquet('../data/on_time_performance.parquet')
>
> Then I try saving to MongoDB in two ways:
>
>
> on_time_dataframe.rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>
> on_time_dataframe.rdd.saveAsNewAPIHadoopFile(
>   path='file://unused',
>   outputFormatClass='com.mongodb.hadoop.MongoOutputFormat',
>   keyClass='org.apache.hadoop.io.Text',
>   valueClass='org.apache.hadoop.io.MapWritable',
>   conf={"mongo.output.uri":
> "mongodb://localhost:27017/agile_data_science.on_time_performance"}
> )
>
>
> But I always get this error:
>
> In [7]:
> on_time_rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>
> 16/03/28 18:04:06 INFO mapred.FileInputFormat: Total input paths to
> process : 1
>
> 16/03/28 18:04:06 INFO spark.SparkContext: Starting job: runJob at
> PythonRDD.scala:393
>
> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Got job 2 (runJob at
> PythonRDD.scala:393) with 1 output partitions
>
> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Final stage: ResultStage 2
> (runJob at PythonRDD.scala:393)
>
> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Parents of final stage:
> List()
>
> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Missing parents: List()
>
> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting ResultStage 2
> (PythonRDD[13] at RDD at PythonRDD.scala:43), which has no missing parents
>
> 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5 stored as
> values in memory (estimated size 19.3 KB, free 249.2 KB)
>
> 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5_piece0
> stored as bytes in memory (estimated size 9.7 KB, free 258.9 KB)
>
> 16/03/28 18:04:06 INFO storage.BlockManagerInfo: Added broadcast_5_piece0
> in memory on localhost:59881 (size: 9.7 KB, free: 511.1 MB)
>
> 16/03/28 18:04:06 INFO spark.SparkContext: Created broadcast 5 from
> broadcast at DAGScheduler.scala:1006
>
> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
> from ResultStage 2 (PythonRDD[13] at RDD at PythonRDD.scala:43)
>
> 16/03/28 18:04:06 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0
> with 1 tasks
>
> 16/03/28 18:04:06 INFO scheduler.TaskSetManager: Starting task 0.0 in
> stage 2.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2666 bytes)
>
> 16/03/28 18:04:06 INFO executor.Executor: Running task 0.0 in stage 2.0
> (TID 2)
>
> 16/03/28 18:04:06 INFO rdd.HadoopRDD: Input split:
> file:/Users/rjurney/Software/Agile_Data_Code_2/data/On_Time_On_Time_Performance_2015.csv.gz:0+312456777
>
> 16/03/28 18:04:06 INFO compress.CodecPool: Got brand-new decompressor [.gz]
>
> 16/03/28 18:04:07 INFO python.PythonRunner: Times: total = 1310, boot =
> 1249, init = 58, finish = 3
>
> 16/03/28 18:04:07 INFO executor.Executor: Finished task 0.0 in stage 2.0
> (TID 2). 4475 bytes result sent to driver
>
> 16/03/28 18:04:07 INFO scheduler.TaskSetManager: Finished task 0.0 in
> stage 2.0 (TID 2) in 1345 ms on localhost (1/1)
>
> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0,
> whose tasks have all completed, from pool
>
> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: ResultStage 2 (runJob at
> PythonRDD.scala:393) finished in 1.346 s
>
> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Job 2 finished: runJob at
> PythonRDD.scala:393, took 1.361003 s
>
> 16/03/28 18:04:07 INFO spark.SparkContext: Starting job: take at
> SerDeUtil.scala:231
>
> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Got job 3 (take at
> SerDeUtil.scala:231) with 1 output partitions
>
> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Final stage: ResultStage 3
> (take at SerDeUtil.scala:231)
>
> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Parents of final stage:
> List()
>
> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Missing parents: List()
>
> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Submitting ResultStage 3
> (MapPartitionsRDD[15] at mapPartitions at SerDeUtil.scala:146), which has
> no missing parents
>
> 16/03/28 18:04:07 INFO storage.MemoryStore: Block broadcast_6 stored as
> values in memory (estimated size 19.6 KB, free 278.4 KB)
>
> 16/03/28 18:04:07 INFO storage.MemoryStore: Block broadcast_6_piece0
> stored as bytes in memory (estimated size 9.8 KB, free 288.2 KB)
>
> 16/03/28 18:04:07 INFO stora

Unsubscribe

2016-03-28 Thread Andrew Heinrichs
On Mar 29, 2016 8:56 AM, "Alexander Krasnukhin" 
wrote:

> e.g. select max value for column "foo":
>
> from pyspark.sql.functions import max, col
> df.select(max(col("foo"))).show()
>
> On Tue, Mar 29, 2016 at 2:15 AM, Andy Davidson <
> a...@santacruzintegration.com> wrote:
>
>> I am using pyspark 1.6.1 and python3.
>>
>>
>> *Given:*
>>
>> idDF2 = idDF.select(idDF.id, idDF.col.id )
>> idDF2.printSchema()
>> idDF2.show()
>>
>> root
>>  |-- id: string (nullable = true)
>>  |-- col[id]: long (nullable = true)
>>
>> +--+--+
>> |id|   col[id]|
>> +--+--+
>> |1008930924| 534494917|
>> |1008930924| 442237496|
>> |1008930924|  98069752|
>> |1008930924|2790311425|
>> |1008930924|3300869821|
>>
>>
>>
>> *I have to do a lot of work to get the max value*
>>
>>
>> rows = idDF2.select("col[id]").describe().collect()
>> hack = [s for s in rows if s.summary == 'max']
>> print(hack)
>> print(hack[0].summary)
>> print(type(hack[0]))
>> print(hack[0].asDict()['col[id]'])
>> maxStr = hack[0].asDict()['col[id]']
>> ttt = int(maxStr)
>> numDimensions = 1 + ttt
>> print(numDimensions)
>>
>>
>> Is there an easier way?
>>
>>
>> Kind regards
>>
>>
>> Andy
>>
>>
>
>
> --
> Regards,
> Alexander
>


Master options Cluster/Client descrepencies.

2016-03-28 Thread satyajit vegesna
Hi All,

I have written a Spark program on my dev box:
   IDE: IntelliJ
   Scala version: 2.11.7
   Spark version: 1.6.1

It runs fine from the IDE when I provide the proper input and output paths,
including the master.

But when I try to deploy the code on my cluster, which is set up as follows:

   Spark version: 1.6.1, built from the source package using Scala 2.11
   (although when I run spark-shell on the cluster it reports Scala 2.10.5)
   Hadoop YARN cluster 2.6.0

and with additional options,

--executor-memory
--total-executor-cores
--deploy-mode cluster/client
--master yarn

I get Exception in thread "main" java.lang.NoSuchMethodError:
scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
at com.movoto.SparkPost$.main(SparkPost.scala:36)
at com.movoto.SparkPost.main(SparkPost.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I understand this to be a Scala version issue, as I have faced it before.

Is there something I should change so that the same program runs on the
cluster?

Regards,
Satyajit.


Re: [SQL] Two columns in output vs one when joining DataFrames?

2016-03-28 Thread Divya Gehlot
Hi Jacek ,

The difference is mentioned in the Spark doc itself:

Note that if you perform a self-join using this function without aliasing
the input
* [[DataFrame]]s, you will NOT be able to reference any columns after the
join, since
* there is no way to disambiguate which side of the join you would like to
reference.
*
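
For illustration, here is a small PySpark sketch of the aliasing the note
recommends (the data and names are made up; it assumes a PySpark shell where
sqlContext already exists):

from pyspark.sql.functions import col

left = sqlContext.createDataFrame([(1, "a"), (2, "b")], ["_1", "_2"]).alias("l")
right = sqlContext.createDataFrame([(1, "a"), (2, "b")], ["_1", "_2"]).alias("r")

# With explicit aliases, both sides stay addressable after the join.
joined = left.join(right, col("l._1") == col("r._1"))
joined.select(col("l._1"), col("l._2"), col("r._2")).show()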

On 26 March 2016 at 04:19, Jacek Laskowski  wrote:

> Hi,
>
> I've read the note about both columns included when DataFrames are
> joined, but don't think it differentiated between versions of join. Is
> this a feature or a bug that the following session shows one _1 column
> with Seq("_1") and two columns for ===?
>
> {code}
> scala> left.join(right, Seq("_1")).show
> +---+---+---+
> | _1| _2| _2|
> +---+---+---+
> |  1|  a|  a|
> |  2|  b|  b|
> +---+---+---+
>
>
> scala> left.join(right, left("_1") === right("_1")).show
> +---+---+---+---+
> | _1| _2| _1| _2|
> +---+---+---+---+
> |  1|  a|  1|  a|
> |  2|  b|  2|  b|
> +---+---+---+---+
> {code}
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: PySpark saving to MongoDB: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)

2016-03-28 Thread Ted Yu
See this method:

  lazy val rdd: RDD[T] = {

On Mon, Mar 28, 2016 at 6:30 PM, Russell Jurney 
wrote:

> Ok, I'm also unable to save to Elasticsearch using a dataframe's RDD. This
> seems related to DataFrames. Is there a way to convert a DataFrame's RDD to
> a 'normal' RDD?
>
>
> On Mon, Mar 28, 2016 at 6:20 PM, Russell Jurney 
> wrote:
>
>> I filed a JIRA  in the
>> mongo-hadoop project, but I'm curious if anyone else has seen this issue.
>> Anyone have any idea what to do? I can't save to Mongo from PySpark. A
>> contrived example works, but a dataframe does not.
>>
>> I activate pymongo_spark and load a dataframe:
>>
>> import pymongo
>> import pymongo_spark
>> # Important: activate pymongo_spark.
>> pymongo_spark.activate()
>>
>> on_time_dataframe =
>> sqlContext.read.parquet('../data/on_time_performance.parquet')
>>
>> Then I try saving to MongoDB in two ways:
>>
>>
>> on_time_dataframe.rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>
>> on_time_dataframe.rdd.saveAsNewAPIHadoopFile(
>>   path='file://unused',
>>   outputFormatClass='com.mongodb.hadoop.MongoOutputFormat',
>>   keyClass='org.apache.hadoop.io.Text',
>>   valueClass='org.apache.hadoop.io.MapWritable',
>>   conf={"mongo.output.uri":
>> "mongodb://localhost:27017/agile_data_science.on_time_performance"}
>> )
>>
>>
>> But I always get this error:
>>
>> In [7]:
>> on_time_rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>
>> 16/03/28 18:04:06 INFO mapred.FileInputFormat: Total input paths to
>> process : 1
>>
>> 16/03/28 18:04:06 INFO spark.SparkContext: Starting job: runJob at
>> PythonRDD.scala:393
>>
>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Got job 2 (runJob at
>> PythonRDD.scala:393) with 1 output partitions
>>
>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Final stage: ResultStage 2
>> (runJob at PythonRDD.scala:393)
>>
>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Parents of final stage:
>> List()
>>
>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Missing parents: List()
>>
>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting ResultStage 2
>> (PythonRDD[13] at RDD at PythonRDD.scala:43), which has no missing parents
>>
>> 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5 stored as
>> values in memory (estimated size 19.3 KB, free 249.2 KB)
>>
>> 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5_piece0
>> stored as bytes in memory (estimated size 9.7 KB, free 258.9 KB)
>>
>> 16/03/28 18:04:06 INFO storage.BlockManagerInfo: Added broadcast_5_piece0
>> in memory on localhost:59881 (size: 9.7 KB, free: 511.1 MB)
>>
>> 16/03/28 18:04:06 INFO spark.SparkContext: Created broadcast 5 from
>> broadcast at DAGScheduler.scala:1006
>>
>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
>> from ResultStage 2 (PythonRDD[13] at RDD at PythonRDD.scala:43)
>>
>> 16/03/28 18:04:06 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0
>> with 1 tasks
>>
>> 16/03/28 18:04:06 INFO scheduler.TaskSetManager: Starting task 0.0 in
>> stage 2.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2666 bytes)
>>
>> 16/03/28 18:04:06 INFO executor.Executor: Running task 0.0 in stage 2.0
>> (TID 2)
>>
>> 16/03/28 18:04:06 INFO rdd.HadoopRDD: Input split:
>> file:/Users/rjurney/Software/Agile_Data_Code_2/data/On_Time_On_Time_Performance_2015.csv.gz:0+312456777
>>
>> 16/03/28 18:04:06 INFO compress.CodecPool: Got brand-new decompressor
>> [.gz]
>>
>> 16/03/28 18:04:07 INFO python.PythonRunner: Times: total = 1310, boot =
>> 1249, init = 58, finish = 3
>>
>> 16/03/28 18:04:07 INFO executor.Executor: Finished task 0.0 in stage 2.0
>> (TID 2). 4475 bytes result sent to driver
>>
>> 16/03/28 18:04:07 INFO scheduler.TaskSetManager: Finished task 0.0 in
>> stage 2.0 (TID 2) in 1345 ms on localhost (1/1)
>>
>> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0,
>> whose tasks have all completed, from pool
>>
>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: ResultStage 2 (runJob at
>> PythonRDD.scala:393) finished in 1.346 s
>>
>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Job 2 finished: runJob at
>> PythonRDD.scala:393, took 1.361003 s
>>
>> 16/03/28 18:04:07 INFO spark.SparkContext: Starting job: take at
>> SerDeUtil.scala:231
>>
>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Got job 3 (take at
>> SerDeUtil.scala:231) with 1 output partitions
>>
>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Final stage: ResultStage 3
>> (take at SerDeUtil.scala:231)
>>
>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Parents of final stage:
>> List()
>>
>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Missing parents: List()
>>
>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Submitting ResultStage 3
>> (MapPartitionsRDD[15] at mapPartitions at SerDeUtil.scala:146), which has
>> no missing parents
>>
>> 16/03/28 18:04:07 INFO storage.MemoryStore: Block broadcast_6 s

Re: PySpark saving to MongoDB: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)

2016-03-28 Thread Russell Jurney
Ted, I am using the .rdd method, see above, but for some reason these RDDs
can't be saved to MongoDB or ElasticSearch.

I think this is a bug in PySpark/DataFrame. I can't think of another
explanation... somehow DataFrame.rdd RDDs are not able to be stored to an
arbitrary Hadoop OutputFormat. When I do this:

on_time_lines =
sc.textFile("../data/On_Time_On_Time_Performance_2015.jsonl.gz")
on_time_performance = on_time_lines.map(lambda x: json.loads(x))

on_time_performance.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')


It works. Same data, but loaded as textFile instead of DataFrame (via
json/parquet dataframe loading).

It is the DataFrame.rdd bit that is broken. I will file a JIRA.

Does anyone know a workaround?
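
One thing that might be worth trying (just a guess from the error, which
mentions pyspark.sql.types._create_row): a DataFrame's .rdd is an RDD of Row
objects, and the Mongo output format apparently does not know how to convert
Rows, while it does handle plain Python dicts. Converting each Row to a dict
before saving might sidestep it, roughly:

# Hypothetical sketch: turn Row objects into plain dicts before handing the
# RDD to pymongo_spark. Note asDict() is shallow, so nested Rows inside the
# records may need the same treatment.
on_time_dicts = on_time_dataframe.rdd.map(lambda row: row.asDict())
on_time_dicts.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')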

On Mon, Mar 28, 2016 at 7:28 PM, Ted Yu  wrote:

> See this method:
>
>   lazy val rdd: RDD[T] = {
>
> On Mon, Mar 28, 2016 at 6:30 PM, Russell Jurney 
> wrote:
>
>> Ok, I'm also unable to save to Elasticsearch using a dataframe's RDD.
>> This seems related to DataFrames. Is there a way to convert a DataFrame's
>> RDD to a 'normal' RDD?
>>
>>
>> On Mon, Mar 28, 2016 at 6:20 PM, Russell Jurney > > wrote:
>>
>>> I filed a JIRA  in the
>>> mongo-hadoop project, but I'm curious if anyone else has seen this issue.
>>> Anyone have any idea what to do? I can't save to Mongo from PySpark. A
>>> contrived example works, but a dataframe does not.
>>>
>>> I activate pymongo_spark and load a dataframe:
>>>
>>> import pymongo
>>> import pymongo_spark
>>> # Important: activate pymongo_spark.
>>> pymongo_spark.activate()
>>>
>>> on_time_dataframe =
>>> sqlContext.read.parquet('../data/on_time_performance.parquet')
>>>
>>> Then I try saving to MongoDB in two ways:
>>>
>>>
>>> on_time_dataframe.rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>>
>>> on_time_dataframe.rdd.saveAsNewAPIHadoopFile(
>>>   path='file://unused',
>>>   outputFormatClass='com.mongodb.hadoop.MongoOutputFormat',
>>>   keyClass='org.apache.hadoop.io.Text',
>>>   valueClass='org.apache.hadoop.io.MapWritable',
>>>   conf={"mongo.output.uri":
>>> "mongodb://localhost:27017/agile_data_science.on_time_performance"}
>>> )
>>>
>>>
>>> But I always get this error:
>>>
>>> In [7]:
>>> on_time_rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>>
>>> 16/03/28 18:04:06 INFO mapred.FileInputFormat: Total input paths to
>>> process : 1
>>>
>>> 16/03/28 18:04:06 INFO spark.SparkContext: Starting job: runJob at
>>> PythonRDD.scala:393
>>>
>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Got job 2 (runJob at
>>> PythonRDD.scala:393) with 1 output partitions
>>>
>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Final stage: ResultStage
>>> 2 (runJob at PythonRDD.scala:393)
>>>
>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Parents of final stage:
>>> List()
>>>
>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Missing parents: List()
>>>
>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting ResultStage 2
>>> (PythonRDD[13] at RDD at PythonRDD.scala:43), which has no missing parents
>>>
>>> 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5 stored as
>>> values in memory (estimated size 19.3 KB, free 249.2 KB)
>>>
>>> 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5_piece0
>>> stored as bytes in memory (estimated size 9.7 KB, free 258.9 KB)
>>>
>>> 16/03/28 18:04:06 INFO storage.BlockManagerInfo: Added
>>> broadcast_5_piece0 in memory on localhost:59881 (size: 9.7 KB, free: 511.1
>>> MB)
>>>
>>> 16/03/28 18:04:06 INFO spark.SparkContext: Created broadcast 5 from
>>> broadcast at DAGScheduler.scala:1006
>>>
>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting 1 missing
>>> tasks from ResultStage 2 (PythonRDD[13] at RDD at PythonRDD.scala:43)
>>>
>>> 16/03/28 18:04:06 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0
>>> with 1 tasks
>>>
>>> 16/03/28 18:04:06 INFO scheduler.TaskSetManager: Starting task 0.0 in
>>> stage 2.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2666 bytes)
>>>
>>> 16/03/28 18:04:06 INFO executor.Executor: Running task 0.0 in stage 2.0
>>> (TID 2)
>>>
>>> 16/03/28 18:04:06 INFO rdd.HadoopRDD: Input split:
>>> file:/Users/rjurney/Software/Agile_Data_Code_2/data/On_Time_On_Time_Performance_2015.csv.gz:0+312456777
>>>
>>> 16/03/28 18:04:06 INFO compress.CodecPool: Got brand-new decompressor
>>> [.gz]
>>>
>>> 16/03/28 18:04:07 INFO python.PythonRunner: Times: total = 1310, boot =
>>> 1249, init = 58, finish = 3
>>>
>>> 16/03/28 18:04:07 INFO executor.Executor: Finished task 0.0 in stage 2.0
>>> (TID 2). 4475 bytes result sent to driver
>>>
>>> 16/03/28 18:04:07 INFO scheduler.TaskSetManager: Finished task 0.0 in
>>> stage 2.0 (TID 2) in 1345 ms on localhost (1/1)
>>>
>>> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0,
>>> whose tasks have all completed, from pool
>>>
>>> 16/03/28 18:04:07 INFO scheduler.DAGSche

Re: PySpark saving to MongoDB: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)

2016-03-28 Thread Russell Jurney
I created a JIRA: https://issues.apache.org/jira/browse/SPARK-14229

On Mon, Mar 28, 2016 at 7:43 PM, Russell Jurney 
wrote:

> Ted, I am using the .rdd method, see above, but for some reason these RDDs
> can't be saved to MongoDB or ElasticSearch.
>
> I think this is a bug in PySpark/DataFrame. I can't think of another
> explanation... somehow DataFrame.rdd RDDs are not able to be stored to an
> arbitrary Hadoop OutputFormat. When I do this:
>
> on_time_lines =
> sc.textFile("../data/On_Time_On_Time_Performance_2015.jsonl.gz")
> on_time_performance = on_time_lines.map(lambda x: json.loads(x))
>
>
> on_time_performance.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>
>
> It works. Same data, but loaded as textFile instead of DataFrame (via
> json/parquet dataframe loading).
>
> It is the DataFrame.rdd bit that is broken. I will file a JIRA.
>
> Does anyone know a workaround?
>
> On Mon, Mar 28, 2016 at 7:28 PM, Ted Yu  wrote:
>
>> See this method:
>>
>>   lazy val rdd: RDD[T] = {
>>
>> On Mon, Mar 28, 2016 at 6:30 PM, Russell Jurney > > wrote:
>>
>>> Ok, I'm also unable to save to Elasticsearch using a dataframe's RDD.
>>> This seems related to DataFrames. Is there a way to convert a DataFrame's
>>> RDD to a 'normal' RDD?
>>>
>>>
>>> On Mon, Mar 28, 2016 at 6:20 PM, Russell Jurney <
>>> russell.jur...@gmail.com> wrote:
>>>
 I filed a JIRA  in the
 mongo-hadoop project, but I'm curious if anyone else has seen this issue.
 Anyone have any idea what to do? I can't save to Mongo from PySpark. A
 contrived example works, but a dataframe does not.

 I activate pymongo_spark and load a dataframe:

 import pymongo
 import pymongo_spark
 # Important: activate pymongo_spark.
 pymongo_spark.activate()

 on_time_dataframe =
 sqlContext.read.parquet('../data/on_time_performance.parquet')

 Then I try saving to MongoDB in two ways:


 on_time_dataframe.rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')

 on_time_dataframe.rdd.saveAsNewAPIHadoopFile(
   path='file://unused',
   outputFormatClass='com.mongodb.hadoop.MongoOutputFormat',
   keyClass='org.apache.hadoop.io.Text',
   valueClass='org.apache.hadoop.io.MapWritable',
   conf={"mongo.output.uri":
 "mongodb://localhost:27017/agile_data_science.on_time_performance"}
 )


 But I always get this error:

 In [7]:
 on_time_rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')

 16/03/28 18:04:06 INFO mapred.FileInputFormat: Total input paths to
 process : 1

 16/03/28 18:04:06 INFO spark.SparkContext: Starting job: runJob at
 PythonRDD.scala:393

 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Got job 2 (runJob at
 PythonRDD.scala:393) with 1 output partitions

 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Final stage: ResultStage
 2 (runJob at PythonRDD.scala:393)

 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Parents of final stage:
 List()

 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Missing parents: List()

 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting ResultStage 2
 (PythonRDD[13] at RDD at PythonRDD.scala:43), which has no missing parents

 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5 stored as
 values in memory (estimated size 19.3 KB, free 249.2 KB)

 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5_piece0
 stored as bytes in memory (estimated size 9.7 KB, free 258.9 KB)

 16/03/28 18:04:06 INFO storage.BlockManagerInfo: Added
 broadcast_5_piece0 in memory on localhost:59881 (size: 9.7 KB, free: 511.1
 MB)

 16/03/28 18:04:06 INFO spark.SparkContext: Created broadcast 5 from
 broadcast at DAGScheduler.scala:1006

 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting 1 missing
 tasks from ResultStage 2 (PythonRDD[13] at RDD at PythonRDD.scala:43)

 16/03/28 18:04:06 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0
 with 1 tasks

 16/03/28 18:04:06 INFO scheduler.TaskSetManager: Starting task 0.0 in
 stage 2.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2666 bytes)

 16/03/28 18:04:06 INFO executor.Executor: Running task 0.0 in stage 2.0
 (TID 2)

 16/03/28 18:04:06 INFO rdd.HadoopRDD: Input split:
 file:/Users/rjurney/Software/Agile_Data_Code_2/data/On_Time_On_Time_Performance_2015.csv.gz:0+312456777

 16/03/28 18:04:06 INFO compress.CodecPool: Got brand-new decompressor
 [.gz]

 16/03/28 18:04:07 INFO python.PythonRunner: Times: total = 1310, boot =
 1249, init = 58, finish = 3

 16/03/28 18:04:07 INFO executor.Executor: Finished task 0.0 in stage
 2.0 (TID 2). 4475 bytes result sent to driver

>>

Re: PySpark saving to MongoDB: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)

2016-03-28 Thread Russell Jurney
btw, they can't be saved to BSON either. This seems a generic issue, can
anyone else reproduce this?

On Mon, Mar 28, 2016 at 8:02 PM, Russell Jurney 
wrote:

> I created a JIRA: https://issues.apache.org/jira/browse/SPARK-14229
>
> On Mon, Mar 28, 2016 at 7:43 PM, Russell Jurney 
> wrote:
>
>> Ted, I am using the .rdd method, see above, but for some reason these
>> RDDs can't be saved to MongoDB or ElasticSearch.
>>
>> I think this is a bug in PySpark/DataFrame. I can't think of another
>> explanation... somehow DataFrame.rdd RDDs are not able to be stored to an
>> arbitrary Hadoop OutputFormat. When I do this:
>>
>> on_time_lines =
>> sc.textFile("../data/On_Time_On_Time_Performance_2015.jsonl.gz")
>> on_time_performance = on_time_lines.map(lambda x: json.loads(x))
>>
>>
>> on_time_performance.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>
>>
>> It works. Same data, but loaded as textFile instead of DataFrame (via
>> json/parquet dataframe loading).
>>
>> It is the DataFrame.rdd bit that is broken. I will file a JIRA.
>>
>> Does anyone know a workaround?
>>
>> On Mon, Mar 28, 2016 at 7:28 PM, Ted Yu  wrote:
>>
>>> See this method:
>>>
>>>   lazy val rdd: RDD[T] = {
>>>
>>> On Mon, Mar 28, 2016 at 6:30 PM, Russell Jurney <
>>> russell.jur...@gmail.com> wrote:
>>>
 Ok, I'm also unable to save to Elasticsearch using a dataframe's RDD.
 This seems related to DataFrames. Is there a way to convert a DataFrame's
 RDD to a 'normal' RDD?


 On Mon, Mar 28, 2016 at 6:20 PM, Russell Jurney <
 russell.jur...@gmail.com> wrote:

> I filed a JIRA  in the
> mongo-hadoop project, but I'm curious if anyone else has seen this issue.
> Anyone have any idea what to do? I can't save to Mongo from PySpark. A
> contrived example works, but a dataframe does not.
>
> I activate pymongo_spark and load a dataframe:
>
> import pymongo
> import pymongo_spark
> # Important: activate pymongo_spark.
> pymongo_spark.activate()
>
> on_time_dataframe =
> sqlContext.read.parquet('../data/on_time_performance.parquet')
>
> Then I try saving to MongoDB in two ways:
>
>
> on_time_dataframe.rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>
> on_time_dataframe.rdd.saveAsNewAPIHadoopFile(
>   path='file://unused',
>   outputFormatClass='com.mongodb.hadoop.MongoOutputFormat',
>   keyClass='org.apache.hadoop.io.Text',
>   valueClass='org.apache.hadoop.io.MapWritable',
>   conf={"mongo.output.uri":
> "mongodb://localhost:27017/agile_data_science.on_time_performance"}
> )
>
>
> But I always get this error:
>
> In [7]:
> on_time_rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>
> 16/03/28 18:04:06 INFO mapred.FileInputFormat: Total input paths to
> process : 1
>
> 16/03/28 18:04:06 INFO spark.SparkContext: Starting job: runJob at
> PythonRDD.scala:393
>
> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Got job 2 (runJob at
> PythonRDD.scala:393) with 1 output partitions
>
> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Final stage:
> ResultStage 2 (runJob at PythonRDD.scala:393)
>
> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Parents of final stage:
> List()
>
> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Missing parents: List()
>
> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting ResultStage
> 2 (PythonRDD[13] at RDD at PythonRDD.scala:43), which has no missing 
> parents
>
> 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5 stored
> as values in memory (estimated size 19.3 KB, free 249.2 KB)
>
> 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5_piece0
> stored as bytes in memory (estimated size 9.7 KB, free 258.9 KB)
>
> 16/03/28 18:04:06 INFO storage.BlockManagerInfo: Added
> broadcast_5_piece0 in memory on localhost:59881 (size: 9.7 KB, free: 511.1
> MB)
>
> 16/03/28 18:04:06 INFO spark.SparkContext: Created broadcast 5 from
> broadcast at DAGScheduler.scala:1006
>
> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting 1 missing
> tasks from ResultStage 2 (PythonRDD[13] at RDD at PythonRDD.scala:43)
>
> 16/03/28 18:04:06 INFO scheduler.TaskSchedulerImpl: Adding task set
> 2.0 with 1 tasks
>
> 16/03/28 18:04:06 INFO scheduler.TaskSetManager: Starting task 0.0 in
> stage 2.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2666 bytes)
>
> 16/03/28 18:04:06 INFO executor.Executor: Running task 0.0 in stage
> 2.0 (TID 2)
>
> 16/03/28 18:04:06 INFO rdd.HadoopRDD: Input split:
> file:/Users/rjurney/Software/Agile_Data_Code_2/data/On_Time_On_Time_Performance_2015.csv.gz:0+312456777
>
> 16/03/28 

run spark job

2016-03-28 Thread Fei Hu
Hi,

I am wondering how to run a Spark job with the plain java command, such as: java -cp
spark.jar mainclass. When running/debugging the Spark program in IntelliJ IDEA,
it uses a java command to run the Spark main class, so I think it should also be
possible to run the Spark job with the java command instead of spark-submit.

Thanks in advance,
Fei



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Does SparkSql thrift server support insert/update/delete sql statement

2016-03-28 Thread sage
Does SparkSql thrift server support insert/update/delete sql statement when
connecting using jdbc?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-SparkSql-thrift-server-support-insert-update-delete-sql-statement-tp26618.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



overriding spark.streaming.blockQueueSize default value

2016-03-28 Thread Spark Newbie
Hi All,

The default value for spark.streaming.blockQueueSize is 10 in
https://github.com/apache/spark/blob/branch-1.6/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala.
In the Spark Kinesis ASL 1.4, the received Kinesis records are stored by
calling addData on line 115 -
https://github.com/apache/spark/blob/branch-1.4/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala#L115
- which pushes one data item at a time onto the buffer. This is a problem
because, at application startup, a single Kinesis Worker gains the lease for
all (or a majority of) the shards of the Kinesis stream. This is by design;
the KCL load-balances as new Workers are started. But the single Worker that
initially gains the lease for a lot of shards ends up blocked on the addData
method, because many KinesisRecordProcessor threads try to add the received
data to the buffer. The buffer uses an ArrayBlockingQueue whose size is
specified by spark.streaming.blockQueueSize, which is 10 by default. The
ArrayBlockingQueue is flushed out to the MemoryStore every 100 ms, so the
KinesisRecordProcessor threads can stay blocked for a long time (up to an
hour) at application startup. The impact is that some Kinesis shards are not
consumed by the Spark Streaming application until their KinesisRecordProcessor
threads get unblocked.

To fix/work around the issue, would it be OK to increase
spark.streaming.blockQueueSize to a larger value? I suppose the main
consideration when increasing this size would be the memory allocated to the
executor. I haven't seen much documentation on this config, and any advice on
how to fine-tune it would be useful.
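
In case it helps frame the question, the config is read from SparkConf, so
raising it should not need any change in Spark itself. A minimal sketch of
what I have in mind (the value 100 is only an illustration, not a
recommendation):

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

# Sketch: raise the receiver's block queue size from the default of 10.
conf = (SparkConf()
        .setAppName("kinesis-consumer")
        .set("spark.streaming.blockQueueSize", "100"))
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)  # 1-second batch interval, also illustrative

The same setting could presumably be passed as
--conf spark.streaming.blockQueueSize=100 to spark-submit.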

Thanks,
Spark newbie


Re: Strange ML pipeline errors from HashingTF using v1.6.1

2016-03-28 Thread Jacek Laskowski
Hi,

How do you run the pipeline? Do you build it with assembly or package? Is
this running locally, on Spark Standalone, or on another cluster manager?
What's the build configuration?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Mon, Mar 28, 2016 at 7:11 PM, Timothy Potter  wrote:
> I'm seeing the following error when trying to generate a prediction
> from a very simple ML pipeline based model. I've verified that the raw
> data sent to the tokenizer is valid (not null). It seems like this is
> some sort of weird classpath or class loading type issue. Any help you
> can provide in trying to troubleshoot this further would be
> appreciated.
>
>  Error in machine-learning, docId=20news-18828/alt.atheism/51176
> scala.reflect.internal.Symbols$CyclicReference: illegal cyclic
> reference involving package 
> at scala.reflect.internal.Symbols$TypeSymbol.tpe(Symbols.scala:2768)
> ~[scala-reflect-2.10.5.jar:?]
> at 
> scala.reflect.internal.Mirrors$Roots$RootPackage$.(Mirrors.scala:268)
> ~[scala-reflect-2.10.5.jar:?]
> at 
> scala.reflect.internal.Mirrors$Roots.RootPackage$lzycompute(Mirrors.scala:267)
> ~[scala-reflect-2.10.5.jar:?]
> at scala.reflect.internal.Mirrors$Roots.RootPackage(Mirrors.scala:267)
> ~[scala-reflect-2.10.5.jar:?]
> at 
> scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$$makeScalaPackage(JavaMirrors.scala:902)
> ~[scala-reflect-2.10.5.jar:?]
> at 
> scala.reflect.runtime.JavaMirrors$class.missingHook(JavaMirrors.scala:1299)
> ~[scala-reflect-2.10.5.jar:?]
> at scala.reflect.runtime.JavaUniverse.missingHook(JavaUniverse.scala:12)
> ~[scala-reflect-2.10.5.jar:?]
> at 
> scala.reflect.internal.Mirrors$RootsBase.universeMissingHook(Mirrors.scala:77)
> ~[scala-reflect-2.10.5.jar:?]
> at scala.reflect.internal.Mirrors$RootsBase.missingHook(Mirrors.scala:79)
> ~[scala-reflect-2.10.5.jar:?]
> at 
> scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
> ~[scala-reflect-2.10.5.jar:?]
> at 
> scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:40)
> ~[scala-reflect-2.10.5.jar:?]
> at 
> scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:40)
> ~[scala-reflect-2.10.5.jar:?]
> at 
> scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:40)
> ~[scala-reflect-2.10.5.jar:?]
> at 
> scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:40)
> ~[scala-reflect-2.10.5.jar:?]
> at 
> scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:40)
> ~[scala-reflect-2.10.5.jar:?]
> at 
> scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61)
> ~[scala-reflect-2.10.5.jar:?]
> at 
> scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass(Mirrors.scala:72)
> ~[scala-reflect-2.10.5.jar:?]
> at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:119)
> ~[scala-reflect-2.10.5.jar:?]
> at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:21)
> ~[scala-reflect-2.10.5.jar:?]
> at 
> org.apache.spark.ml.feature.HashingTF$$typecreator1$1.apply(HashingTF.scala:66)
> ~[spark-mllib_2.10-1.6.1.jar:1.6.1]
> at 
> scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231)
> ~[scala-reflect-2.10.5.jar:?]
> at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231)
> ~[scala-reflect-2.10.5.jar:?]
> at 
> org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:654)
> ~[spark-catalyst_2.10-1.6.1.jar:1.6.1]
> at 
> org.apache.spark.sql.catalyst.ScalaReflection$.localTypeOf(ScalaReflection.scala:30)
> ~[spark-catalyst_2.10-1.6.1.jar:1.6.1]
> at 
> org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:642)
> ~[spark-catalyst_2.10-1.6.1.jar:1.6.1]
> at 
> org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
> ~[spark-catalyst_2.10-1.6.1.jar:1.6.1]
> at org.apache.spark.sql.functions$.udf(functions.scala:2576)
> ~[spark-sql_2.10-1.6.1.jar:1.6.1]
> at org.apache.spark.ml.feature.HashingTF.transform(HashingTF.scala:66)
> ~[spark-mllib_2.10-1.6.1.jar:1.6.1]
> at 
> org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:297)
> ~[spark-mllib_2.10-1.6.1.jar:1.6.1]
> at 
> org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:297)
> ~[spark-mllib_2.10-1.6.1.jar:1.6.1]
> at 
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
> ~[scala-library-2.10.5.jar:?]
> at 
> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
> ~[scala-library-2.10.5.jar:?]
> at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
> ~[scala-library-2.10.5.jar:?]
> at org.apache.spark.ml.PipelineModel.transform(Pipe

Re: Does SparkSql thrift server support insert/update/delete sql statement

2016-03-28 Thread Raymond Honderdors
It should,
depending on the storage used.

I am facing a similar issue running Spark on EMR:

I got EMR login errors for insert.

Sent from Outlook Mobile



On Mon, Mar 28, 2016 at 10:31 PM -0700, "sage" 
mailto:lkke...@gmail.com>> wrote:

Does SparkSql thrift server support insert/update/delete sql statement when
connecting using jdbc?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-SparkSql-thrift-server-support-insert-update-delete-sql-statement-tp26618.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [Spark SQL] Unexpected Behaviour

2016-03-28 Thread Jerry Lam
Hi Sunitha,

Thank you for the JIRA reference. It looks like this is the bug I'm hitting.
Most of the bugs related to this seem to involve dataframes derived from the
same dataframe (base in this case). In SQL, this is a self-join, and dropping
d2.label should not affect d1.label. There are other bugs I found over the
past three days that are associated with this type of join. In one case, if I
don't drop the duplicate column BEFORE the join, Spark prefers the columns
from the d2 dataframe. I will see if I can replicate it in a small program
like the one above.

Best Regards,

Jerry


On Mon, Mar 28, 2016 at 6:27 PM, Sunitha Kambhampati 
wrote:

> Hi Jerry,
>
> I think you are running into an issue similar to SPARK-14040
> https://issues.apache.org/jira/browse/SPARK-14040
>
> One way to resolve it is to use alias.
>
> Here is an example that I tried on trunk and I do not see any exceptions.
>
> val d1=base.where($"label" === 0) as("d1")
> val d2=base.where($"label" === 1).as("d2")
>
> d1.join(d2, $"d1.id" === $"d2.id", 
> "left_outer").drop($"d2.label").select($"d1.label")
>
>
> Hope this helps some.
>
> Best regards,
> Sunitha.
>
> On Mar 28, 2016, at 2:34 PM, Jerry Lam  wrote:
>
> Hi spark users and developers,
>
> I'm using spark 1.5.1 (I have no choice because this is what we used). I
> ran into some very unexpected behaviour when I did some join operations
> lately. I cannot post my actual code here and the following code is not for
> practical reasons but it should demonstrate the issue.
>
> val base = sc.parallelize(( 0 to 49).map(i =>(i,0)) ++ (50 to
> 99).map((_,1))).toDF("id", "label")
> val d1=base.where($"label" === 0)
> val d2=base.where($"label" === 1)
> d1.join(d2, d1("id") === d2("id"),
> "left_outer").drop(d2("label")).select(d1("label"))
>
>
> The above code will throw an exception saying the column label is not
> found. Do you have a reason for throwing an exception when the column has
> not been dropped for d1("label")?
>
> Best Regards,
>
> Jerry
>
>
>


Re: [Spark SQL] Unexpected Behaviour

2016-03-28 Thread Jerry Lam
Hi guys,

I have another example to illustrate the issue. I think the problem is
pretty nasty.

val base = sc.parallelize(( 0 to 49).zip( 0 to 49) ++ (30 to 79).zip(50 to
99)).toDF("id", "label")
val d1 = base.where($"label" < 60)
val d2 = base.where($"label" === 60)
d1.join(d2, "id").show
+---+-+-+
| id|label|label|
+---+-+-+
| 40|   40|   60|
+---+-+-+

d1.join(d2, "id").select(d1("label")).show
+-+
|label|
+-+
|   40|
+-+
(expected answer: 40, right!)

d1.join(d2, "id").select(d2("label")).show
+-+
|label|
+-+
|   40|
+-+
(expected answer: 60, wrong!)

d1.join(d2, "id").select(d2("label")).explain
== Physical Plan ==
TungstenProject [label#15]
 SortMergeJoin [id#14], [id#30]
  TungstenSort [id#14 ASC], false, 0
   TungstenExchange hashpartitioning(id#14)
TungstenProject [_1#12 AS id#14,_2#13 AS label#15]
 Filter (_2#13 < 60)
  Scan PhysicalRDD[_1#12,_2#13]
  TungstenSort [id#30 ASC], false, 0
   TungstenExchange hashpartitioning(id#30)
TungstenProject [_1#12 AS id#30]
 Filter (_2#13 = 60)
  Scan PhysicalRDD[_1#12,_2#13]

Again, this is just the tip of the iceberg. I have spent hours tracking down
this weird behaviour.

Best Regards,

Jerry



On Tue, Mar 29, 2016 at 2:01 AM, Jerry Lam  wrote:

> Hi Sunitha,
>
> Thank you for the reference Jira. It looks like this is the bug I'm
> hitting. Most of the bugs related to this seems to associate with
> dataframes derived from the one dataframe (base in this case). In SQL, this
> is a self-join and dropping d2.label should not affect d1.label. There are
> other bugs I found these three days that are associated with this type of
> joins. In one case, if I don't drop the duplicate column BEFORE the join,
> spark has preferences on the columns from d2 dataframe. I will see if I can
> replicate in a small program like above.
>
> Best Regards,
>
> Jerry
>
>
> On Mon, Mar 28, 2016 at 6:27 PM, Sunitha Kambhampati  > wrote:
>
>> Hi Jerry,
>>
>> I think you are running into an issue similar to SPARK-14040
>> https://issues.apache.org/jira/browse/SPARK-14040
>>
>> One way to resolve it is to use alias.
>>
>> Here is an example that I tried on trunk and I do not see any exceptions.
>>
>>
>> val d1=base.where($"label" === 0) as("d1")
>> val d2=base.where($"label" === 1).as("d2")
>>
>> d1.join(d2, $"d1.id" === $"d2.id", 
>> "left_outer").drop($"d2.label").select($"d1.label")
>>
>>
>> Hope this helps some.
>>
>> Best regards,
>> Sunitha.
>>
>> On Mar 28, 2016, at 2:34 PM, Jerry Lam  wrote:
>>
>> Hi spark users and developers,
>>
>> I'm using spark 1.5.1 (I have no choice because this is what we used). I
>> ran into some very unexpected behaviour when I did some join operations
>> lately. I cannot post my actual code here and the following code is not for
>> practical reasons but it should demonstrate the issue.
>>
>> val base = sc.parallelize(( 0 to 49).map(i =>(i,0)) ++ (50 to
>> 99).map((_,1))).toDF("id", "label")
>> val d1=base.where($"label" === 0)
>> val d2=base.where($"label" === 1)
>> d1.join(d2, d1("id") === d2("id"),
>> "left_outer").drop(d2("label")).select(d1("label"))
>>
>>
>> The above code will throw an exception saying the column label is not
>> found. Do you have a reason for throwing an exception when the column has
>> not been dropped for d1("label")?
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>>
>


Change TimeZone Setting in Spark 1.5.2

2016-03-28 Thread Divya Gehlot
Hi,

The Spark setup is on a Hadoop cluster.
How can I set the Spark timezone to sync with the server timezone?
Any ideas?
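
The approach I am considering (not sure it is the right one) is to force the
JVM timezone on the driver and executors via the extraJavaOptions settings,
roughly like this (the zone name is only an example):

from pyspark import SparkConf, SparkContext

# Sketch: pin the JVM timezone on the executors; substitute the server's zone.
conf = (SparkConf()
        .set("spark.executor.extraJavaOptions", "-Duser.timezone=Asia/Singapore")
        .set("spark.driver.extraJavaOptions", "-Duser.timezone=Asia/Singapore"))
sc = SparkContext(conf=conf)

I suspect the driver option has to go into spark-defaults.conf or onto the
spark-submit command line instead, since the driver JVM is already running by
the time SparkConf is built in client mode.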


Thanks,
Divya