Re: zip for pyspark

2016-08-08 Thread Ewan Leith
If you build a normal Python egg file with the dependencies, you can execute it just as you would execute a .py file, by passing it with --py-files Thanks, Ewan On 8 Aug 2016 3:44 p.m., pseudo oduesp wrote: hi, how can I export a whole PySpark project as a zip from a local session to the cluster and deploy with s

Re: Spark 2.0.0 - Apply schema on few columns of dataset

2016-08-07 Thread Ewan Leith
Looking at the encoders api documentation at http://spark.apache.org/docs/latest/api/java/ == Java == Encoders are specified by calling static methods on Encoders. List<String> data = Arrays.asList("abc", "abc", "xyz"); Da

RE: how to save spark files as parquets efficiently

2016-07-29 Thread Ewan Leith
If you replace the df.write …. with df.count() in your code, you’ll see how much time is taken to process the full execution plan without the write output. That code below looks perfectly normal for writing a Parquet file, yes; there shouldn’t be any tuning needed for “normal” performance. Tha

RE: Role-based S3 access outside of EMR

2016-07-21 Thread Ewan Leith
If you use S3A rather than S3N, it supports IAM roles. I think you can make s3a handle s3:// style URLs so it’s consistent with your EMR paths by adding this to your Hadoop config, probably in core-site.xml: fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem fs.s3.impl=org.apache.hadoop.fs.s3a
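
A minimal sketch of the same idea applied programmatically (the keys can equally go in core-site.xml as described above); the bucket in the read is made up:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("s3a-iam-sketch"))
    // Route s3:// URLs (and native s3a:// ones) through the S3A filesystem, which understands IAM roles
    sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    sc.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    val lines = sc.textFile("s3://my-bucket/input/data.txt")  // hypothetical bucket, resolved via S3A
    println(lines.count())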

Re: can I use ExectorService in my driver? was: is dataframe.write() async? Streaming performance problem

2016-07-08 Thread Ewan Leith
so the slaves do not incur any extra overhead. Thanks Andy From: Ewan Leith <ewan.le...@realitymine.com> Date: Friday, July 8, 2016 at 8:52 AM To: Cody Koeninger <c...@koeninger.org>, Andrew Davidson <a...@santacruzintegration.com> Cc: "u

RE: is dataframe.write() async? Streaming performance problem

2016-07-08 Thread Ewan Leith
Writing (or reading) small files from spark to s3 can be seriously slow. You'll get much higher throughput by doing a df.foreachPartition(partition => ...) and inside each partition, creating an aws s3 client then doing a partition.foreach and uploading the files using that s3 client with its ow
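
A rough sketch of that pattern, assuming the AWS SDK for Java (v1) is on the classpath and `df` is an existing DataFrame; converting the rows to JSON via df.toJSON, and the bucket name and key prefix, are illustrative choices rather than anything from the thread:

    import java.io.ByteArrayInputStream
    import java.util.UUID
    import com.amazonaws.services.s3.AmazonS3Client
    import com.amazonaws.services.s3.model.ObjectMetadata

    df.toJSON.foreachPartition { partition =>
      // One client per partition, credentials picked up from the environment / IAM role
      val s3 = new AmazonS3Client()
      partition.foreach { record =>
        val bytes = record.getBytes("UTF-8")
        val meta = new ObjectMetadata()
        meta.setContentLength(bytes.length)
        s3.putObject("my-bucket", s"output/${UUID.randomUUID()}.json",
          new ByteArrayInputStream(bytes), meta)
      }
    }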

Re: Spark SQL Nested Array of JSON with empty field

2016-06-04 Thread Ewan Leith
The spark json read is unforgiving of things like missing elements from some json records, or mixed types. If you want to pass invalid json files through spark you're best doing an initial parse through the Jackson APIs using a defined schema first, then you can set types like Option[String] wh

RE: Timed aggregation in Spark

2016-05-23 Thread Ewan Leith
Rather than open a connection per record, if you do a DStream foreachRDD at the end of a 5 minute batch window http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams then you can do a rdd.foreachPartition to get the RDD partitions. Open a connection t
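
A minimal sketch of that shape, assuming `stream` is an existing DStream[String]; SinkConnection is a hypothetical stand-in for whatever client the real sink needs, and the window is one way to realise the 5 minute batching:

    import org.apache.spark.streaming.Minutes

    // Hypothetical stand-in for the real sink client
    class SinkConnection extends Serializable {
      def send(record: String): Unit = println(record)
      def close(): Unit = ()
    }

    stream.window(Minutes(5), Minutes(5)).foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        val conn = new SinkConnection()   // opened once per partition, not once per record
        partition.foreach(record => conn.send(record))
        conn.close()
      }
    }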

Spark Streaming - Exception thrown while writing record: BlockAdditionEvent

2016-05-23 Thread Ewan Leith
As we increase the throughput on our Spark streaming application, we're finding we hit errors with the WriteAheadLog, with errors like this: 16/05/21 20:42:21 WARN scheduler.ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(10),None,WriteAh

RE: Spark 1.6.0: substring on df.select

2016-05-12 Thread Ewan Leith
You could use a UDF pretty easily, something like this should work, the lastElement function could be changed to do pretty much any string manipulation you want. import org.apache.spark.sql.functions.udf def lastElement(input: String) = input.split("/").last val lastElementUdf = udf(lastElemen
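
For reference, a completed version of that truncated snippet; the input column name "url" is made up:

    import org.apache.spark.sql.functions.udf

    def lastElement(input: String): String = input.split("/").last
    val lastElementUdf = udf(lastElement _)

    // Add the last path element of a string column as a new column
    val withLast = df.withColumn("last_element", lastElementUdf(df("url")))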

RE: Parse Json in Spark

2016-05-09 Thread Ewan Leith
The simplest way is probably to use the sc.binaryFiles or sc.wholeTextFiles API to create an RDD containing the JSON files (maybe need a sc.wholeTextFiles(…).map(x => x._2) to drop off the filename column) then do a sqlContext.read.json(rddName) That way, you don’t need to worry about combining
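
A minimal sketch of that approach, assuming `sc` and `sqlContext` already exist; the input path is made up:

    // Keep the file contents (second element of each (filename, content) pair), drop the filenames
    val jsonRdd = sc.wholeTextFiles("hdfs:///data/json/").map(_._2)
    val df = sqlContext.read.json(jsonRdd)
    df.printSchema()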

RE: Spark streaming - update configuration while retaining write ahead log data?

2016-03-15 Thread Ewan Leith
That’s what I thought, it’s a shame! Thanks Saisai, Ewan From: Saisai Shao [mailto:sai.sai.s...@gmail.com] Sent: 15 March 2016 09:22 To: Ewan Leith Cc: user Subject: Re: Spark streaming - update configuration while retaining write ahead log data? Currently configuration is a part of

Spark streaming - update configuration while retaining write ahead log data?

2016-03-15 Thread Ewan Leith
Has anyone seen a way of updating the Spark streaming job configuration while retaining the existing data in the write ahead log? e.g. if you've launched a job without enough executors and a backlog has built up in the WAL, can you increase the number of executors without losing the WAL data?

Re: Selecting column in dataframe created with incompatible schema causes AnalysisException

2016-03-02 Thread Ewan Leith
run into this issue a few times. Can you create a JIRA ticket so we can track it? Would be even better if you are interested in working on a patch! Thanks. On Wed, Mar 2, 2016 at 11:51 AM, Ewan Leith <ewan.le...@realitymine.com> wrote: Hi Reynold, yes that would be perfect for

Re: Selecting column in dataframe created with incompatible schema causes AnalysisException

2016-03-02 Thread Ewan Leith
ulls for fields that don't exist or have an incompatible schema? On Wed, Mar 2, 2016 at 11:12 AM, Ewan Leith <ewan.le...@realitymine.com> wrote: Thanks Michael, it's not a great example really, as the data I'm working with has some source files that do fit the

Re: SFTP Compressed CSV into Dataframe

2016-03-02 Thread Ewan Leith
The Apache Commons library will let you access files on an SFTP server via a Java library, no local file handling involved https://commons.apache.org/proper/commons-vfs/filesystems.html Hope this helps, Ewan I wonder if anyone has opened an SFTP connection to open a remote GZIP CSV file? I am a

Re: Selecting column in dataframe created with incompatible schema causes AnalysisException

2016-03-02 Thread Ewan Leith
") On Wed, Mar 2, 2016 at 1:44 AM, Ewan Leith <ewan.le...@realitymine.com> wrote: When you create a dataframe using the sqlContext.read.schema() API, if you pass in a schema that's compatible with some of the records, but incompatible with others, it seems you can't do a .

RE: Write to S3 with server side encryption in KMS mode

2016-01-26 Thread Ewan Leith
Hi Nisrina, I’m not aware of any support for KMS keys in s3n, s3a or the EMR specific EMRFS s3 driver. If you’re using EMRFS with Amazon’s EMR, you can use KMS keys with client-side encryption http://docs.aws.amazon.com/kms/latest/developerguide/services-emr.html#emrfs-encrypt If this has chan

RE: how to correctly run scala script using spark-shell through stdin (spark v1.0.0)

2016-01-26 Thread Ewan Leith
I’ve just tried running this using a normal stdin redirect: ~/spark/bin/spark-shell < simple.scala Which worked: it started spark-shell, executed the script, then stopped the shell. Thanks, Ewan From: Iulian Dragoș [mailto:iulian.dra...@typesafe.com] Sent: 26 January 2016 15:00 To: fernandrez19

RE: Out of memory issue

2016-01-06 Thread Ewan Leith
Hi Muthu, this could be related to a known issue in the release notes http://spark.apache.org/releases/spark-release-1-6-0.html Known issues SPARK-12546 - Save DataFrame/table as Parquet with dynamic partitions may cause OOM; this can be worked around by decreasing the memory used by both
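
A hedged sketch of the workaround described in those release notes, as I understand it (the property names and example values below are assumptions to check against the notes for your version):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("parquet-oom-workaround")
      .set("spark.memory.fraction", "0.4")                          // reduce Spark's share of the heap
    val sc = new SparkContext(conf)
    sc.hadoopConfiguration.set("parquet.memory.pool.ratio", "0.3")  // reduce the Parquet writer's memory pool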

RE: How to accelerate reading json file?

2016-01-06 Thread Ewan Leith
If you already know the schema, then you can run the read with the schema parameter like this: val path = "examples/src/main/resources/jsonfile" val jsonSchema = StructType( StructField("id",StringType,true) :: StructField("reference",LongType,true) :: StructField("deta
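
A completed, hypothetical version of that truncated snippet (the third field name is a guess purely for illustration); the point is that supplying the schema up front avoids the inference pass over the whole dataset:

    import org.apache.spark.sql.types._

    val jsonSchema = StructType(
      StructField("id", StringType, true) ::
      StructField("reference", LongType, true) ::
      StructField("details", StringType, true) :: Nil)   // field name invented for the example

    val df = sqlContext.read.schema(jsonSchema).json("examples/src/main/resources/jsonfile")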

RE: Batch together RDDs for Streaming output, without delaying execution of map or transform functions

2015-12-31 Thread Ewan Leith
good idea too, thanks. Thanks, Ewan From: Ashic Mahtab [mailto:as...@live.com] Sent: 31 December 2015 13:50 To: Ewan Leith ; Apache Spark Subject: RE: Batch together RDDs for Streaming output, without delaying execution of map or transform functions Hi Ewan, Transforms are definitions of what n

Batch together RDDs for Streaming output, without delaying execution of map or transform functions

2015-12-31 Thread Ewan Leith
Hi all, I'm sure this must have been solved already, but I can't see anything obvious. Using Spark Streaming, I'm trying to execute a transform function on a DStream at short batch intervals (e.g. 1 second), but only write the resulting data to disk using saveAsTextFiles in a larger batch after

RE: Re: has any spark write orc document

2015-11-20 Thread Ewan Leith
Looking in the code https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala I don’t think any of that advanced functionality is supported, sorry - there is a parameters option, but I don’t think it’s used for much. Ewan From: zhangjp

RE: Size exceeds Integer.MAX_VALUE on EMR 4.0.0 Spark 1.4.1

2015-11-16 Thread Ewan Leith
How big do you expect the file to be? Spark has issues with single blocks over 2GB (see https://issues.apache.org/jira/browse/SPARK-1476 and https://issues.apache.org/jira/browse/SPARK-6235 for example) If you don’t know, try running df.repartition(100).write.format… to get an idea of how big
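
For reference, a one-line sketch of that suggestion, assuming `df` is the DataFrame being written; the output path is made up:

    // Write in 100 roughly equal parts to see the total size and per-part sizes on disk
    df.repartition(100).write.format("parquet").save("s3://my-bucket/output/repartitioned/")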

RE: Spark Streaming - use the data in different jobs

2015-10-19 Thread Ewan Leith
Storing the data in HBase, Cassandra, or similar is possibly the right answer; the other option that can work well is re-publishing the data back into a second queue on RabbitMQ, to be read again by the next job. Thanks, Ewan From: Oded Maimon [mailto:o...@scene53.com] Sent: 18 October 2015 12:49

RE: Should I convert json into parquet?

2015-10-19 Thread Ewan Leith
As Jörn says, Parquet and ORC will get you really good compression and can be much faster. There are also some nice additions around predicate pushdown which can be great if you've got wide tables. Parquet is obviously easier to use, since it's bundled into Spark. Using ORC is described here http:
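
A minimal sketch of writing the same DataFrame as ORC and as Parquet, assuming Spark 1.5+ with the Hive module on the classpath; the paths are made up:

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)            // ORC support lives in the Hive module
    val df = hiveContext.read.json("events.json")

    df.write.format("orc").save("events_orc")        // ORC
    df.write.parquet("events_parquet")               // Parquet, bundled with Spark SQL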

RE: Need for advice - performance improvement and out of memory resolution

2015-09-30 Thread Ewan Leith
Try reducing the number of workers to 2, and increasing their memory up to 6GB. However, I've seen mention of a bug in the PySpark API when calling head() on a dataframe in Spark 1.4 and 1.5.0 which gives it a big performance hit: https://issues.apache.org/jira/browse/SPARK-10731 It's fixed in sp

RE: Converting a DStream to schemaRDD

2015-09-29 Thread Ewan Leith
Something like: dstream.foreachRDD { rdd => val df = sqlContext.read.json(rdd) df.select(…) } https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams Might be the place to start, it’ll convert each batch of dstream into an RDD then let you work
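
A slightly fuller sketch of that pattern, assuming `stream` is a DStream[String] of JSON records and `sqlContext` already exists; the selected columns and output path are made up:

    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val df = sqlContext.read.json(rdd)
        df.select("userid", "event").write.mode("append").parquet("/output/events")
      }
    }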

SQLContext.read().json() inferred schema - force type to strings?

2015-09-25 Thread Ewan Leith
Hi all, We're using SQLContext.read.json to read in a stream of JSON datasets, but the inferred schema sometimes contains a LongType for a given value and sometimes a DoubleType. This obviously causes problems with merging the schemas, so does anyone know a way of forcing the inferred schema
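
One possible workaround, sketched here as an assumption rather than anything the thread settled on: cast the ambiguous field to a string after reading, so every batch ends up with a common column type before the schemas are merged. The field name and path are made up:

    import org.apache.spark.sql.types.StringType

    val df = sqlContext.read.json("batch.json")
    // Force the ambiguous field to a string so all batches share the same column type
    val normalised = df.withColumn("value", df("value").cast(StringType))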

Re: Zeppelin on Yarn : org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submi

2015-09-19 Thread Ewan Leith
yarn-client still runs the executor tasks on the cluster, the main difference is where the driver job runs. Thanks, Ewan -- Original message-- From: shahab Date: Fri, 18 Sep 2015 13:11 To: Aniket Bhatnagar; Cc: user@spark.apache.org; Subject:Re: Zeppelin on Yarn : org.apache.spar

Re: [Spark on Amazon EMR] : File does not exist: hdfs://ip-x-x-x-x:/.../spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar

2015-09-09 Thread Ewan Leith
The last time I checked, if you launch EMR 4 with only Spark selected as an application, HDFS isn't correctly installed. Did you select another application like Hive at launch time as well as Spark? If not, try that. Thanks, Ewan -- Original message-- From: Dean Wampler Date: Wed

RE: NOT IN in Spark SQL

2015-09-04 Thread Ewan Leith
Spark SQL doesn’t support “NOT IN”, but I think HiveQL does, so give using the HiveContext a try rather than SQLContext. Here’s the spark 1.2 docs on it, but it’s basically identical to running the SQLContext https://spark.apache.org/docs/1.2.0/sql-programming-guide.html#tab_scala_6 https://spar
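
A minimal sketch of the suggestion, assuming `sc` already exists; the table, column, input path, and values are made up:

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    val df = hiveContext.read.json("events.json")
    df.registerTempTable("events")
    val filtered = hiveContext.sql("SELECT * FROM events WHERE country NOT IN ('UK', 'US')")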

Re: Problem while loading saved data

2015-09-03 Thread Ewan Leith
From that, I'd guess that HDFS isn't set up between the nodes, or for some reason writes are defaulting to file:///path/ rather than hdfs:///path/ -- Original message-- From: Amila De Silva Date: Thu, 3 Sep 2015 17:12 To: Ewan Leith; Cc: user@spark.apache.

RE: How to Take the whole file as a partition

2015-09-03 Thread Ewan Leith
Have a look at sparkContext.binaryFiles; it works like wholeTextFiles but returns a PortableDataStream per file. It might be a workable solution, though you'll need to handle the binary-to-UTF-8 (or equivalent) conversion yourself. Thanks, Ewan From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: 03 S
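
A short sketch of that workaround; the input path is made up:

    val files = sc.binaryFiles("hdfs:///data/input/")
    val asText = files.map { case (path, stream) =>
      (path, new String(stream.toArray, "UTF-8"))    // decode each whole file as UTF-8
    }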

spark-csv package - output to filename.csv?

2015-09-03 Thread Ewan Leith
Using the spark-csv package or outputting to text files, you end up with files named: test.csv/part-00 rather than a more user-friendly "test.csv", even if there's only 1 part file. We can merge the files using the Hadoop merge command with something like this code from http://deploymentzone.c
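
A sketch of that merge step using Hadoop's FileUtil.copyMerge, assuming the spark-csv package is available and `df` is an existing DataFrame; the paths are illustrative:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

    df.write.format("com.databricks.spark.csv").save("test.csv-parts")

    val hadoopConf = new Configuration()
    val fs = FileSystem.get(hadoopConf)
    // Merge the part files into a single test.csv, deleting the part directory afterwards
    FileUtil.copyMerge(fs, new Path("test.csv-parts"), fs, new Path("test.csv"), true, hadoopConf, null)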

RE: spark 1.4.1 saveAsTextFile (and Parquet) is slow on emr-4.0.0

2015-09-03 Thread Ewan Leith
For those who have similar issues on EMR writing Parquet files, if you update mapred-site.xml with the following property/value pairs: mapred.output.direct.EmrFileSystem=true mapred.output.direct.NativeS3FileSystem=true parquet.enable.summary-metadata=false spark.sql.parquet.output.committer.class=org.apache.spark.

RE: Problem while loading saved data

2015-09-03 Thread Ewan Leith
Your error log shows you attempting to read from 'people.parquet2' not ‘people.parquet’ as you’ve put below; is that just from a different attempt? Otherwise, it’s an odd one! There aren’t _SUCCESS, _common_metadata and _metadata files under the people.parquet you’ve listed below, which would

RE: correct use of DStream foreachRDD

2015-08-28 Thread Ewan Leith
I think what you’ll want is to carry out the .map functions before the foreachRDD, something like: val lines = ssc.textFileStream("/stream").map(Sensor.parseSensor).map(Sensor.convertToPut) lines.foreachRDD { rdd => // parse the line of data into sensor object rdd.saveAsHadoo

RE: How to increase the Json parsing speed

2015-08-28 Thread Ewan Leith
Can you post roughly what you’re running as your Spark code? One issue I’ve seen before is that passing a directory full of files as a path “/path/to/files/” can be slow, while “/path/to/files/*” runs fast. Also, if you’ve not seen it, have a look at the binaryFiles call http://spark.apache.org

RE: Driver running out of memory - caused by many tasks?

2015-08-27 Thread Ewan Leith
Are you using the Kryo serializer? If not, have a look at it, it can save a lot of memory during shuffles https://spark.apache.org/docs/latest/tuning.html I did a similar task and had various issues with the volume of data being parsed in one go, but that helped a lot. It looks like the main di
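
A minimal sketch of enabling Kryo, in case it's useful; the buffer setting is optional and the value is only an example:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("kryo-sketch")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.max", "256m")   // optional; raise if large objects fail to serialise
    val sc = new SparkContext(conf)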

RE: Selecting different levels of nested data records during one select?

2015-08-27 Thread Ewan Leith
I've just come across https://forums.databricks.com/questions/893/how-do-i-explode-a-dataframe-column-containing-a-c.html which appears to get us started using explode on nested datasets as arrays correctly, thanks. Ewan From: Ewan Leith [mailto:ewan.le...@realitymine.com] Sent: 27 A

Selecting different levels of nested data records during one select?

2015-08-27 Thread Ewan Leith
Hello, I'm trying to query a nested data record of the form:
root
 |-- userid: string (nullable = true)
 |-- datarecords: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- system: boolean (nullable = true)
 |    |    |--

RE: Create column in nested structure?

2015-08-13 Thread Ewan Leith
Never mind me, I've found an email to this list from Raghavendra Pandey which got me what I needed val nestedCol = struct(df("nested2.column1"), df("nested2.column2"), df("flatcolumn")) val df2 = df.select(df("nested1"), nestedCol as "nested2
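
For reference, a completed version of that snippet; it assumes `df` is an existing DataFrame with a nested2 struct and a flat column named flatcolumn, as in the fragment above:

    import org.apache.spark.sql.functions.struct

    // Rebuild the nested2 struct with an extra field pulled in from the flat column
    val nestedCol = struct(df("nested2.column1"), df("nested2.column2"), df("flatcolumn"))
    val df2 = df.select(df("nested1"), nestedCol.as("nested2"))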

Create column in nested structure?

2015-08-13 Thread Ewan Leith
Has anyone used withColumn (or another method) to add a column to an existing nested dataframe? If I call: df.withColumn("nested.newcolumn", df("oldcolumn")) then it just creates the new column with a "." in its name, not under the "nested" structure. Thanks, Ewan

Parquet file organisation for 100GB+ dataframes

2015-08-12 Thread Ewan Leith
Hi all, Can anyone share their experiences working with storing and organising larger datasets with Spark? I've got a dataframe stored in Parquet on Amazon S3 (using EMRFS) which has a fairly complex nested schema (based on JSON files), which I can query in Spark, but the initial setup takes a

RE: Specifying the role when launching an AWS spark cluster using spark_ec2

2015-08-07 Thread Ewan Leith
You'll have a lot less hassle using the AWS EMR instances with Spark 1.4.1 for now, until the spark_ec2.py scripts move to Hadoop 2.7.1; at the moment I'm pretty sure they only use Hadoop 2.4. The EMR setup with Spark lets you use s3:// URIs with IAM roles Ewan -Original Message- From

RE: Help accessing protected S3

2015-07-23 Thread Ewan Leith
I think the standard S3 driver used in Spark from the Hadoop project (S3n) doesn't support IAM role based authentication. However, S3a should support it. If you're running Hadoop 2.6 via the spark-ec2 scripts (I'm not sure what it launches with by default) try accessing your bucket via s3a:// U

RE: coalesce on dataFrame

2015-07-01 Thread Ewan Leith
It's in spark 1.4.0, or should be at least: https://issues.apache.org/jira/browse/SPARK-6972 Ewan -Original Message- From: Hafiz Mujadid [mailto:hafizmujadi...@gmail.com] Sent: 01 July 2015 08:23 To: user@spark.apache.org Subject: coalesce on dataFrame How can we use coalesce(1, true)
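
For reference, a minimal sketch of using it, assuming `df` is an existing DataFrame; the output path is made up:

    // Collapse the DataFrame to a single partition so the write produces one part file
    val singlePart = df.coalesce(1)
    singlePart.write.parquet("/output/single-partition")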

RE: spark timesout maybe due to binaryFiles() with more than 1 million files in HDFS

2015-06-08 Thread Ewan Leith
Can you do a simple sc.binaryFiles("hdfs:///path/to/files/*").count() in the spark-shell and verify that part works? Ewan -Original Message- From: Konstantinos Kougios [mailto:kostas.koug...@googlemail.com] Sent: 08 June 2015 15:40 To: Ewan Leith; user@spark.apache.org S

RE: spark timesout maybe due to binaryFiles() with more than 1 million files in HDFS

2015-06-08 Thread Ewan Leith
Try putting a * on the end of xmlDir, i.e. xmlDir = hdfs:///abc/def/* rather than xmlDir = hdfs:///abc/def and see what happens. I don't know why, but that appears to be more reliable for me with S3 as the filesystem. I'm also using binaryFiles, but I've tried running the same command while w

RE: redshift spark

2015-06-05 Thread Ewan Leith
That project is for reading data in from Redshift table exports stored in s3 by running commands in redshift like this: unload ('select * from venue') to 's3://mybucket/tickit/unload/' http://docs.aws.amazon.com/redshift/latest/dg/t_Unloading_tables.html The path in the parameters below is t

Slow file listing when loading records from in S3 without filename or wildcard

2015-06-05 Thread Ewan Leith
Hi all, I'm not sure if this is a Spark issue, or an AWS/Hadoop/S3 driver issue, but I've noticed that I get a very slow response when I run: val files = sc.wholeTextFiles("s3://emr-test-dgp/testfiles/").count() (which will count all the files in the directory) But an almost immediate response

RE: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces?

2015-05-19 Thread Ewan Leith
Thanks Cheng, that's brilliant, you've saved me a headache. Ewan From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: 19 May 2015 11:58 To: Ewan Leith; user@spark.apache.org Subject: Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces? Th

RE: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces?

2015-05-19 Thread Ewan Leith
Cheng Lian [mailto:lian.cs@gmail.com] Sent: 19 May 2015 11:01 To: Ewan Leith; user@spark.apache.org Subject: Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces? Hi Ewan, Different from AvroParquetWriter, in Spark SQL we use StructType as the interme

AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces?

2015-05-19 Thread Ewan Leith
Hi all, I might be missing something, but does the new Spark 1.3 sqlContext save interface support using Avro as the schema structure when writing Parquet files, in a similar way to AvroParquetWriter (which I've got working)? I've seen how you can load an avro file and save it as parquet from