Re: Help taking last value in each group (aggregates)

2017-08-28 Thread Everett Anderson
row_number().over(windowSpec).as("row_number")) .filter("row_number == 1") .select($"group_id", $"row_id".as("last_row_id"), $"total") Would love to know if there's a better way! On Mon, Aug 28, 2017 at 9:19
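
A minimal sketch of the full pattern described above, assuming a SparkSession named spark and the thread's hypothetical column names (group_id, row_id, a, b, col_to_sum):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._

    val byGroup = Window.partitionBy("group_id")
    val ordered = byGroup.orderBy(desc("a"), desc("b"))

    val result = df
      .withColumn("row_number", row_number().over(ordered))  // 1 = last row by (a, b)
      .withColumn("total", sum("col_to_sum").over(byGroup))  // per-group sum on every row
      .filter("row_number == 1")
      .select($"group_id", $"row_id".as("last_row_id"), $"total")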

Help taking last value in each group (aggregates)

2017-08-28 Thread Everett Anderson
Hi, I'm struggling a little with some unintuitive behavior with the Scala API. (Spark 2.0.2) I wrote something like df.orderBy("a", "b") .groupBy("group_id") .agg(sum("col_to_sum").as("total"), last("row_id").as("last_row_id")) and expected a result with a unique group_id column, a

Re: Spark, S3A, and 503 SlowDown / rate limit issues

2017-07-10 Thread Everett Anderson
er table. This happened when we unioned many Spark DataFrames together without doing a repartition or coalesce afterwards. After throwing in a repartition (to additionally balance the output shards) we haven't seen the error again, but our graphs of S3 HEAD requests are still rather alarming
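
A sketch of the mitigation described, assuming frames holds the DataFrames being unioned (the names, partition count, and output path are hypothetical):

    val combined = frames.reduce(_.union(_))
      .repartition(64)  // re-balance the post-union shards; 64 is an arbitrary example
    combined.write.parquet("s3a://bucket/output")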

Spark, S3A, and 503 SlowDown / rate limit issues

2017-06-29 Thread Everett Anderson
Hi, We're using Spark 2.0.2 + Hadoop 2.7.3 on AWS EMR with S3A for direct I/O from/to S3 from our Spark jobs. We set mapreduce.fileoutputcommitter.algorithm.version=2 and are using encrypted S3 buckets. This has been working fine for us, but perhaps as we've been running more jobs in parallel, we

Re: Temp checkpoint directory for EMR (S3 or HDFS)

2017-05-30 Thread Everett Anderson
github.com/apache/spark/blob/branch-2.0/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L184> . So this is one option, though certainly abusing the staging directory. A more general one might be to find where Dataset.persist(DISK_ONLY) writes. On Fri, May 26, 2017 at 9:08 AM, Everet

Temp checkpoint directory for EMR (S3 or HDFS)

2017-05-26 Thread Everett Anderson
Hi, I need to set a checkpoint directory as I'm starting to use GraphFrames. (Also, occasionally my regular DataFrame lineages get too long so it'd be nice to use checkpointing to squash the lineage.) I don't actually need this checkpointed data to live beyond the life of the job, however. I'm ru
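
For reference, a minimal sketch of the setup under discussion (the HDFS path is hypothetical, and Dataset.checkpoint() itself only arrived in Spark 2.1):

    // GraphFrames requires a checkpoint directory to be set on the context
    spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")
    // On Spark 2.1+, this materializes df and truncates its lineage
    val squashed = df.checkpoint()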

Re: Driver spins hours in query plan optimization

2017-05-02 Thread Everett Anderson
Seems like https://issues.apache.org/jira/browse/SPARK-13346 is likely the same issue. Seems like for some people persist() doesn't work and they have to convert to RDDs and back. On Fri, Apr 14, 2017 at 1:39 PM, Everett Anderson wrote: > Hi, > > We keep hitting a situation
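
A sketch of the RDD round-trip workaround mentioned above, which hands Catalyst a fresh, shallow plan (df stands in for the problematic union):

    // Rebuild the DataFrame from its row RDD so the optimizer
    // no longer re-analyzes the deep original plan
    val trimmed = spark.createDataFrame(df.rdd, df.schema)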

Re: Calculate mode separately for multiple columns in row

2017-05-01 Thread Everett Anderson
feature) HAVING rank = 1) AS feature_mode ON color_mode.creature = feature_mode.creature) On Thu, Apr 27, 2017 at 9:40 AM, Everett Anderson wrote: > For the curious, I played around with a UDAF for this (shown below). On > the downside, it assembles a Ma

Re: Calculate mode separately for multiple columns in row

2017-04-27 Thread Everett Anderson
ll } frequencies.maxBy(_._2)._1 } } On Wed, Apr 26, 2017 at 10:21 AM, Everett Anderson wrote: > Hi, > > One common situation I run across is that I want to compact my data and > select the mode (most frequent value) in several columns for each group. > > Even calculating

Calculate mode separately for multiple columns in row

2017-04-26 Thread Everett Anderson
Hi, One common situation I run across is that I want to compact my data and select the mode (most frequent value) in several columns for each group. Even calculating mode for one column in SQL is a bit tricky. The ways I've seen usually involve a nested sub-select with a group by + count and then
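
A sketch of that count-then-rank idea for a single column in the DataFrame API, using the thread's hypothetical creature/color columns:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    val colorCounts = df.groupBy("creature", "color").count()
    val ranked = colorCounts.withColumn(
      "rank", rank().over(Window.partitionBy("creature").orderBy(desc("count"))))
    // rank() keeps ties; use row_number() instead to break them arbitrarily
    val colorMode = ranked.filter(col("rank") === 1).select("creature", "color")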

Driver spins hours in query plan optimization

2017-04-14 Thread Everett Anderson
Hi, We keep hitting a situation on Spark 2.0.2 (haven't tested later versions, yet) where the driver spins forever seemingly in query plan optimization for moderate queries, such as the union of a few (~5) other DataFrames. We can see the driver spinning with one core in the nioEventLoopGroup-2-2

Re: Assigning a unique row ID

2017-04-10 Thread Everett Anderson
Indeed, I tried persist with MEMORY_AND_DISK and it works! (I'm wary of MEMORY_ONLY for this as it could potentially recompute shards if it couldn't entirely cache in memory.) Thanks for the help, everybody!! On Sat, Apr 8, 2017 at 11:54 AM, Everett Anderson wrote: > > > On
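
A sketch of the working combination, with hypothetical names:

    import org.apache.spark.sql.functions.monotonically_increasing_id
    import org.apache.spark.storage.StorageLevel

    val withId = df.withColumn("row_id", monotonically_increasing_id())
      .persist(StorageLevel.MEMORY_AND_DISK)
    withId.count()  // force materialization so all derived tables see the same IDs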

Re: Assigning a unique row ID

2017-04-08 Thread Everett Anderson
go. > > Thanks, > Subhash > > Sent from my iPhone > > On Apr 7, 2017, at 7:32 PM, Everett Anderson > wrote: > > Hi, > > Thanks, but that's using a random UUID. Certainly unlikely to have > collisions, but not guaranteed. > > I'd rather prefer somet

Re: Assigning a unique row ID

2017-04-07 Thread Everett Anderson
Fri, Apr 7, 2017 at 4:24 PM, Tim Smith wrote: > http://stackoverflow.com/questions/37231616/add-a-new- > column-to-a-dataframe-new-column-i-want-it-to-be-a-uuid-generator > > > On Fri, Apr 7, 2017 at 3:56 PM, Everett Anderson > wrote: > >> Hi, >> >> What

Assigning a unique row ID

2017-04-07 Thread Everett Anderson
Hi, What's the best way to assign a truly unique row ID (rather than a hash) to a DataFrame/Dataset? I originally thought that functions.monotonically_increasing_id would do this, but it seems to have a rather unfortunate property that if you add it as a column to table A and then derive tables X

Re: Spark 2.0.2 Dataset union() slowness vs RDD union?

2017-03-20 Thread Everett Anderson
PM, Everett Anderson wrote: > Hi! > > On Thu, Mar 16, 2017 at 5:20 PM, Burak Yavuz wrote: > >> Hi Everett, >> >> IIRC we added unionAll in Spark 2.0 which is the same implementation as >> rdd union. The union in DataFrames with Spark 2.0 does deduplication, an

Re: Spark 2.0.2 Dataset union() slowness vs RDD union?

2017-03-16 Thread Everett Anderson
pache.org/docs/2.0.2/api/java/org/apache/spark/sql/Dataset.html#union(org.apache.spark.sql.Dataset) and https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/Dataset.html#unionAll(org.apache.spark.sql.Dataset) > > Best, > Burak > > On Thu, Mar 16, 2017 at 4:14 P

Re: Spark 2.0.2 Dataset union() slowness vs RDD union?

2017-03-16 Thread Everett Anderson
16, 2017 at 2:55 PM, Everett Anderson wrote: > Hi, > > We're using Dataset union() in Spark 2.0.2 to concatenate a bunch of > tables together and save as Parquet to S3, but it seems to take a long > time. We're using the S3A FileSystem implementation under the cov

Spark 2.0.2 Dataset union() slowness vs RDD union?

2017-03-16 Thread Everett Anderson
Hi, We're using Dataset union() in Spark 2.0.2 to concatenate a bunch of tables together and save as Parquet to S3, but it seems to take a long time. We're using the S3A FileSystem implementation under the covers, too, if that helps. Watching the Spark UI, the executors all eventually stop (we're

Best way to assign a unique IDs to row groups

2017-03-01 Thread Everett Anderson
Hi, I've used functions.monotonically_increasing_id() for assigning a unique ID to all rows, but I'd like to assign a unique ID to each group of rows with the same key. The two ways I can think of to do this are Option 1: Create separate group ID table and join back - Create a new data frame
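
A sketch of "Option 1" with hypothetical names, assuming the grouping key column is called key:

    import org.apache.spark.sql.functions.monotonically_increasing_id

    // Build a distinct-key table with generated IDs, then join back
    val groupIds = df.select("key").distinct()
      .withColumn("group_id", monotonically_increasing_id())
    val withGroupIds = df.join(groupIds, "key")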

Re: Strange behavior with 'not' and filter pushdown

2017-02-14 Thread Everett Anderson
Wrapping this up -- fix is in 2.1.0 and has been backported to the 2.0.x branch, as well. On Mon, Feb 13, 2017 at 6:41 PM, Everett Anderson wrote: > Went ahead and opened > > https://issues.apache.org/jira/browse/SPARK-19586 > > though I'd generally expect to just close it a

Re: Strange behavior with 'not' and filter pushdown

2017-02-13 Thread Everett Anderson
Went ahead and opened https://issues.apache.org/jira/browse/SPARK-19586 though I'd generally expect to just close it as fixed in 2.1.0 and roll on. On Sat, Feb 11, 2017 at 5:01 PM, Everett Anderson wrote: > On the plus side, looks like this may be fixed in 2.1.0: > > ==

Re: Strange behavior with 'not' and filter pushdown

2017-02-11 Thread Everett Anderson
) +- *FileScan parquet [username#14] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/test_table], PartitionFilters: [], PushedFilters: [Not(IsNotNull(username))], ReadSchema: struct<username:string> On Fri, Feb 10, 2017 at 11:26 AM, Everett Anderson wrote: > Bumping this thread. > > Tr

Re: Strange behavior with 'not' and filter pushdown

2017-02-10 Thread Everett Anderson
Bumping this thread. Translating "where not(username is not null)" into a filter of [IsNotNull(username), Not(IsNotNull(username))] seems like a rather severe bug. Spark 1.6.2:

explain select count(*) from parquet_table where not(username is not null)

== Physical Plan ==
TungstenAggregate(key=
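
A sketch that reproduces the report in the Scala API (the Parquet path is hypothetical):

    val df = spark.read.parquet("/tmp/test_table")
    df.where("not(username is not null)").explain()
    // Affected versions push down both IsNotNull(username) and
    // Not(IsNotNull(username)), contradictory filters that can match no rows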

Re: Un-exploding / denormalizing Spark SQL help

2017-02-08 Thread Everett Anderson
the great suggestions!) > > On Thu, 9 Feb 2017 at 4:01 am, Xiaomeng Wan wrote: > >> You could also try pivot. >> >> On 7 February 2017 at 16:13, Everett Anderson >> wrote: >> >> >> >> On Tue, Feb 7, 2017 at 2:21 PM, Michael Armbr

Re: Un-exploding / denormalizing Spark SQL help

2017-02-07 Thread Everett Anderson
ime. I may have seen groupBy + join be better than >> window (there were more exchanges in play for windows I reckon). >> >> Pozdrawiam, >> Jacek Laskowski >> >> https://medium.com/@jaceklaskowski/ >> Mastering Apache Spark 2.0 https://bit.ly/mast

Re: Un-exploding / denormalizing Spark SQL help

2017-02-07 Thread Everett Anderson
+---+----+------+------+---------+------+------+---------+------+------+---------+
| id|name|extra1| data1|priority1|extra2| data2|priority2|extra3| data3|priority3|
+---+----+------+------+---------+------+------+---------+------+------+---------+
|  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|        3|

Un-exploding / denormalizing Spark SQL help

2017-02-07 Thread Everett Anderson
Hi, I'm trying to un-explode or denormalize a table like

+---+----+-----+------+--------+
| id|name|extra|data  |priority|
+---+----+-----+------+--------+
|1  |Fred|8    |value1|1       |
|1  |Fred|8    |value8|2       |
|1  |Fred|8    |value5|3       |
|2  |Amy |9    |value3|1       |
|2  |Amy
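
A sketch of the pivot route suggested in the replies above, assuming priority only takes the values 1 through 3:

    import org.apache.spark.sql.functions.first

    val wide = df.groupBy("id", "name", "extra")
      .pivot("priority", Seq(1, 2, 3))  // yields columns named "1", "2", "3"
      .agg(first("data"))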

Re: Running Spark on EMR

2017-01-16 Thread Everett Anderson
On Sun, Jan 15, 2017 at 11:09 AM, Andrew Holway < andrew.hol...@otternetworks.de> wrote: > use yarn :) > > "spark-submit --master yarn" > Doesn't this require first copying out various Hadoop configuration XML files from the EMR master node to the machine running the spark-submit? Or is there a w

Re: Slow metastore list tables in Spark 2.x

2017-01-04 Thread Everett Anderson
;") as it only returns table name and isTemporary. On Wed, Jan 4, 2017 at 11:26 AM, Everett Anderson wrote: > Hi, > > In Spark 1.6.2, we were able to very quickly -- nearly instantly -- search > through the list of (many) table names in our Hive metastore with > > sqlConte

Slow metastore list tables in Spark 2.x

2017-01-04 Thread Everett Anderson
Hi, In Spark 1.6.2, we were able to very quickly -- nearly instantly -- search through the list of (many) table names in our Hive metastore with sqlContext.tableNames().filter(_.matches("some regex")).foreach { println } In Spark 2.0.2, however, this takes forever. Similarly, queries with Catalo

Re: Writing DataFrame filter results to separate files

2016-12-06 Thread Everett Anderson
On Mon, Dec 5, 2016 at 5:33 PM, Michael Armbrust wrote: > 1. In my case, I'd need to first explode my data by ~12x to assign each >> record to multiple 12-month rolling output windows. I'm not sure Spark SQL >> would be able to optimize this away, combining it with the output writing >> to do it

Re: Writing DataFrame filter results to separate files

2016-12-05 Thread Everett Anderson
n my case -- be shuffled to a single machine and then written together as one output shard? For a large amount of data per window, that seems less than ideal. > > On Mon, Dec 5, 2016 at 10:59 AM, Everett Anderson < > ever...@nuna.com.invalid> wrote: > >> Hi, >>

Writing DataFrame filter results to separate files

2016-12-05 Thread Everett Anderson
Hi, I have a DataFrame of records with dates, and I'd like to write all 12-month (with overlap) windows to separate outputs. Currently, I have a loop equivalent to:

for ((windowStart, windowEnd) <- windows) {
  val windowData = allData.filter(
    getFilterCriteria(windowStart, windowEnd))
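
A sketch of that loop with the write step included; windows, getFilterCriteria, and the path scheme are the thread's hypothetical names:

    for ((windowStart, windowEnd) <- windows) {
      val windowData = allData.filter(getFilterCriteria(windowStart, windowEnd))
      // each window becomes its own output location
      windowData.write.parquet(s"s3a://bucket/window_$windowStart-$windowEnd")
    }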

Modifying Metadata in StructType schemas

2016-10-24 Thread Everett Anderson
Hi, I've been using the immutable Metadata within the StructType of a DataFrame/Dataset to track application-level column lineage. However, since it's immutable, the only way to modify it is to do a full trip of 1. Convert DataFrame/Dataset to Row RDD 2. Create new, modified Metadata per c
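
One lighter-weight route worth noting, sketched here with hypothetical names (not from the thread itself): Column.as accepts an alias plus a Metadata, so a single column's metadata can be replaced without the RDD round trip.

    import org.apache.spark.sql.types.MetadataBuilder

    val newMeta = new MetadataBuilder()
      .withMetadata(df.schema("some_col").metadata)  // start from the existing entries
      .putString("lineage", "derived_from_x")        // hypothetical key and value
      .build()
    // Swap in the new Metadata by re-aliasing the column to its own name
    val updated = df.withColumn("some_col", df("some_col").as("some_col", newMeta))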

Equivalent to --files for driver?

2016-09-21 Thread Everett Anderson
Hi, I'm running Spark 1.6.2 on YARN and I often use the cluster deploy mode with spark-submit. While the --files param is useful for getting files onto the cluster in the working directories of the executors, the driver's working directory doesn't get them. Is there some equivalent to --files for

Re: S3A + EMR failure when writing Parquet?

2016-09-04 Thread Everett Anderson
.s3a.fast.upload=true. Is that right? On Tue, Aug 30, 2016 at 11:49 AM, Steve Loughran wrote: > > On 29 Aug 2016, at 18:18, Everett Anderson > wrote: > > Okay, I don't think it's really just S3A issue, anymore. I can run the job > using fs.s3.impl/spark.hadoop.fs.s3.

Does Spark on YARN inherit or replace the Hadoop/YARN configs?

2016-08-30 Thread Everett Anderson
Hi, I've had a bit of trouble getting Spark on YARN to work. When executing in this mode and submitting from outside the cluster, one must set HADOOP_CONF_DIR or YARN_CONF_DIR, from which spark-submit can find the params it needs to locat

Re: S3A + EMR failure when writing Parquet?

2016-08-29 Thread Everett Anderson
em to the cluster since it won't have the EMRFS implementation locally. On Sun, Aug 28, 2016 at 4:19 PM, Everett Anderson wrote: > (Sorry, typo -- I was using spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 not 'hadooop', of course) > > On Sun, Aug 28, 20

Re: S3A + EMR failure when writing Parquet?

2016-08-28 Thread Everett Anderson
(Sorry, typo -- I was using spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 not 'hadooop', of course) On Sun, Aug 28, 2016 at 12:51 PM, Everett Anderson wrote: > Hi, > > I'm having some trouble figuring out a failure when using S3A when writing > a

S3A + EMR failure when writing Parquet?

2016-08-28 Thread Everett Anderson
Hi, I'm having some trouble figuring out a failure when using S3A when writing a DataFrame as Parquet on EMR 4.7.2 (which is Hadoop 2.7.2 and Spark 1.6.2). It works when using EMRFS (s3://), though. I'm using these extra conf params, though I've also tried without everything but the encryption on

Re: Plans for improved Spark DataFrame/Dataset unit testing?

2016-08-21 Thread Everett Anderson
> > -- > Bedrytski Aliaksandr > sp...@bedryt.ski > > > > On Sat, Aug 20, 2016, at 01:25, Everett Anderson wrote: > Hi! > > Just following up on this -- > > When people talk about a shared session/context for testing like this, > I assume it's st

Re: Plans for improved Spark DataFrame/Dataset unit testing?

2016-08-19 Thread Everett Anderson
try and > include this in the next release :) > > On Mon, Aug 1, 2016 at 9:22 AM, Koert Kuipers wrote: > >> we share a single sparksession across tests, and they can run in >> parallel. is pretty fast >> >> On Mon, Aug 1, 2016 at 12:02 PM, Everett Anderson

Submitting jobs to YARN from outside EMR -- config & S3 impl

2016-08-15 Thread Everett Anderson
Hi, We're currently using an EMR cluster (which uses YARN) but submitting Spark jobs to it using spark-submit from different machines outside the cluster. We haven't had time to investigate using something like Livy, yet. We also have a need to use a mix of clus

Re: Java and SparkSession

2016-08-05 Thread Everett Anderson
Hi, Can you say more about what goes wrong? I was migrating my code and began using this for initialization:

SparkConf sparkConf = new SparkConf().setAppName(...)
SparkSession sparkSession = new SparkSession.Builder().config(sparkConf).getOrCreate();
JavaSparkContext jsc = new JavaSpark

Plans for improved Spark DataFrame/Dataset unit testing?

2016-08-01 Thread Everett Anderson
Hi, Right now, if any code uses DataFrame/Dataset, I need a test setup that brings up a local master as in this article. That's a lot of overhead for unit testing and the tests can't run in parallel
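
A minimal sketch of the shared-session pattern discussed in the replies above (names are hypothetical): one local SparkSession is built lazily and reused across suites, since building it is the expensive part.

    import org.apache.spark.sql.SparkSession

    object TestSpark {
      // One session shared by every test suite in the JVM
      lazy val session: SparkSession = SparkSession.builder()
        .master("local[2]")
        .appName("unit-tests")
        .getOrCreate()
    }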

Re: Role-based S3 access outside of EMR

2016-07-28 Thread Everett Anderson
fs.s3a.S3AFileSystem > fs.AbstractFileSystem.s3.impl=org.apache.hadoop.fs.s3a.S3A > fs.AbstractFileSystem.s3a.impl=org.apache.hadoop.fs.s3a.S3A > > And make sure the s3a jars are in your classpath > > Thanks, > Ewan > > *From:* Everett Anderson [mailto:ever...@nuna.com.INV

Re: Programmatic use of UDFs from Java

2016-07-22 Thread Everett Anderson
> Everett, I had the same question today and came across this old thread. > Not sure if there has been any more recent work to support this. > http://apache-spark-developers-list.1001551.n3.nabble.com/Using-UDFs-in-Java-without-registration-td12497.html > > > On Thu, Jul 2

Re: Creating a DataFrame from scratch

2016-07-22 Thread Everett Anderson
appen if you gave it Integer.class, but I suspect it still won't work because Integer may not have the bean-style getters. On Fri, Jul 22, 2016 at 9:37 AM, Everett Anderson wrote: > Hey, > > I think what's happening is that you're calling this createDataFrame > method >

Re: Creating a DataFrame from scratch

2016-07-22 Thread Everett Anderson
Hey, I think what's happening is that you're calling this createDataFrame method: createDataFrame(java.util.List<?> data, java.lang.Class<?> beanClass) which expects
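
To sidestep the bean requirement entirely, a sketch using an explicit schema instead (assuming a SparkSession named spark):

    import java.util.Arrays
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    val schema = StructType(Seq(StructField("value", IntegerType, nullable = false)))
    val rows = Arrays.asList(Row(1), Row(2), Row(3))
    val df = spark.createDataFrame(rows, schema)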

Programmatic use of UDFs from Java

2016-07-21 Thread Everett Anderson
Hi, In the Java Spark DataFrames API, you can create a UDF, register it, and then access it by string name by using the convenience UDF classes in org.apache.spark.sql.api.java. Example UDF1 testU

Re: Role-based S3 access outside of EMR

2016-07-21 Thread Everett Anderson
es >> >> as mentioned above, using EMRFS libs solved this problem: >> >> http://docs.aws.amazon.com//ElasticMapReduce/latest/ReleaseGuide/emr-fs.html >> >> >> 2016-07-21 8:37 GMT+02:00 Gourav Sengupta : >> > But that would mean you would be accessing

Re: Role-based S3 access outside of EMR

2016-07-20 Thread Everett Anderson
copy the data from the > data center back to my local env. > > Andy > > From: Everett Anderson > Date: Tuesday, July 19, 2016 at 2:30 PM > To: "user @spark" > Subject: Role-based S3 access outside of EMR > > Hi, > > When running on EMR, AWS configu

Role-based S3 access outside of EMR

2016-07-19 Thread Everett Anderson
Hi, When running on EMR, AWS configures Hadoop to use their EMRFS Hadoop FileSystem implementation for s3:// URLs and seems to install the necessary S3 credentials properties, as well. Often, it's nice during development to run outside of a cluster even with the "local" Spark master, though, whic
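
A sketch of the local-development wiring under discussion, assuming Spark 2.x; the environment-variable names are hypothetical, and with an EC2 instance role the two key settings can usually be dropped, since S3A's credential chain falls back to instance-profile credentials.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
      .config("spark.hadoop.fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
      .config("spark.hadoop.fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
      .getOrCreate()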

Re: Best practice for handing tables between pipeline components

2016-06-28 Thread Everett Anderson
would help to share cached objects >> >> On Mon, Jun 27, 2016 at 11:14 AM Everett Anderson >> wrote: >> >>> Hi, >>> >>> We have a pipeline of components strung together via Airflow running on >>> AWS. Some of them are implemented in

Best practice for handing tables between pipeline components

2016-06-27 Thread Everett Anderson
Hi, We have a pipeline of components strung together via Airflow running on AWS. Some of them are implemented in Spark, but some aren't. Generally they can all talk to a JDBC/ODBC end point or read/write files from S3. Ideally, we wouldn't suffer the I/O cost of writing all the data to HDFS or S3

Re: Best way to go from RDD to DataFrame of StringType columns

2016-06-17 Thread Everett Anderson
; > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > > On 17 June 20

Re: Best way to go from RDD to DataFrame of StringType columns

2016-06-17 Thread Everett Anderson
/www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > > On 17 June 2016 at 20:38, Everett Anderson > wrote: > >> Hi, >> >> I have a system with files in a variety of non-standard input

Best way to go from RDD to DataFrame of StringType columns

2016-06-17 Thread Everett Anderson
Hi, I have a system with files in a variety of non-standard input formats, though they're generally flat text files. I'd like to dynamically create DataFrames of string columns. What's the best way to go from a RDD to a DataFrame of StringType columns? My current plan is - Call map() on the
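
A sketch of that plan with hypothetical names, assuming tab-delimited lines in an RDD[String] and a Spark 2.x session (substitute sqlContext on 1.6):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val colNames = Seq("c1", "c2", "c3")  // hypothetical column names
    val schema = StructType(colNames.map(StructField(_, StringType, nullable = true)))
    // split with limit -1 keeps trailing empty fields
    val rowRdd = lines.map(line => Row.fromSeq(line.split("\t", -1).toSeq))
    val df = spark.createDataFrame(rowRdd, schema)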

Re: StackOverflowError even with JavaSparkContext union(JavaRDD... rdds)

2016-06-05 Thread Everett Anderson
have 80M for each thread (very simply there might be > 100 of them), but this is just a workaround. This is configuration that I > use to train random forest with input of 400k samples. > > Hope this helps. > > -- > Be well! > Jean Morozov > > On Sun, Jun 5, 2016 at 11:17

StackOverflowError even with JavaSparkContext union(JavaRDD... rdds)

2016-06-05 Thread Everett Anderson
Hi! I have a fairly simple Spark (1.6.1) Java RDD-based program that's scanning through lines of about 1000 large text files of records and computing some metrics about each line (record type, line length, etc). Most are identical so I'm calling distinct(). In the loop over the list of files, I'm
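
A common mitigation for very deep lineages of this shape, sketched in Scala with hypothetical names (the original program is Java, but the structure is the same, and this technique is not from the thread itself): checkpoint periodically inside the loop so the DAG stays bounded.

    sc.setCheckpointDir("/tmp/spark-checkpoints")  // hypothetical path
    var combined = sc.emptyRDD[String]
    for ((file, i) <- files.zipWithIndex) {
      combined = combined.union(sc.textFile(file).distinct())
      if ((i + 1) % 20 == 0) {  // every 20 files, an arbitrary cadence
        combined.checkpoint()
        combined.count()        // force the checkpoint to materialize
      }
    }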