Support for time column type?

2016-04-01 Thread Philip Weaver
Hi, I don't see any mention of a time type in the documentation (there is DateType and TimestampType, but not TimeType), and have been unable to find any documentation about whether this will be supported in the future. Does anyone know if this is currently supported or will be supported in the future?

Re: Support for time column type?

2016-04-04 Thread Philip Weaver
…replacement (i.e. if all the same operators are defined for it), but I'll give it a try. - Philip On Fri, Apr 1, 2016 at 1:33 PM, Michael Armbrust wrote: > There is also CalendarIntervalType. Is that what you are looking for? > > On Fri, Apr 1, 2016 at 1:11 PM, Philip Weaver >
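For context, a minimal sketch (not taken from the thread) of one way to model a time-of-day column with the types that do exist; the column names and the seconds-since-midnight encoding are assumptions, not a Spark API:

    import org.apache.spark.sql.types._

    // There is no TimeType, so the time of day is stored here as seconds since
    // midnight in a plain IntegerType column (a convention, not a built-in type).
    val schema = StructType(Seq(
      StructField("event_date", DateType),
      StructField("event_time_secs", IntegerType)
    ))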

Sanely updating parquet partitions.

2016-04-29 Thread Philip Weaver
Hello, I have a parquet dataset, partitioned by a column 'a'. I want to take advantage of Spark SQL's ability to prune down to the matching partition when I filter on 'a'. I also want to periodically update individual partitions without disrupting any jobs that are querying the data. The obvious solution was…

Re: Sanely updating parquet partitions.

2016-04-29 Thread Philip Weaver
…)).parquet("/path/to/table/a=*") I also checked that the symlinks were followed the way I wanted, by removing one of the symlinks after creating the DataFrame, and I was able to query the DataFrame without error. - Philip On Fri, Apr 29, 2016 at 9:56 AM, Philip Weaver wrote:
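A minimal sketch of the pattern described above; the paths are placeholders, and the symlink swap is the poster's update mechanism rather than a documented Spark feature:

    // Each a=<value> directory under /path/to/table is a symlink to an immutable,
    // versioned copy of that partition's data.
    val df = sqlContext.read.parquet("/path/to/table/a=*")
    df.count()   // existing DataFrames keep working while individual symlinks are repointed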

Metadata in Parquet

2015-09-30 Thread Philip Weaver
Hi, I am using org.apache.spark.sql.types.Metadata to store extra information along with each of my fields. I'd also like to store Metadata for the entire DataFrame, not attached to any specific field. Is this supported? - Philip
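For reference, a minimal sketch of the field-level mechanism referred to above (the key and value are made up); whether metadata can be attached to the DataFrame as a whole is the open question:

    import org.apache.spark.sql.types._

    // Extra information attached to a single field through its Metadata.
    val meta = new MetadataBuilder().putString("units", "meters").build()
    val schema = StructType(Seq(
      StructField("distance", DoubleType, nullable = false, metadata = meta)
    ))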

Re: Limiting number of cores per job in multi-threaded driver.

2015-10-02 Thread Philip Weaver
…across the many jobs you > might have - as opposed to fewer, longer tasks... > > Lastly, 8 cores is not that much horsepower :) > You may consider running with beefier machines or a larger cluster, to get > at least tens of cores. > > Hope this helps, > -adrian > > Sent fro

Re: Limiting number of cores per job in multi-threaded driver.

2015-10-04 Thread Philip Weaver
…that you are sharing the machine across many jobs. > That was the context in which I was making that comment. > > -adrian > > Sent from my iPhone > > On 03 Oct 2015, at 07:03, Philip Weaver wrote: > > You can't really say 8 cores is not much horsepower when you have no

Re: Limiting number of cores per job in multi-threaded driver.

2015-10-04 Thread Philip Weaver
…not that much horsepower :) >> You may consider running with beefier machines or a larger cluster, to >> get at least tens of cores. >> >> Hope this helps, >> -adrian >> >> Sent from my iPhone >> >> On 18 Sep 2015, at 18:37, Philip Weaver wrote:

Re: Limiting number of cores per job in multi-threaded driver.

2015-10-04 Thread Philip Weaver
…mentioned above, the jobs don't scale beyond about 8 cores. 2.) The next submitted job will have to wait for resources to become available. - Philip On Sun, Oct 4, 2015 at 2:33 PM, Philip Weaver wrote: > I believe I've described my use case clearly, and I'm being questioned

Location preferences in pyspark?

2015-10-16 Thread Philip Weaver
I believe what I want is the exact functionality provided by SparkContext.makeRDD in Scala. For each element in the RDD, I want to specify a list of preferred hosts for processing that element. It looks like this method only exists in Scala, and as far as I can tell there is no similar functionality…
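For reference, a minimal Scala sketch of the makeRDD overload in question; the records and host names are placeholders:

    // Each element is paired with its list of preferred hosts.
    val data = Seq(
      ("record-1", Seq("host1.example.com")),
      ("record-2", Seq("host2.example.com", "host3.example.com"))
    )
    val rdd = sc.makeRDD(data)   // RDD[String] with per-element location preferences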

Re: Location preferences in pyspark?

2015-10-20 Thread Philip Weaver
…, 2015 at 8:42 AM, Philip Weaver > wrote: > >> I believe what I want is the exact functionality provided by >> SparkContext.makeRDD in Scala. For each element in the RDD, I want to specify >> a list of preferred hosts for processing that element. >> >> It looks like this

Unable to compete with performance of single-threaded Scala application

2015-08-03 Thread Philip Weaver
Hello, I am running Spark 1.4.0 on Mesos 0.22.1, and usually I run my jobs in coarse-grained mode. I have written some single-threaded standalone Scala applications for a problem that I am working on, and I am unable to get a Spark solution that comes close to the performance of this application.

Safe to write to parquet at the same time?

2015-08-03 Thread Philip Weaver
I think this question applies regardless of whether I have two completely separate Spark jobs or tasks on different machines, or two cores that are part of the same task on the same machine. If two jobs/tasks/cores/stages both save to the same parquet directory in parallel like this: df1.write.mode(SaveMode…
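The scenario being asked about, as a minimal sketch; df1, df2, and the output path are placeholders:

    import org.apache.spark.sql.SaveMode

    // Two independent jobs/tasks appending to the same parquet directory in parallel:
    df1.write.mode(SaveMode.Append).parquet("/shared/output")
    df2.write.mode(SaveMode.Append).parquet("/shared/output")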

Re: Turn Off Compression for Textfiles

2015-08-04 Thread Philip Weaver
The .gz extension indicates that the file is compressed with gzip. Choose a different extension (e.g. .txt) when you save them. On Tue, Aug 4, 2015 at 7:00 PM, Brandon White wrote: > How do you turn off gz compression for saving as textfiles? Right now, I > am reading .gz files and it is saving…
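As a general note (not necessarily the thread's exact resolution): reading gzipped text is transparent, and saveAsTextFile writes uncompressed part files unless a codec is passed explicitly. Paths here are placeholders:

    val lines = sc.textFile("/data/input/*.gz")   // gzip is decompressed on read
    lines.saveAsTextFile("/data/output-plain")    // no codec argument => uncompressed output
    // Compression is opt-in, by passing a codec:
    // lines.saveAsTextFile("/data/output-gz", classOf[org.apache.hadoop.io.compress.GzipCodec])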

Re: spark hangs at broadcasting during a filter

2015-08-05 Thread Philip Weaver
How big is dropRows? Try explicitly broadcasting it like this: val broadcastDropRows = sc.broadcast(dropRows) val valsrows = ... .filter(x => !broadcastDropRows.value.contains(x._1)) - Philip On Wed, Aug 5, 2015 at 11:54 AM, AlexG wrote: > I'm trying to load a 1 Tb file whose lines i,j,
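A self-contained version of the suggestion above, with assumed types for dropRows and the input pair RDD:

    val dropRows: Set[String] = Set("badKey1", "badKey2")   // contents assumed for illustration
    val broadcastDropRows = sc.broadcast(dropRows)
    // pairs: assumed to be an RDD of (key, value) tuples built earlier in the job.
    val valsrows = pairs.filter { case (k, _) => !broadcastDropRows.value.contains(k) }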

Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-05 Thread Philip Weaver
I have a parquet directory that was produced by partitioning by two keys, e.g. like this: df.write.partitionBy("a", "b").parquet("asdf") There are 35 values of "a", and about 1100-1200 values of "b" for each value of "a", for a total of over 40,000 partitions. Before running any transformations
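For reference, a sketch of the layout this produces and the partition-pruned query it is meant to enable; the path and the values of "a" and "b" are illustrative:

    df.write.partitionBy("a", "b").parquet("/data/asdf")
    // Layout: /data/asdf/a=<value>/b=<value>/part-*.parquet, one directory per (a, b) pair.

    val table = sqlContext.read.parquet("/data/asdf")   // partition discovery walks all ~40,000 directories
    table.filter(table("a") === "a1" && table("b") === "b7").count()   // should read only the matching directory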

Re: How to read gzip data in Spark - Simple question

2015-08-05 Thread Philip Weaver
The parallelize method does not read the contents of a file. It simply takes a collection and distributes it to the cluster. In this case, the String is a collection of 67 characters. Use sc.textFile instead of sc.parallelize, and it should work as you want. On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏
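A minimal illustration of the difference; the path is a placeholder:

    // sc.parallelize distributes the string itself, one element per character:
    // sc.parallelize("/path/to/data.gz")        // RDD[Char] -- not the file's contents
    // sc.textFile reads the file, decompressing gzip transparently:
    val lines = sc.textFile("/path/to/data.gz")  // RDD[String], one element per line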

Re: How to read gzip data in Spark - Simple question

2015-08-05 Thread Philip Weaver
at map at > :61 java.lang.UnsupportedOperationException: Schema for type > java.util.Date is not supported at > org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188) > at > org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.s

Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-05 Thread Philip Weaver
g turned on. > > Cheng > > > On 8/6/15 8:26 AM, Philip Weaver wrote: > > I have a parquet directory that was produced by partitioning by two keys, > e.g. like this: > > df.write.partitionBy("a", "b").parquet("asdf") > > > There are

Re: How to read gzip data in Spark - Simple question

2015-08-05 Thread Philip Weaver
…$anonfun$2.apply(:163) > at > [long chain of nested Scala REPL $iwC$$iwC wrapper frames] $$anonfun$2.apply(:161) > at scala.collection.Iterato…

Re: Unable to persist RDD to HDFS

2015-08-05 Thread Philip Weaver
This isn't really a Spark question. You're trying to parse a string to an integer, but it contains an invalid character. The exception message explains this. On Wed, Aug 5, 2015 at 11:34 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote: > Code: > import java.text.SimpleDateFormat > import java.util.Calendar > import jav

Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-06 Thread Philip Weaver
I built spark from the v1.5.0-snapshot-20150803 tag in the repo and tried again. The initialization time is about 1 minute now, which is still pretty terrible. On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver wrote: > Absolutely, thanks! > > On Wed, Aug 5, 2015 at 9:07 PM, Cheng Li

Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-06 Thread Philip Weaver
be a problem with the parquet reader. I may try to do what he did to construct a DataFrame manually, and see if I can query it with Spark SQL with reasonable performance. - Philip On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian wrote: > Would you mind to provide the driver log? > > &

Re: How to distribute non-serializable object in transform task or broadcast ?

2015-08-07 Thread Philip Weaver
If the object cannot be serialized, then I don't think broadcast will make it magically serializable. You can't transfer data structures between nodes without serializing them somehow. On Fri, Aug 7, 2015 at 7:31 AM, Sujit Pal wrote: > Hi Hao, > > I think sc.broadcast will allow you to broadcast
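One commonly used alternative (not necessarily what this thread settled on) is to construct the non-serializable object on each executor inside mapPartitions, so it never has to cross the wire; SomeNonSerializableParser and parse() are hypothetical names:

    val results = rdd.mapPartitions { iter =>
      // Built on the executor, once per partition, so it is never serialized or shipped.
      val parser = new SomeNonSerializableParser()
      iter.map(line => parser.parse(line))
    }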

Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-07 Thread Philip Weaver
3 ms on lindevspark4 (7/8) > 10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage 1.0 > (TID 9) in 2886 ms on lindevspark5 (8/8) > > That might be the reason why you observed that the C parquet library you > mentioned (is it parquet-cpp?) is an order of magnitude faster? > &

Re: Spark failed while trying to read parquet files

2015-08-07 Thread Philip Weaver
Yes, NullPointerExceptions are pretty common in Spark (or, rather, I seem to encounter them a lot!) but can occur for a few different reasons. Could you add some more detail, like what the schema is for the data, or the code you're using to read it? On Fri, Aug 7, 2015 at 3:20 PM, Jerrick Hoang w

Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-11 Thread Philip Weaver
…at 8:23 AM, Philip Weaver wrote: > Thanks, I also confirmed that the partition discovery is slow by writing a > non-Spark application that uses the parquet library directly to load the > partitions. > > It's so slow that my colleague's Python application can read the…

grouping by a partitioned key

2015-08-11 Thread Philip Weaver
If I have an RDD that happens to already be partitioned by a key, how efficient can I expect a groupBy operation to be? I would expect that Spark shouldn't have to move data around between nodes, and would only need a small amount of work to check the partitions and discover that it doesn't need…
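A minimal sketch of the case where the shuffle is avoided: if the pair RDD already carries a partitioner, groupByKey reuses it. The pairs RDD and the partition count are assumptions:

    import org.apache.spark.HashPartitioner

    // pairs: assumed RDD[(K, V)].
    val byKey = pairs.partitionBy(new HashPartitioner(16)).cache()
    val grouped = byKey.groupByKey()   // same partitioner as byKey, so no additional shuffle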

Re: grouping by a partitioned key

2015-08-11 Thread Philip Weaver
…guarantee that’s the case? > What is it you try to achieve? There might be another way for it, when you > might be 100% sure what’s happening. > > You can print debugString or explain (for DataFrame) to see what’s > happening under the hood. > > > On 12 Aug 2015, at 01:19, Philip We…

Re: grouping by a partitioned key

2015-08-12 Thread Philip Weaver
sis. > DataFrame.foreachPartition is the way. > > I haven't tried it, but, following looks like a not-so-sophisticated way > of making spark sql partition aware. > > > http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery > > > On Wed, Aug 12, 2015 at 5:00 A

Re: Write to cassandra...each individual statement

2015-08-13 Thread Philip Weaver
All you'd need to do is *transform* the rdd before writing it, e.g. using the .map function. On Thu, Aug 13, 2015 at 11:30 AM, Priya Ch wrote: > Hi All, > > I have a question in writing rdd to cassandra. Instead of writing entire > rdd to cassandra, i want to write individual statement into cassandra…
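A sketch of the suggestion using the spark-cassandra-connector's RDD save; the keyspace, table, and per-message transform are placeholders:

    import com.datastax.spark.connector._

    // Apply the per-message logic with map first, then bulk-save the transformed RDD.
    val prepared = messages.map(msg => transformMessage(msg))   // transformMessage: hypothetical
    prepared.saveToCassandra("my_keyspace", "my_table")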

Re: Write to cassandra...each individual statement

2015-08-13 Thread Philip Weaver
…logic depends on 1st > message. > Hence using rdd.foreach i am handling different logic for individual > messages. Now bulk rdd.saveToCassandra will not work. > > Hope you got what i am trying to say.. > > On Fri, Aug 14, 2015 at 12:07 AM, Philip Weaver > wrote: > >> A

Re: Driver staggering task launch times

2015-08-13 Thread Philip Weaver
Are you running on Mesos, YARN, or standalone? If you're on Mesos, are you using coarse-grained or fine-grained mode? On Thu, Aug 13, 2015 at 10:13 PM, Ara Vartanian wrote: > I’m observing an unusual situation where my step duration increases as I > add further executors to my cluster. My algorithm

Re: Driver staggering task launch times

2015-08-13 Thread Philip Weaver
Ah, nevermind, I don't know anything about scheduling tasks in YARN. On Thu, Aug 13, 2015 at 11:03 PM, Ara Vartanian wrote: > I’m running on Yarn. > > On Aug 13, 2015, at 10:58 PM, Philip Weaver > wrote: > > Are you running on mesos, yarn or standalone? If you'

Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-19 Thread Philip Weaver
I've had the same problem. It turns out that Spark (specifically parquet) is very slow at partition discovery. It got better in 1.5 (not yet released), but was still unacceptably slow. Sadly, we ended up reading parquet files manually in Python (via C++) and had to abandon Spark SQL because of this

Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-20 Thread Philip Weaver
.com] > *Sent:* Thursday, August 20, 2015 1:46 PM > *To:* Cheng, Hao > *Cc:* Philip Weaver; user > *Subject:* Re: Spark Sql behaves strangely with tables with a lot of > partitions > > > > I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of > CLs tr

Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-23 Thread Philip Weaver
…hour) >>>>> explain select count(*) from test_table where date_prefix='20150819' >>>>> and hour='00'; >>>>> TungstenAggregate(key=[], >>>>> value=[(count(1),mode=Final,isDistinct=false)]…

Limiting number of cores per job in multi-threaded driver.

2015-09-12 Thread Philip Weaver
I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR scheduler, so I can define a long-running application capable of executing multiple simultaneous spark jobs. The kind of jobs that I'm running do not benefit from more than 4 cores, but I want my application to be able to take…
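For reference, a sketch of the settings being discussed (values are examples); note that spark.cores.max caps the application as a whole, not an individual job, which is exactly the gap the thread is about:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .set("spark.scheduler.mode", "FAIR")
      .set("spark.cores.max", "8")   // total cores for the whole application
    val sc = new SparkContext(conf)

    // Each request-handling thread can submit its jobs into its own fair-scheduler pool:
    sc.setLocalProperty("spark.scheduler.pool", "request-pool-1")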

Trouble using dynamic allocation and shuffle service.

2015-09-14 Thread Philip Weaver
Hello, I am trying to use dynamic allocation which requires the shuffle service. I am running Spark on mesos. Whenever I set spark.shuffle.service.enabled=true, my Spark driver fails with an error like this: Caused by: java.net.ConnectException: Connection refused: devspark1/ 172.26.21.70:7337 I
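The configuration involved, as a sketch; the port matches the one in the error above, and the external shuffle service itself must also be running on every node, which is a separate setup step on Mesos:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.shuffle.service.port", "7337")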

Re: Trouble using dynamic allocation and shuffle service.

2015-09-14 Thread Philip Weaver
k. > > Let me know if that's not clear. > > Tim > > On Mon, Sep 14, 2015 at 11:36 AM, Philip Weaver > wrote: > >> Hello, I am trying to use dynamic allocation which requires the shuffle >> service. I am running Spark on mesos. >> >> Whenever

Re: Limiting number of cores per job in multi-threaded driver.

2015-09-18 Thread Philip Weaver
…that defeats the purpose. Does that make sense? Thanks in advance for any advice you can provide! - Philip On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver wrote: > I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR > scheduler, so I can define a long-running application…

Re: Limiting number of cores per job in multi-threaded driver.

2015-09-18 Thread Philip Weaver
(whoops, redundant sentence in that first paragraph) On Fri, Sep 18, 2015 at 8:36 AM, Philip Weaver wrote: > Here's a specific example of what I want to do. My Spark application is > running with total-executor-cores=8. A request comes in, it spawns a thread > to handle that request…

Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Philip Weaver
I am processing a single file and want to remove duplicate rows by some key by always choosing the first row in the file for that key. The best solution I could come up with is to zip each row with the partition index and local index, like this: rdd.mapPartitionsWithIndex { case (partitionIndex,

Re: Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Philip Weaver
serially. > You would take the first and ignore the rest. Note that "first" > depends on your RDD having an ordering to begin with, or else you rely > on however it happens to be ordered after whatever operations give you > a key-value RDD. > > On Tue, Sep 22, 2015 at 1:

Re: Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Philip Weaver
ld yield > Some(row). After that, combining is a no-op for other rows. > > On Tue, Sep 22, 2015 at 4:27 AM, Philip Weaver > wrote: > > Hmm, I don't think that's what I want. There's no "zero value" in my use > > case. > > > > On Mon, Sep 21, 2

Re: Remove duplicate keys by always choosing first in file.

2015-09-22 Thread Philip Weaver
E.md", 4) > > You can then just do .groupBy(…).mapValues(_.sortBy(…).head) - I’m > skimming through some tuples, hopefully this is clear enough. > > -adrian > > From: Philip Weaver > Date: Tuesday, September 22, 2015 at 3:26 AM > To: user > Subject: Remove dupli

Re: Remove duplicate keys by always choosing first in file.

2015-09-22 Thread Philip Weaver
…is the original line > number in the file, not the index in the partition. > > Sent from my iPhone > > On 22 Sep 2015, at 17:50, Philip Weaver wrote: > > Thanks. If textFile can be used in a way that preserves order, then both > the partition index and the index within each partition…

Re: Remove duplicate keys by always choosing first in file.

2015-09-22 Thread Philip Weaver
…the index is the original line > number > > in the file, not the index in the partition. > > > > Sent from my iPhone > > > > On 22 Sep 2015, at 17:50, Philip Weaver wrote: > > > > Thanks. If textFile can be used in a way that preserves order, then…

Re: Remove duplicate keys by always choosing first in file.

2015-09-24 Thread Philip Weaver
Oops, I didn't catch the suggestion to just use RDD.zipWithIndex, which I forgot existed (and I've discovered I actually used it in another project!). I will use that instead of the mapPartitionsWithIndex/zipWithIndex solution that I posted originally. On Tue, Sep 22, 2015 at 9:07 AM, Phi…
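A minimal sketch of the RDD.zipWithIndex approach mentioned here; keyOf() stands in for whatever extracts the key from a row:

    // Tag each row with its global position in the file, then keep the earliest row per key.
    val firstPerKey = sc.textFile("/path/to/file")
      .zipWithIndex()
      .map { case (row, idx) => (keyOf(row), (idx, row)) }   // keyOf: hypothetical key extractor
      .reduceByKey { (a, b) => if (a._1 <= b._1) a else b }
      .map { case (key, (_, row)) => (key, row) }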