Re: New ColumnType For Decimal Caching

2015-02-15 Thread Cheng Lian
Hi Manoj,

Yes, you've hit the nail on the head. I think the timestamp type support in the
in-memory columnar storage can be a good reference for you. Also, you may
want to enable compression support for the decimal type by adding a DECIMAL
column type to RunLengthEncoding.supports and DictionaryEncoding.supports.
Thanks for working on this!
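
A rough sketch of the "decimal as a long with decoration" idea raised further down
the thread, assuming precision <= 18 so the unscaled value fits in a Long
(illustrative packing trick only, not the actual ColumnType API):

```
import java.math.BigDecimal

// pack a decimal(14,4) into its unscaled long representation (8 bytes)
def encode(d: BigDecimal, scale: Int): Long = d.movePointRight(scale).longValueExact()

// and restore it on read
def decode(unscaled: Long, scale: Int): BigDecimal = BigDecimal.valueOf(unscaled, scale)

val x = new BigDecimal("1234.5678")
val packed = encode(x, 4)            // 12345678L instead of a boxed BigDecimal
assert(decode(packed, 4) == x)
```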

Best,
Cheng

On Sat, Feb 14, 2015 at 5:32 PM, Michael Armbrust 
wrote:

> That sounds right to me.  Cheng could elaborate if you are missing
> something.
>
> On Fri, Feb 13, 2015 at 11:36 AM, Manoj Samel 
> wrote:
>
>> Thanks Michael for the pointer & Sorry for the delayed reply.
>>
>> Taking a quick inventory of scope of change - Is the column type for
>> Decimal caching needed only in the caching layer (4 files
>> in org.apache.spark.sql.columnar - ColumnAccessor.scala,
>> ColumnBuilder.scala, ColumnStats.scala, ColumnType.scala)
>>
>> Or do other SQL components also need to be touched ?
>>
>> Hoping for quick feedback off the top of your head ...
>>
>> Thanks,
>>
>>
>>
>> On Mon, Feb 9, 2015 at 3:16 PM, Michael Armbrust 
>> wrote:
>>
>>> You could add a new ColumnType
>>> 
>>> .
>>>
>>> PRs welcome :)
>>>
>>> On Mon, Feb 9, 2015 at 3:01 PM, Manoj Samel 
>>> wrote:
>>>
 Hi Michael,

 As a test, I have same data loaded as another parquet - except with the
 2 decimal(14,4) replaced by double. With this, the  on disk size is ~345MB,
 the in-memory size is 2GB (v.s. 12 GB) and the cached query runs in 1/2 the
 time of uncached query.

 Would it be possible for Spark to store in-memory decimal in some form
 of long with decoration ?

 For the immediate future, is there any hook that we can use to provide
 custom caching / processing for the decimal type in RDD so other semantic
 does not changes ?

 Thanks,




 On Mon, Feb 9, 2015 at 2:41 PM, Manoj Samel 
 wrote:

> Could you share which data types are optimized in the in-memory
> storage and how are they optimized ?
>
> On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust <
> mich...@databricks.com> wrote:
>
>> You'll probably only get good compression for strings when dictionary
>> encoding works.  We don't optimize decimals in the in-memory columnar
>> storage, so you are paying expensive serialization there likely.
>>
>> On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel > > wrote:
>>
>>> Flat data of types String, Int and couple of decimal(14,4)
>>>
>>> On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 Is this nested data or flat data?

 On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel <
 manojsamelt...@gmail.com> wrote:

> Hi Michael,
>
> The storage tab shows the RDD resides fully in memory (10
> partitions) with zero disk usage. Tasks for subsequent select on this 
> table
> in cache shows minimal overheads (GC, queueing, shuffle write etc. 
> etc.),
> so overhead is not issue. However, it is still twice as slow as 
> reading
> uncached table.
>
> I have spark.rdd.compress = true, 
> spark.sql.inMemoryColumnarStorage.compressed
> = true, spark.serializer =
> org.apache.spark.serializer.KryoSerializer
>
> Something that may be of relevance ...
>
> The underlying table is Parquet, 10 partitions totaling ~350 MB.
> For mapPartition phase of query on uncached table shows input size of 
> 351
> MB. However, after the table is cached, the storage shows the cache 
> size as
> 12GB. So the in-memory representation seems much bigger than on-disk, 
> even
> with the compression options turned on. Any thoughts on this ?
>
> mapPartition phase same query for cache table shows input size of
> 12GB (full size of cache table) and takes twice the time as 
> mapPartition
> for uncached query.
>
> Thanks,
>
>
>
>
>
>
> On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust <
> mich...@databricks.com> wrote:
>
>> Check the storage tab.  Does the table actually fit in memory?
>> Otherwise you are rebuilding column buffers in addition to reading 
>> the data
>> off of the disk.
>>
>> On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel <
>> manojsamelt...@gmail.com> wrote:
>>
>>> Spark 1.2
>>>
>>> Data stored in parquet table (large number of rows)
>>>
>>> Test 1
>>>
>>> select a, sum(b), sum(c) from table
>>>
>>> Test
>>>
>>> sqlContext.cacheTable()

Re: [GraphX] Excessive value recalculations during aggregateMessages cycles

2015-02-15 Thread Takeshi Yamamuro
Hi,

I tried some quick and simple tests, and it seems to me the vertices below were
correctly cached.
Could you tell me the differences between my code and yours?

import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._

object Prog {
  def processInt(d: Int) = d * 2
}

val g = GraphLoader.edgeListFile(sc, "../temp/graph.txt")
.cache

val g2 = g.outerJoinVertices(g.degrees)(
  (vid, old, msg) => Prog.processInt(msg.getOrElse(0)))
.cache

g2.vertices.count

val g3 = g.outerJoinVertices(g.degrees)(
  (vid, old, msg) => msg.getOrElse(0))
.mapVertices((vid, d) => Prog.processInt(d))
.cache

g3.vertices.count

'g2.vertices.toDebugString' outputs;

(2) VertexRDDImpl[16] at RDD at VertexRDD.scala:57 []
 |  VertexRDD ZippedPartitionsRDD2[15] at zipPartitions at
VertexRDDImpl.scala:121 []
 |  CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
 |  VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at
VertexRDD.scala:319 []
 |  CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
 |  MapPartitionsRDD[7] at mapPartitions at VertexRDD.scala:335 []
 |  ShuffledRDD[6] at partitionBy at VertexRDD.scala:335 []
 +-(2) VertexRDD.createRoutingTables - vid2pid (aggregation)
MapPartitionsRDD[5] at mapPartitions at VertexRDD.scala:330 []
|  GraphLoader.edgeListFile - edges (../temp/graph.txt), EdgeRDD,
EdgeRDD MapPartitionsRDD[2] at mapPartitionsWithIndex at Graph...


'g3.vertices.toDebugString' outputs;

(2) VertexRDDImpl[33] at RDD at VertexRDD.scala:57 []
 |  VertexRDD MapPartitionsRDD[32] at mapPartitions at
VertexRDDImpl.scala:96 []
 |  CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
 |  VertexRDD ZippedPartitionsRDD2[24] at zipPartitions at
VertexRDDImpl.scala:121 []
 |  CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
 |  VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at
VertexRDD.scala:319 []
 |  CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
 |  MapPartitionsRDD[7] at mapPartitions at VertexRDD.scala:335 []
 |  ShuffledRDD[6] at partitionBy at VertexRDD.scala:335 []
 +-(2) VertexRDD.createRoutingTables - vid2pid (aggregation)
MapPartitionsRDD[5] at mapPar...

-- maropu

On Mon, Feb 9, 2015 at 5:47 AM, Kyle Ellrott  wrote:

> I changed the
>
> curGraph = curGraph.outerJoinVertices(curMessages)(
>   (vid, vertex, message) =>
> vertex.process(message.getOrElse(List[Message]()), ti)
> ).cache()
>
> to
>
> curGraph = curGraph.outerJoinVertices(curMessages)(
>   (vid, vertex, message) => (vertex,
> message.getOrElse(List[Message]()))
> ).mapVertices( (x,y) => y._1.process( y._2, ti ) ).cache()
>
> So the call to the 'process' method was moved out of the outerJoinVertices
> and into a separate mapVertices call, and the problem went away. Now,
> 'process' is only called once during the correct cycle.
> So it would appear that outerJoinVertices caches the closure to be
> recalculated if needed again while mapVertices actually caches the
> derived values.
>
> Is this a bug or a feature?
>
> Kyle
>
>
>
> On Sat, Feb 7, 2015 at 11:44 PM, Kyle Ellrott 
> wrote:
>
>> I'm trying to setup a simple iterative message/update problem in GraphX
>> (spark 1.2.0), but I'm running into issues with the caching and
>> re-calculation of data. I'm trying to follow the example found in the
>> Pregel implementation of materializing and caching messages and graphs and
>> then unpersisting them after the next cycle has been done.
>> It doesn't seem to be working, because every cycle gets progressively
>> slower and it seems as if more and more of the values are being
>> re-calculated despite my attempts to cache them.
>>
>> The code:
>> ```
>>   var oldMessages : VertexRDD[List[Message]] = null
>>   var oldGraph : Graph[MyVertex, MyEdge ] = null
>>   curGraph = curGraph.mapVertices((x, y) => y.init())
>>   for (i <- 0 to cycle_count) {
>> val curMessages = curGraph.aggregateMessages[List[Message]](x => {
>>   //send messages
>>   .
>> },
>> (x, y) => {
>>//collect messages into lists
>> val out = x ++ y
>> out
>>   }
>> ).cache()
>> curMessages.count()
>> val ti = i
>> oldGraph = curGraph
>> curGraph = curGraph.outerJoinVertices(curMessages)(
>>   (vid, vertex, message) =>
>> vertex.process(message.getOrElse(List[Message]()), ti)
>> ).cache()
>> curGraph.vertices.count()
>> oldGraph.unpersistVertices(blocking = false)
>> oldGraph.edges.unpersist(blocking = false)
>> oldGraph = curGraph
>> if (oldMessages != null ) {
>>   oldMessages.unpersist(blocking=false)
>> }
>> oldMessages = curMessages
>>   }
>> ```
>>
>> The MyVertex.process method takes the list of inc

Re: SQLContext.applySchema strictness

2015-02-15 Thread Michael Armbrust
Applying schema is a pretty low-level operation, and I would expect most
users would use the type safe interfaces.  If you are unsure you can always
run:

import org.apache.spark.sql.execution.debug._
schemaRDD.typeCheck()

and it will tell you if you have made any mistakes.

Michael

On Sat, Feb 14, 2015 at 1:05 PM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> Would it make sense to add an optional validate parameter to applySchema()
> which defaults to False, both to give users the option to check the schema
> immediately and to make the default behavior clearer?
>
> On Sat Feb 14 2015 at 9:18:59 AM Michael Armbrust 
> wrote:
>
>> Doing runtime type checking is very expensive, so we only do it when
>> necessary (i.e. you perform an operation like adding two columns together)
>>
>> On Sat, Feb 14, 2015 at 2:19 AM, nitin  wrote:
>>
>>> AFAIK, this is the expected behavior. You have to make sure that the
>>> schema
>>> matches the row. It won't give any error when you apply the schema as it
>>> doesn't validate the nature of data.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/SQLContext-applySchema-strictness-tp21650p21653.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>


Re: Shuffle write increases in spark 1.2

2015-02-15 Thread Aaron Davidson
I think Xuefeng Wu's suggestion is likely correct. This difference is more
likely explained by the compression library changing versions than by sort vs.
hash shuffle (which should not affect output size significantly). Others
have reported that switching to lz4 fixed their issue.

We should document this if this is the case. I wonder if we're asking
Snappy to be super-low-overhead and as a result the new version does a
better job of it (less overhead, less compression).
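
For anyone wanting to try that, switching the codec is a one-line config change
(on older releases the full class name org.apache.spark.io.LZ4CompressionCodec
may be needed instead of the short name):

```
val conf = new org.apache.spark.SparkConf()
  .set("spark.io.compression.codec", "lz4")   // default is snappy
```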

On Sat, Feb 14, 2015 at 9:32 AM, Peng Cheng  wrote:

> I double-checked the 1.2 feature list and found out that the new sort-based
> shuffle manager has nothing to do with HashPartitioner :-< Sorry for the
> misinformation.
>
> On the other hand, this may explain the increase in shuffle spill as a side
> effect of the new shuffle manager. Let me revert spark.shuffle.manager to hash
> and see if it makes things better (or worse, as the benchmark in
> https://issues.apache.org/jira/browse/SPARK-3280 indicates)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-write-increases-in-spark-1-2-tp20894p21657.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


shark queries failed

2015-02-15 Thread Grandl Robert
Hi guys,
I deployed BlinkDB (built atop Shark), running on Spark 0.9.
I tried to run several TPC-DS Shark queries taken from
https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries/shark.
However, the following exceptions are encountered. Do you have any idea why
that might happen?

Thanks,
Robert

2015-02-14 17:58:29,358 WARN  util.NativeCodeLoader 
(NativeCodeLoader.java:(52)) - Unable to load native-
hadoop library for your platform... using builtin-java classes where applicable
2015-02-14 17:58:29,360 WARN  snappy.LoadSnappy (LoadSnappy.java:(46)) 
- Snappy native library not loaded
2015-02-14 17:58:34,963 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 6 (task 5.0:2)
2015-02-14 17:58:34,970 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Loss was due to java.lang
.ClassCastException
java.lang.ClassCastException: org.apache.hadoop.io.NullWritable cannot be cast 
to org.apache.hadoop.io.FloatWrita
ble
    at 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableFloatObjectInspector.get(WritableFloat
ObjectInspector.java:35)
    at 
org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:331)
    at 
org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serializeStruct(LazyBinarySerDe.java:257)
    at 
org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:204)
    at 
shark.execution.ReduceSinkOperator$$anonfun$processPartitionNoDistinct$1.apply(ReduceSinkOperator.scal
a:188)
    at 
shark.execution.ReduceSinkOperator$$anonfun$processPartitionNoDistinct$1.apply(ReduceSinkOperator.scal
a:153)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
    at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at 
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
2015-02-14 17:58:34,983 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 8 (task 5.0:4)
2015-02-14 17:58:35,075 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 12 (task 5.0:8)
2015-02-14 17:58:35,119 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 15 (task 5.0:2)
2015-02-14 17:58:35,134 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 9 (task 5.0:5)
2015-02-14 17:58:35,187 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 16 (task 5.0:4)
2015-02-14 17:58:35,203 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 11 (task 5.0:7)
2015-02-14 17:58:35,214 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 13 (task 5.0:9)
2015-02-14 17:58:35,265 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 4 (task 5.0:0)
2015-02-14 17:58:35,274 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 18 (task 5.0:2)
2015-02-14 17:58:35,304 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 17 (task 5.0:8)
2015-02-14 17:58:35,330 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 5 (task 5.0:1)
2015-02-14 17:58:35,354 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 20 (task 5.0:4)
2015-02-14 17:58:35,387 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 19 (task 5.0:5)
2015-02-14 17:58:35,430 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 7 (task 5.0:3)
2015-02-14 17:58:35,432 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 24 (task 5.0:2)
2015-02-14 17:58:35,433 ERROR scheduler.TaskSetManager 
(Logging.scala:logError(65)) - Task 5.0:2 failed 4 times; 
aborting job
2015-02-14 17:58:35,438 ERROR ql.Driver (SessionState.java:printError(400)) - 
FAILED: Execution Error, return cod
e -101 from shark.execution.SparkTask
2015-02-14 17:58:35,552 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 30 (task 6.0:0)
2015-02-14 17:58:35,565 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Loss was due to java.io.F
ileNotFoundException
java.io.FileNotFoundException: http://10.200.146.12:46812/broadcast_4
    at 
sun.net.www.protoc

Re: Shuffle write increases in spark 1.2

2015-02-15 Thread Ami Khandeshi
I have seen the same behavior!  I would love to hear an update on this...

Thanks,

Ami

On Thu, Feb 5, 2015 at 8:26 AM, Anubhav Srivastav <
anubhav.srivas...@gmail.com> wrote:

> Hi Kevin,
> We seem to be facing the same problem as well. Were you able to find
> anything after that? The ticket does not seem to have progressed anywhere.
>
> Regards,
> Anubhav
>
> On 5 January 2015 at 10:37, 정재부  wrote:
>
>>  Sure, here is a ticket. https://issues.apache.org/jira/browse/SPARK-5081
>>
>>
>>
>> --- *Original Message* ---
>>
>> *Sender* : Josh Rosen
>>
>> *Date* : 2015-01-05 06:14 (GMT+09:00)
>>
>> *Title* : Re: Shuffle write increases in spark 1.2
>>
>>
>> If you have a small reproduction for this issue, can you open a ticket at
>> https://issues.apache.org/jira/browse/SPARK ?
>>
>>
>>
>> On December 29, 2014 at 7:10:02 PM, Kevin Jung (itsjb.j...@samsung.com)
>> wrote:
>>
>>  Hi all,
>> The size of the shuffle write shown in the Spark web UI is much different when
>> I execute the same Spark job on the same input data (100GB) in both Spark 1.1
>> and Spark 1.2.
>> At the same sortBy stage, the size of the shuffle write is 39.7GB in Spark 1.1
>> but 91.0GB in Spark 1.2.
>> I set the spark.shuffle.manager option to hash because its default value
>> changed, but Spark 1.2 writes larger files than Spark 1.1.
>> Can anyone tell me why this happened?
>>
>> Thanks
>> Kevin
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-write-increases-in-spark-1-2-tp20894.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>> - To
>> unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
>> commands, e-mail: user-h...@spark.apache.org
>
>
>


Array in broadcast can't be serialized

2015-02-15 Thread Tao Xiao
I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be
serialized by Kryo but *Array[ImmutableBytesWritable] *can't be serialized
even when I registered both of them in Kryo.

The code is as follows:

   val conf = new SparkConf()
.setAppName("Hello Spark")
.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "xt.MyKryoRegistrator")

val sc = new SparkContext(conf)

val rdd = sc.parallelize(List(
(new ImmutableBytesWritable(Bytes.toBytes("AAA")), new
KeyValue()),
(new ImmutableBytesWritable(Bytes.toBytes("BBB")), new
KeyValue()),
(new ImmutableBytesWritable(Bytes.toBytes("CCC")), new
KeyValue()),
(new ImmutableBytesWritable(Bytes.toBytes("DDD")), new
KeyValue())), 4)

// snippet 1:  a single object of *ImmutableBytesWritable* can be
serialized in broadcast
val partitioner = new SingleElementPartitioner(sc.broadcast(new
ImmutableBytesWritable(Bytes.toBytes(3))))
val ret = rdd.aggregateByKey(List[KeyValue](),
partitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
 (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys ).persist()
println("\n\n\ret.count = " + ret.count + ",  partition size = " +
ret.partitions.size)

// snippet 2: an array of *ImmutableBytesWritable* can not be
serialized in broadcast
val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)), new
ImmutableBytesWritable(Bytes.toBytes(2)), new
ImmutableBytesWritable(Bytes.toBytes(3)))
val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
val ret1 = rdd.aggregateByKey(List[KeyValue](),
newPartitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
 (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys )
println("\n\n\nrdd2.count = " + ret1.count)

sc.stop


  // the following are kryo registrator and partitioners
   class MyKryoRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
 kryo.register(classOf[ImmutableBytesWritable])   //
register ImmutableBytesWritable
 kryo.register(classOf[Array[ImmutableBytesWritable]])
 // register
Array[ImmutableBytesWritable]
}
   }

   class SingleElementPartitioner(bc:
Broadcast[ImmutableBytesWritable]) extends Partitioner {
override def numPartitions: Int = 5
def v = Bytes.toInt(bc.value.get)
override def getPartition(key: Any): Int =  v - 1
   }


class ArrayPartitioner(bc:
Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
val arr = bc.value
override def numPartitions: Int = arr.length
override def getPartition(key: Any): Int =
Bytes.toInt(arr(0).get)
}



In the code above, snippet 1 can work as expected. But snippet 2 throws
"Task not serializable: java.io.NotSerializableException:
org.apache.hadoop.hbase.io.ImmutableBytesWritable"  .


So do I have to implement a Kryo serializer for Array[T] if it is used in
broadcast ?

Thanks


Re: New ColumnType For Decimal Caching

2015-02-15 Thread Michael Armbrust
That sounds right to me.  Cheng could elaborate if you are missing something.

On Fri, Feb 13, 2015 at 11:36 AM, Manoj Samel 
wrote:

> Thanks Michael for the pointer & Sorry for the delayed reply.
>
> Taking a quick inventory of scope of change - Is the column type for
> Decimal caching needed only in the caching layer (4 files
> in org.apache.spark.sql.columnar - ColumnAccessor.scala,
> ColumnBuilder.scala, ColumnStats.scala, ColumnType.scala)
>
> Or do other SQL components also need to be touched ?
>
> Hoping for quick feedback off the top of your head ...
>
> Thanks,
>
>
>
> On Mon, Feb 9, 2015 at 3:16 PM, Michael Armbrust 
> wrote:
>
>> You could add a new ColumnType
>> 
>> .
>>
>> PRs welcome :)
>>
>> On Mon, Feb 9, 2015 at 3:01 PM, Manoj Samel 
>> wrote:
>>
>>> Hi Michael,
>>>
>>> As a test, I have same data loaded as another parquet - except with the
>>> 2 decimal(14,4) replaced by double. With this, the  on disk size is ~345MB,
>>> the in-memory size is 2GB (v.s. 12 GB) and the cached query runs in 1/2 the
>>> time of uncached query.
>>>
>>> Would it be possible for Spark to store in-memory decimal in some form
>>> of long with decoration ?
>>>
>>> For the immediate future, is there any hook that we can use to provide
>>> custom caching / processing for the decimal type in RDD so other semantic
>>> does not changes ?
>>>
>>> Thanks,
>>>
>>>
>>>
>>>
>>> On Mon, Feb 9, 2015 at 2:41 PM, Manoj Samel 
>>> wrote:
>>>
 Could you share which data types are optimized in the in-memory storage
 and how are they optimized ?

 On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust <
 mich...@databricks.com> wrote:

> You'll probably only get good compression for strings when dictionary
> encoding works.  We don't optimize decimals in the in-memory columnar
> storage, so you are paying expensive serialization there likely.
>
> On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel 
> wrote:
>
>> Flat data of types String, Int and couple of decimal(14,4)
>>
>> On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> Is this nested data or flat data?
>>>
>>> On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel <
>>> manojsamelt...@gmail.com> wrote:
>>>
 Hi Michael,

 The storage tab shows the RDD resides fully in memory (10
 partitions) with zero disk usage. Tasks for subsequent select on this 
 table
 in cache shows minimal overheads (GC, queueing, shuffle write etc. 
 etc.),
 so overhead is not issue. However, it is still twice as slow as reading
 uncached table.

 I have spark.rdd.compress = true, 
 spark.sql.inMemoryColumnarStorage.compressed
 = true, spark.serializer =
 org.apache.spark.serializer.KryoSerializer

 Something that may be of relevance ...

 The underlying table is Parquet, 10 partitions totaling ~350 MB.
 For mapPartition phase of query on uncached table shows input size of 
 351
 MB. However, after the table is cached, the storage shows the cache 
 size as
 12GB. So the in-memory representation seems much bigger than on-disk, 
 even
 with the compression options turned on. Any thoughts on this ?

 mapPartition phase same query for cache table shows input size of
 12GB (full size of cache table) and takes twice the time as 
 mapPartition
 for uncached query.

 Thanks,






 On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust <
 mich...@databricks.com> wrote:

> Check the storage tab.  Does the table actually fit in memory?
> Otherwise you are rebuilding column buffers in addition to reading 
> the data
> off of the disk.
>
> On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel <
> manojsamelt...@gmail.com> wrote:
>
>> Spark 1.2
>>
>> Data stored in parquet table (large number of rows)
>>
>> Test 1
>>
>> select a, sum(b), sum(c) from table
>>
>> Test
>>
>> sqlContext.cacheTable()
>> select a, sum(b), sum(c) from table  - "seed cache" First time
>> slow since loading cache ?
>> select a, sum(b), sum(c) from table  - Second time it should be
>> faster as it should be reading from cache, not HDFS. But it is 
>> slower than
>> test1
>>
>> Any thoughts? Should a different query be used to seed cache ?
>>
>> Thanks,
>>
>>
>

>>>
>>
>

>>>
>>
>


Specifying AMI when using Spark EC-2 scripts

2015-02-15 Thread olegshirokikh
Hi there,

Is there a way to specify an AWS AMI with a particular OS (say Ubuntu) when
launching Spark on the Amazon cloud with the provided scripts?

What is the default AMI and operating system launched by the EC-2 script?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Specifying-AMI-when-using-Spark-EC-2-scripts-tp21658.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Extract hour from Timestamp in Spark SQL

2015-02-15 Thread Cheng, Hao
Are you using the SQLContext? I think the HiveContext is recommended.
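
For example, with a HiveContext the Hive hour() UDF becomes available. A minimal
sketch (the table name "events" and timestamp column "ts" are made up and assumed
to be registered already):

```
import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)

// hour() is a Hive UDF, so it resolves through HiveContext
val byHour = hiveCtx.sql("SELECT hour(ts), count(*) FROM events GROUP BY hour(ts)")
byHour.collect().foreach(println)
```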

Cheng Hao

From: Wush Wu [mailto:w...@bridgewell.com]
Sent: Thursday, February 12, 2015 2:24 PM
To: u...@spark.incubator.apache.org
Subject: Extract hour from Timestamp in Spark SQL

Dear all,

I am new to Spark SQL and have no experience with Hive.
I tried to use the built-in Hive function to extract the hour from a timestamp in
Spark SQL, but got: "java.util.NoSuchElementException: key not found: hour".
How should I extract the hour from a timestamp?
Also, I am very confused about which functions I can use in Spark SQL. Is
there any list of available functions other than
http://spark.apache.org/docs/1.2.0/sql-programming-guide.html#compatibility-with-apache-hive
?
Thanks,
Wush




Re: Specifying AMI when using Spark EC-2 scripts

2015-02-15 Thread gen tang
Hi,

You can use -a or --ami to launch the cluster using a specific
AMI.
If I remember correctly, the default system is Amazon Linux.

Hope it will help

Cheers
Gen


On Sun, Feb 15, 2015 at 6:20 AM, olegshirokikh  wrote:

> Hi there,
>
> Is there a way to specify the AWS AMI with particular OS (say Ubuntu) when
> launching Spark on Amazon cloud with provided scripts?
>
> What is the default AMI, operating system that is launched by EC-2 script?
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Specifying-AMI-when-using-Spark-EC-2-scripts-tp21658.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Array in broadcast can't be serialized

2015-02-15 Thread Ted Yu
I was looking at https://github.com/twitter/chill

It seems this would achieve what you want:
chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala

Cheers

On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao  wrote:

> I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be
> serialized by Kryo but *Array[ImmutableBytesWritable] *can't be
> serialized even when I registered both of them in Kryo.
>
> The code is as follows:
>
>val conf = new SparkConf()
> .setAppName("Hello Spark")
> .set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
> .set("spark.kryo.registrator", "xt.MyKryoRegistrator")
>
> val sc = new SparkContext(conf)
>
> val rdd = sc.parallelize(List(
> (new ImmutableBytesWritable(Bytes.toBytes("AAA")), new
> KeyValue()),
> (new ImmutableBytesWritable(Bytes.toBytes("BBB")), new
> KeyValue()),
> (new ImmutableBytesWritable(Bytes.toBytes("CCC")), new
> KeyValue()),
> (new ImmutableBytesWritable(Bytes.toBytes("DDD")), new
> KeyValue())), 4)
>
> // snippet 1:  a single object of *ImmutableBytesWritable* can be
> serialized in broadcast
> val partitioner = new SingleElementPartitioner(sc.broadcast(new
> ImmutableBytesWritable(Bytes.toBytes(3))))
> val ret = rdd.aggregateByKey(List[KeyValue](),
> partitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
>  (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys ).persist()
> println("\n\n\ret.count = " + ret.count + ",  partition size = " +
> ret.partitions.size)
>
> // snippet 2: an array of *ImmutableBytesWritable* can not be
> serialized in broadcast
> val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)), new
> ImmutableBytesWritable(Bytes.toBytes(2)), new
> ImmutableBytesWritable(Bytes.toBytes(3)))
> val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
> val ret1 = rdd.aggregateByKey(List[KeyValue](),
> newPartitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
>  (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys )
> println("\n\n\nrdd2.count = " + ret1.count)
>
> sc.stop
>
>
>   // the following are kryo registrator and partitioners
>class MyKryoRegistrator extends KryoRegistrator {
> override def registerClasses(kryo: Kryo): Unit = {
>  kryo.register(classOf[ImmutableBytesWritable])   //
> register ImmutableBytesWritable
>  kryo.register(classOf[Array[ImmutableBytesWritable]])  // 
> register
> Array[ImmutableBytesWritable]
> }
>}
>
>class SingleElementPartitioner(bc:
> Broadcast[ImmutableBytesWritable]) extends Partitioner {
> override def numPartitions: Int = 5
> def v = Bytes.toInt(bc.value.get)
> override def getPartition(key: Any): Int =  v - 1
>}
>
>
> class ArrayPartitioner(bc:
> Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
> val arr = bc.value
> override def numPartitions: Int = arr.length
> override def getPartition(key: Any): Int =
> Bytes.toInt(arr(0).get)
> }
>
>
>
> In the code above, snippet 1 can work as expected. But snippet 2 throws
> "Task not serializable: java.io.NotSerializableException:
> org.apache.hadoop.hbase.io.ImmutableBytesWritable"  .
>
>
> So do I have to implement a Kryo serializer for Array[T] if it is used in
> broadcast ?
>
> Thanks
>
>
>
>
>


Inconsistent execution times for same application.

2015-02-15 Thread Kartheek.R
Hi,
My Spark cluster contains heterogeneous machines: Pentium 4, dual-core and
quad-core boxes. I am trying to run a character frequency count application. The
application contains several threads, each submitting a job (action) that
counts the frequency of a single character. But my problem is that I get
different execution times each time I run the same application with the same
data (1G text data). Sometimes the difference is as large as 10-15 minutes. I
think this pertains to scheduling when the cluster is heterogeneous in
nature. Can someone please tell me how to tackle this issue? I need to get
consistent results. Any suggestions please!

I cache() the rdd. Total 7 slave nodes. Executor memory=2500m.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Inconsistent-execution-times-for-same-application-tp21662.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Percentile example

2015-02-15 Thread SiMaYunRui
Hello,
I am a newbie to Spark and trying to figure out how to compute a percentile over a
big data set. I googled this topic but did not find any very useful code
examples or explanations. It seems that I can use sortByKey to get my
data set in order, but I am not sure how I can get the value of, for example,
the 66th percentile.
Should I use take() to pick up the value of the 66th percentile? I don't believe any
machine can load my data set in memory, and I believe there must be more efficient
approaches.
Can anyone shed some light on this problem?
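
What I have in mind, roughly (assuming an RDD[Double] named data; I am not sure
this sort-then-index approach is efficient enough, it is just one option):

```
import org.apache.spark.SparkContext._   // for lookup() on pair RDDs (needed on Spark < 1.3)

// sort, attach a global index, and key by that index
val sorted = data.sortBy(identity).zipWithIndex().map { case (v, i) => (i, v) }
val n = sorted.count()

// nearest-rank index of the 66th percentile
val idx = math.ceil(0.66 * (n - 1)).toLong
val p66 = sorted.lookup(idx).head
```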
Regards
  

Multidimensional K-Means

2015-02-15 Thread Attila Tóth
Dear Spark User List,

I'm fairly new to Spark and trying to use it for multi-dimensional clustering
(using the k-means clustering from MLlib). However, based on the examples
the clustering seems to work only for a single dimension (KMeans.train()
accepts an RDD[Vector], which is a vector of doubles - I have a list of
arrays of doubles, e.g. a list of n-dimensional coordinates).

Is there any way with which, given a list of arrays (or vectors) of
doubles, I can get out the list of cluster centres (as a list of
n-dimensional coordinates) in Spark?

I'm using Scala.

Thanks in advance,
Attila


Re: Multidimensional K-Means

2015-02-15 Thread Sean Owen
Clustering operates on a large number of n-dimensional vectors. That
seems to be what you are describing, and that is what the MLlib API
accepts. What are you expecting that you don't find?

Did you have a look at the KMeansModel that this method returns? it
has a "clusterCenters" method that gives you what you're looking for.
Explore the API a bit more first.
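
For what it's worth, a minimal end-to-end sketch (the points below are made up):

```
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// each Vector is one n-dimensional point (3 dimensions here)
val points = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(1.1, 1.9, 3.2),
  Vectors.dense(9.0, 8.5, 7.9)))

val model = KMeans.train(points, 2, 20)   // k = 2, maxIterations = 20
model.clusterCenters.foreach(println)     // each center is an n-dimensional Vector
```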

On Sun, Feb 15, 2015 at 4:26 PM, Attila Tóth  wrote:
> Dear Spark User List,
>
> I'm fairly new to Spark, trying to use it for multi-dimensional clustering
> (using the k-means clustering from MLib). However, based on the examples the
> clustering seems to work only for a single dimension (KMeans.train() accepts
> an RDD[Vector], which is a vector of doubles - I have a list of array of
> doubles, eg. a list of n-dimensional coordinates).
>
> Is there any way with which, given a list of arrays (or vectors) of doubles,
> I can get out the list of cluster centres (as a list of n-dimensional
> coordinates) in Spark?
>
> I'm using Scala.
>
> Thanks in advance,
> Attila

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Dynamic partition pattern support

2015-02-15 Thread Jianshi Huang
Hi,

HCatalog allows you to specify the pattern of paths for partitions, which
will be used by dynamic partition loading.


https://cwiki.apache.org/confluence/display/Hive/HCatalog+DynamicPartitions#HCatalogDynamicPartitions-ExternalTables

Can we have similar feature in SparkSQL?

Jira is here: https://issues.apache.org/jira/browse/SPARK-5828

Thanks,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: shark queries failed

2015-02-15 Thread Akhil Das
I'd suggest updating your Spark to the latest version and trying SparkSQL
instead of Shark.

Thanks
Best Regards

On Sun, Feb 15, 2015 at 7:36 AM, Grandl Robert 
wrote:

> Hi guys,
>
> I deployed BlinkDB(built atop Shark) and running Spark 0.9.
>
> I tried to run several TPCDS shark queries taken from
> https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries/shark.
> However, the following exceptions are encountered. Do you have any idea why
> that might happen ?
>
> Thanks,
> Robert
>
> 2015-02-14 17:58:29,358 WARN  util.NativeCodeLoader
> (NativeCodeLoader.java:(52)) - Unable to load native-
> hadoop library for your platform... using builtin-java classes where
> applicable
> 2015-02-14 17:58:29,360 WARN  snappy.LoadSnappy
> (LoadSnappy.java:(46)) - Snappy native library not loaded
> 2015-02-14 17:58:34,963 WARN  scheduler.TaskSetManager
> (Logging.scala:logWarning(61)) - Lost TID 6 (task 5.0:2)
> 2015-02-14 17:58:34,970 WARN  scheduler.TaskSetManager
> (Logging.scala:logWarning(61)) - Loss was due to java.lang
> .ClassCastException
> java.lang.ClassCastException: org.apache.hadoop.io.NullWritable cannot be
> cast to org.apache.hadoop.io.FloatWrita
> ble
> at
> org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableFloatObjectInspector.get(WritableFloat
> ObjectInspector.java:35)
> at
> org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:331)
> at
> org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serializeStruct(LazyBinarySerDe.java:257)
> at
> org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:204)
> at
> shark.execution.ReduceSinkOperator$$anonfun$processPartitionNoDistinct$1.apply(ReduceSinkOperator.scal
> a:188)
> at
> shark.execution.ReduceSinkOperator$$anonfun$processPartitionNoDistinct$1.apply(ReduceSinkOperator.scal
> a:153)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
> at org.apache.spark.scheduler.Task.run(Task.scala:53)
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
> at
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> at java.lang.Thread.run(Thread.java:722)
> 2015-02-14 17:58:34,983 WARN  scheduler.TaskSetManager
> (Logging.scala:logWarning(61)) - Lost TID 8 (task 5.0:4)
> 2015-02-14 17:58:35,075 WARN  scheduler.TaskSetManager
> (Logging.scala:logWarning(61)) - Lost TID 12 (task 5.0:8)
> 2015-02-14 17:58:35,119 WARN  scheduler.TaskSetManager
> (Logging.scala:logWarning(61)) - Lost TID 15 (task 5.0:2)
> 2015-02-14 17:58:35,134 WARN  scheduler.TaskSetManager
> (Logging.scala:logWarning(61)) - Lost TID 9 (task 5.0:5)
> 2015-02-14 17:58:35,187 WARN  scheduler.TaskSetManager
> (Logging.scala:logWarning(61)) - Lost TID 16 (task 5.0:4)
> 2015-02-14 17:58:35,203 WARN  scheduler.TaskSetManager
> (Logging.scala:logWarning(61)) - Lost TID 11 (task 5.0:7)
> 2015-02-14 17:58:35,214 WARN  scheduler.TaskSetManager
> (Logging.scala:logWarning(61)) - Lost TID 13 (task 5.0:9)
> 2015-02-14 17:58:35,265 WARN  scheduler.TaskSetManager
> (Logging.scala:logWarning(61)) - Lost TID 4 (task 5.0:0)
> 2015-02-14 17:58:35,274 WARN  scheduler.TaskSetManager
> (Logging.scala:logWarning(61)) - Lost TID 18 (task 5.0:2)
> 2015-02-14 17:58:35,304 WARN  scheduler.TaskSetManager
> (Logging.scala:logWarning(61)) - Lost TID 17 (task 5.0:8)
> 2015-02-14 17:58:35,330 WARN  scheduler.TaskSetManager
> (Logging.scala:logWarning(61)) - Lost TID 5 (task 5.0:1)
> 2015-02-14 17:58:35,354 WARN  scheduler.TaskSetManager
> (Logging.scala:logWarning(61)) - Lost TID 20 (task 5.0:4)
> 2015-02-14 17:58:35,387 WARN  scheduler.TaskSetManager
> (Logging.scala:logWarning(61)) - Lost TID 19 (task 5.0:5)
> 2015-02-14 17:58:35,430 WARN  scheduler.TaskSetManager
> (Logging.scala:logWarning(61)) - Lost TID 7 (task 5.0:3)
> 2015-02-14 17:58:35,432 WARN  scheduler.TaskSetManager
> (Logging.scala:logWarning(61)) - Lost TID 24 (task 5.0:2)
> 2015-02-14 17:58:35,433 ERROR scheduler.TaskSetManager
> (Logging.scala:logError(65)) - Task 5.0:2 failed 4 times;
> aborting job
> 2015-02-14 17:58:35,438 ERROR ql.Driver
> (SessionState.java:printError(400)) - FAILED: Execution Error, return cod
> e -101 from shark.execution.SparkTask
> 2015-02-14 17:58:35,552 WA

Re: SparkStreaming Low Performance

2015-02-15 Thread Akhil Das
Thanks Enno, let me have a look at the stream parsing API of Jackson.
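
For reference, a rough illustration of what the streaming approach looks like with
jackson-core (the field name "user" and the single-field extraction are made-up
examples; the point is just reading selected fields without building a full Map
per record):

```
import com.fasterxml.jackson.core.JsonFactory

val factory = new JsonFactory()

// pull a single top-level string field out of one JSON record
def extractUser(json: String): Option[String] = {
  val parser = factory.createParser(json)
  try {
    var user: Option[String] = None
    while (parser.nextToken() != null && user.isEmpty) {
      if (parser.getCurrentName == "user") {
        parser.nextToken()              // advance from the field name to its value
        user = Option(parser.getText)
      }
    }
    user
  } finally {
    parser.close()
  }
}
```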

Thanks
Best Regards

On Sat, Feb 14, 2015 at 9:30 PM, Enno Shioji  wrote:

> Huh, that would come to 6.5ms per one JSON. That does feel like a lot but
> if your JSON file is big enough, I guess you could get that sort of
> processing time.
>
> Jackson is more or less the most efficient JSON parser out there, so
> unless the Scala API is somehow affecting it, I don't see any better way.
> If you only need to read parts of the JSON, you could look into exploiting
> Jackson's stream parsing API
> .
>
> I guess the good news is you can throw machines at it. You could also look
> into other serialization frameworks.
>
>
>
>
> On Sat, Feb 14, 2015 at 2:49 PM, Akhil Das 
> wrote:
>
>> Thanks again!
>> Its with the parser only, just tried the parser
>>  without Spark. And
>> it took me 52 Sec to process 8k json records. Not sure if there's an
>> efficient way to do this in Spark, i know if i use sparkSQL with schemaRDD
>> and all it will be much faster, but i need that in SparkStreaming.
>>
>> Thanks
>> Best Regards
>>
>> On Sat, Feb 14, 2015 at 8:04 PM, Enno Shioji  wrote:
>>
>>> I see. I'd really benchmark how the parsing performs outside Spark (in a
>>> tight loop or something). If *that* is slow, you know it's the parsing. If
>>> not, it's not the parsing.
>>>
>>> Another thing you want to look at is CPU usage. If the actual parsing
>>> really is the bottleneck, you should see very high CPU utilization. If not,
>>> it's not the parsing per se but rather the ability to feed the messages to
>>> the parsing library.
>>>
>>>
>>>
>>> On Sat, Feb 14, 2015 at 2:30 PM, Akhil Das 
>>> wrote:
>>>
 Ah my bad, it works without serializable exception. But not much
 performance difference is there though.

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das 
 wrote:

> Thanks for the suggestion, but doing that gives me this exception:
>
> http://pastebin.com/ni80NqKn
>
> Over this piece of code:
>
>object Holder extends Serializable {
>   @transient lazy val mapper = new ObjectMapper() with
> ScalaObjectMapper
>   mapper.registerModule(DefaultScalaModule)
> }
>
> val jsonStream = myDStream.map(x=> {
>Holder.mapper.readValue[Map[String,Any]](x)
> })
>
> Thanks
> Best Regards
>
> On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji 
> wrote:
>
>> (adding back user)
>>
>> Fair enough. Regarding serialization exception, the hack I use is to
>> have a object with a transient lazy field, like so:
>>
>>
>> object Holder extends Serializable {
>>   @transient lazy val mapper = new ObjectMapper()
>> }
>>
>> This way, the ObjectMapper will be instantiated at the destination
>> and you can share the instance.
>>
>>
>>
>>
>> On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das <
>> ak...@sigmoidanalytics.com> wrote:
>>
>>> Thanks for the reply Enno, in my case rate from the stream is not
>>> the bottleneck as i'm able to consume all those records at a time (have
>>> tested it). And regarding the ObjectMapper, if i take it outside of my 
>>> map
>>> operation then it throws Serializable Exceptions (Caused by:
>>> java.io.NotSerializableException:
>>> com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji 
>>> wrote:
>>>
 If I were you I'd first parse some test jsons in isolation (outside
 Spark) to determine if the bottleneck is really the parsing. There are
 plenty other places that could be affecting your performance, like the 
 rate
 you are able to read from your stream source etc.

 Apart from that, I notice that you are instantiating the
 ObjectMapper every time. This is quite expensive and jackson 
 recommends you
 to share the instance. However, if you tried other parsers / 
 mapPartitions
 without success, this probably won't fix your problem either.





 On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das <
 ak...@sigmoidanalytics.com> wrote:

> I'm getting a low performance while parsing json data. My cluster
> setup is 1.2.0 version of spark with 10 Nodes each having 15Gb of 
> memory
> and 4 cores.
>
> I tried both scala.util.parsing.json.JSON and fasterxml's
> Jackson parser.
>
> This is what i basically do:
>
> *//Approach 1:*
> val jsonStream = myDStream.map(x=> {
>   val mapper = new ObjectMapper() with Sc

Loading tables using parquetFile vs. loading tables from Hive metastore with Parquet serde

2015-02-15 Thread Jianshi Huang
Hi,

If I have a table in Hive metastore saved as Parquet, and I want to use it
in Spark. It seems Spark will use Hive's Parquet serde to load the actual
data.

So is there any difference here? Will predicate pushdown, pruning and
future Parquet optimizations in SparkSQL work for using Hive serde?
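
For concreteness, the two load paths being compared look roughly like this (the
table name and path are made up; sqlContext/hiveContext are assumed to exist):

```
// native Parquet support in Spark SQL
val viaParquet = sqlContext.parquetFile("hdfs:///warehouse/events")

// the same data registered as a Parquet table in the Hive metastore
val viaMetastore = hiveContext.sql("SELECT * FROM events")
```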

Loading tables using parquetFile vs. loading tables from Hive metastore
with Parquet serde


Thanks,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


spark-local dir running out of space during long ALS run

2015-02-15 Thread Antony Mayi
Hi,
I am running a large ALS job on Spark 1.2.0 on YARN (CDH 5.3.0). ALS is using about
3 billion ratings, and I am doing several trainImplicit() runs in a loop
within one Spark session. I have a four-node cluster with 3TB of disk space on each.
Before starting the job, less than 8% of the disk space is used. While the
ALS is running I can see the disk usage growing rapidly, mainly because of files
being stored under
yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA.
After about 10 hours the disk usage hits 90% and YARN kills the affected
containers.

Am I missing some cleanup somewhere while looping over the several
trainImplicit() calls? Taking 4*3TB of disk space seems immense.

Thanks for any help,
Antony.

Re: Multidimensional K-Means

2015-02-15 Thread Attila Tóth
Hi Sean,

Thanks for the quick answer. I have not realized that I can make an
RDD[Vector] with eg.

val dataSet = sparkContext.makeRDD(List(Vectors.dense(10.0,20.0),
Vectors.dense(20.0,30.0)))

Using this KMeans.train works as it should.

So my bad. Thanks again!

Attila

2015-02-15 17:29 GMT+01:00 Sean Owen :

> Clustering operates on a large number of n-dimensional vectors. That
> seems to be what you are describing, and that is what the MLlib API
> accepts. What are you expecting that you don't find?
>
> Did you have a look at the KMeansModel that this method returns? it
> has a "clusterCenters" method that gives you what you're looking for.
> Explore the API a bit more first.
>
> On Sun, Feb 15, 2015 at 4:26 PM, Attila Tóth  wrote:
> > Dear Spark User List,
> >
> > I'm fairly new to Spark, trying to use it for multi-dimensional
> clustering
> > (using the k-means clustering from MLib). However, based on the examples
> the
> > clustering seems to work only for a single dimension (KMeans.train()
> accepts
> > an RDD[Vector], which is a vector of doubles - I have a list of array of
> > doubles, eg. a list of n-dimensional coordinates).
> >
> > Is there any way with which, given a list of arrays (or vectors) of
> doubles,
> > I can get out the list of cluster centres (as a list of n-dimensional
> > coordinates) in Spark?
> >
> > I'm using Scala.
> >
> > Thanks in advance,
> > Attila
>


Re: Unable to query hive tables from spark

2015-02-15 Thread Todd Nist
What does your hive-site.xml look like?  Do you actually have a directory
at the location shown in the error?  i.e does "/user/hive/warehouse/src"
exist?  You should be able to override this by specifying the following:

--hiveconf
hive.metastore.warehouse.dir=/location/where/your/warehouse/exists

HTH.

-Todd

On Thu, Feb 12, 2015 at 1:16 AM, kundan kumar  wrote:

> I want to create/access the hive tables from spark.
>
> I have placed the hive-site.xml inside the spark/conf directory. Even
> so, it creates a local metastore in the directory where I run the spark
> shell and exits with an error.
>
> I am getting this error when I try to create a new hive table. Even on
> querying a existing table error appears.
>
> sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
>
> Please suggest what wrong I am doing and a way to resolve this.
>
> 15/02/12 10:35:58 ERROR RetryingHMSHandler: 
> MetaException(message:file:/user/hive/warehouse/src is not a directory or 
> unable to create one)
> at 
> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
> at 
> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
>
>


Re: shark queries failed

2015-02-15 Thread Grandl Robert
Thanks for the reply, Akhil. I cannot update the Spark version and run SparkSQL due
to some old dependencies and a specific project I want to run.

I was wondering if you have any clue why that exception might be triggered, or
whether you have seen it before.

Thanks,
Robert
 

 On Sunday, February 15, 2015 9:18 AM, Akhil Das 
 wrote:
   

I'd suggest updating your Spark to the latest version and trying SparkSQL
instead of Shark.
Thanks
Best Regards
On Sun, Feb 15, 2015 at 7:36 AM, Grandl Robert  
wrote:

Hi guys,
I deployed BlinkDB(built atop Shark) and running Spark 0.9. 
I tried to run several TPCDS shark queries taken from 
https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries/shark.
 However, the following exceptions are encountered. Do you have any idea why 
that might happen ? 

Thanks,
Robert

2015-02-14 17:58:29,358 WARN  util.NativeCodeLoader 
(NativeCodeLoader.java:(52)) - Unable to load native-
hadoop library for your platform... using builtin-java classes where applicable
2015-02-14 17:58:29,360 WARN  snappy.LoadSnappy (LoadSnappy.java:(46)) 
- Snappy native library not loaded
2015-02-14 17:58:34,963 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 6 (task 5.0:2)
2015-02-14 17:58:34,970 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Loss was due to java.lang
.ClassCastException
java.lang.ClassCastException: org.apache.hadoop.io.NullWritable cannot be cast 
to org.apache.hadoop.io.FloatWrita
ble
    at 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableFloatObjectInspector.get(WritableFloat
ObjectInspector.java:35)
    at 
org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:331)
    at 
org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serializeStruct(LazyBinarySerDe.java:257)
    at 
org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:204)
    at 
shark.execution.ReduceSinkOperator$$anonfun$processPartitionNoDistinct$1.apply(ReduceSinkOperator.scal
a:188)
    at 
shark.execution.ReduceSinkOperator$$anonfun$processPartitionNoDistinct$1.apply(ReduceSinkOperator.scal
a:153)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
    at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at 
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
2015-02-14 17:58:34,983 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 8 (task 5.0:4)
2015-02-14 17:58:35,075 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 12 (task 5.0:8)
2015-02-14 17:58:35,119 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 15 (task 5.0:2)
2015-02-14 17:58:35,134 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 9 (task 5.0:5)
2015-02-14 17:58:35,187 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 16 (task 5.0:4)
2015-02-14 17:58:35,203 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 11 (task 5.0:7)
2015-02-14 17:58:35,214 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 13 (task 5.0:9)
2015-02-14 17:58:35,265 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 4 (task 5.0:0)
2015-02-14 17:58:35,274 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 18 (task 5.0:2)
2015-02-14 17:58:35,304 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 17 (task 5.0:8)
2015-02-14 17:58:35,330 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 5 (task 5.0:1)
2015-02-14 17:58:35,354 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 20 (task 5.0:4)
2015-02-14 17:58:35,387 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 19 (task 5.0:5)
2015-02-14 17:58:35,430 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 7 (task 5.0:3)
2015-02-14 17:58:35,432 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 24 (task 5.0:2)
2015-02-14 17:58:35,433 ERROR scheduler.TaskSetManager 
(Logging.scala:logError(65)) - Task 5.0:2 failed 4 times; 
aborting j

Re: spark-local dir running out of space during long ALS run

2015-02-15 Thread Antony Mayi
spark.cleaner.ttl ? 
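
If that is the knob, setting it would look roughly like this (the value is
arbitrary; spark.cleaner.ttl periodically cleans up old metadata and shuffle
files, so whether it is the right fix here is exactly the question):

```
val conf = new org.apache.spark.SparkConf()
  .set("spark.cleaner.ttl", "3600")   // seconds
```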

 On Sunday, 15 February 2015, 18:23, Antony Mayi  
wrote:
   
 

 Hi,
I am running a large ALS job on Spark 1.2.0 on YARN (CDH 5.3.0). ALS is using about
3 billion ratings, and I am doing several trainImplicit() runs in a loop
within one Spark session. I have a four-node cluster with 3TB of disk space on each.
Before starting the job, less than 8% of the disk space is used. While the
ALS is running I can see the disk usage growing rapidly, mainly because of files
being stored under
yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA.
After about 10 hours the disk usage hits 90% and YARN kills the affected
containers.

Am I missing some cleanup somewhere while looping over the several
trainImplicit() calls? Taking 4*3TB of disk space seems immense.

Thanks for any help,
Antony.

 
   

monit with spark

2015-02-15 Thread Mike Sam
We want to monitor the Spark master and Spark slaves using monit, but we want to
use the sbin scripts to do so. The scripts create the Spark master and
slave processes detached from themselves, so monit would not know the
started process's PID to watch. Is this correct? Should we watch the ports?

How should we configure monit to run and monitor spark standalone
processes?

-- 
Thanks,
Mike


Loading JSON dataset with Spark Mllib

2015-02-15 Thread pankaj channe
Hi,

I am new to spark and planning on writing a machine learning application
with Spark mllib. My dataset is in json format. Is it possible to load data
into spark without using any external json libraries? I have explored the
option of SparkSql but I believe that is only for interactive use or
loading data into hive tables.

Thanks,
Pankaj


Re: Loading JSON dataset with Spark Mllib

2015-02-15 Thread gen tang
Hi,

In fact, you can use sqlCtx.jsonFile() which loads a text file storing one
JSON object per line as a SchemaRDD.
Or you can use sc.textFile() to load the textFile to RDD and then use
sqlCtx.jsonRDD() which loads an RDD storing one JSON object per string as a
SchemaRDD.
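
For example (Spark 1.2-era API; the path is made up, and the file should contain
one JSON object per line):

```
import org.apache.spark.sql.SQLContext

val sqlCtx = new SQLContext(sc)

// option 1: load the file directly as a SchemaRDD
val people = sqlCtx.jsonFile("hdfs:///data/people.json")

// option 2: load an RDD[String] first, then turn it into a SchemaRDD
val raw = sc.textFile("hdfs:///data/people.json")
val people2 = sqlCtx.jsonRDD(raw)

people.printSchema()
```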

Hope it could help
Cheers
Gen


On Mon, Feb 16, 2015 at 12:39 AM, pankaj channe 
wrote:

> Hi,
>
> I am new to spark and planning on writing a machine learning application
> with Spark mllib. My dataset is in json format. Is it possible to load data
> into spark without using any external json libraries? I have explored the
> option of SparkSql but I believe that is only for interactive use or
> loading data into hive tables.
>
> Thanks,
> Pankaj
>


Re: HiveContext created SchemaRDD's saveAsTable is not working on 1.2.0

2015-02-15 Thread matroyd
It works now using 1.2.1. Thanks for all the help. Spark rocks !!



-
Thanks,
Roy
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-HiveContext-created-SchemaRDD-s-saveAsTable-is-not-working-on-1-2-0-tp21442p21664.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Writing to HDFS from spark Streaming

2015-02-15 Thread Bahubali Jain
I used the latest assembly jar and the below as suggested by Akhil to fix
this problem...
temp.saveAsHadoopFiles("DailyCSV",".txt", String.class, String.class,
*(Class)* TextOutputFormat.class);

Thanks All for the help !

On Wed, Feb 11, 2015 at 1:38 PM, Sean Owen  wrote:

> That kinda dodges the problem by ignoring generic types. But it may be
> simpler than the 'real' solution, which is a bit ugly.
>
> (But first, to double check, are you importing the correct
> TextOutputFormat? there are two versions. You use .mapred. with the
> old API and .mapreduce. with the new API.)
>
> Here's how I've formally casted around it in similar code:
>
> @SuppressWarnings("unchecked")
> Class<? extends OutputFormat<?,?>> outputFormatClass =
> (Class<? extends OutputFormat<?,?>>) (Class<?>) TextOutputFormat.class;
>
> and then pass that as the final argument.
>
> On Wed, Feb 11, 2015 at 6:35 AM, Akhil Das 
> wrote:
> > Did you try :
> >
> > temp.saveAsHadoopFiles("DailyCSV",".txt", String.class,
> String.class,(Class)
> > TextOutputFormat.class);
> >
> > Thanks
> > Best Regards
> >
> > On Wed, Feb 11, 2015 at 9:40 AM, Bahubali Jain 
> wrote:
> >>
> >> Hi,
> >> I am facing issues while writing data from a streaming rdd to hdfs..
> >>
> >> JavaPairDstream temp;
> >> ...
> >> ...
> >> temp.saveAsHadoopFiles("DailyCSV",".txt", String.class,
> >> String.class,TextOutputFormat.class);
> >>
> >>
> >> I see compilation issues as below...
> >> The method saveAsHadoopFiles(String, String, Class, Class, Class<? extends
> >> OutputFormat>) in the type JavaPairDStream<String,String> is
> >> not applicable for the arguments (String, String, Class<String>,
> >> Class<String>, Class<TextOutputFormat>)
> >>
> >> I see same kind of problem even with saveAsNewAPIHadoopFiles API .
> >>
> >> Thanks,
> >> Baahu
> >
> >
>



-- 
Twitter:http://twitter.com/Baahu


WARN from Similarity Calculation

2015-02-15 Thread Debasish Das
Hi,

I am sometimes getting this WARN while running the similarity calculation:

15/02/15 23:07:55 WARN BlockManagerMasterActor: Removing BlockManager
BlockManagerId(7, abc.com, 48419, 0) with no recent heart beats: 66435ms
exceeds 45000ms

Do I need to increase the default 45 s to a larger value for cases where we
are doing blocking operations or long computations inside mapPartitions?
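
If increasing it is the right call, the knob appears to be the block manager
heartbeat timeout (property name as I understand it for Spark 1.x; worth
double-checking against your version):

```
val conf = new org.apache.spark.SparkConf()
  .set("spark.storage.blockManagerSlaveTimeoutMs", "300000")   // 5 minutes instead of 45s
```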

Thanks.
Deb