Re: [External] Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-04 Thread Ben Teeuwen
s generally useful: https://issues.apache.org/jira/browse/SPARK-19031 In the mean time you could try implementing your own Source
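
The reply is cut off above, but pending SPARK-19031 or a custom Source, one commonly suggested stopgap (a batch workaround sketch, not the author's solution) is a periodic JDBC read bounded by a high-water-mark column. Table name, column, and connection details below are hypothetical; `spark` is the session provided by the pyspark shell.

    # Hedged sketch of incremental JDBC polling (a batch workaround, not a streaming Source).
    # Assumes a table "events" with a monotonically increasing "id" column; all names are hypothetical.
    last_seen_id = 0  # persist this high-water mark somewhere durable between runs

    new_rows = (spark.read
                .format("jdbc")
                .option("url", "jdbc:postgresql://dbhost:5432/mydb")
                .option("dbtable", "(SELECT * FROM events WHERE id > {}) AS t".format(last_seen_id))
                .option("user", "spark")
                .option("password", "...")
                .load())

    if new_rows.count() > 0:
        last_seen_id = new_rows.agg({"id": "max"}).collect()[0][0]
        new_rows.write.mode("append").parquet("/data/events_increment")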

Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-22 Thread Ben Teeuwen
2:33 PM, Davies Liu wrote: > You are using lots of tiny executors (128 executors with only 2G memory), could you try with a bigger executor (for example 16G x 16)? > On Fri, Aug 19, 2016 at 8:19 AM, Ben Teeuwen wrote: > So I wrote some co
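
Davies' suggestion amounts to trading many tiny executors for fewer, larger ones. A sketch of how those sizes might be set when building the session (the 16 x 16G figures are his illustration, not a measured recommendation; on YARN these are usually passed as spark-submit/spark-shell flags instead):

    # Hedged sketch: ask for fewer, larger executors instead of 128 x 2G.
    # Values are illustrative; these configs are normally set at launch time.
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("stringindexer_oom")
             .config("spark.executor.instances", "16")
             .config("spark.executor.memory", "16g")
             .config("spark.executor.cores", "4")   # hypothetical; not part of the suggestion
             .getOrCreate())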

Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-19 Thread Ben Teeuwen
d to see how it compares. > By the way, the feature size you select for the hasher should be a power of 2 (e.g. 2**24 to 2**26 may be worth trying) to ensure the feature indexes are evenly distributed (see the section on HashingTF under http://spark.apache.org/docs/latest/ml
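
A minimal PySpark sketch of the power-of-two sizing mentioned above (data and column names made up for illustration; `spark` is the shell session):

    # HashingTF with a power-of-two feature dimension, per the advice above.
    from pyspark.ml.feature import HashingTF

    df = spark.createDataFrame([(0, ["foo"]), (1, ["bar"])], ["id", "tokens"])

    hasher = HashingTF(inputCol="tokens", outputCol="features", numFeatures=2 ** 24)
    hasher.transform(df).show(truncate=False)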

Re: Spark join and large temp files

2016-08-11 Thread Ben Teeuwen
. > On Aug 11, 2016, at 9:48 PM, Gourav Sengupta wrote: > Hi Ben, and that will take care of skewed data? > Gourav > On Thu, Aug 11, 2016 at 8:41 PM, Ben Teeuwen wrote: > When you read both ‘a’ and ‘b', c

Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-11 Thread Ben Teeuwen
> Why is that? Well once you've trained your model you have a (sparse) N-dimensional weight vector that will by definition have 0s for unseen indexes. At test time, any feature that only appears in your test set or new data will be hashed to an index in the weight vector th
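
A small illustration of that point (made-up data; a 256-dimensional space just to keep the output short): a category that never occurred in training still hashes to an index inside the fixed-size vector, so the trained weights already cover it (with weight 0).

    # Unseen categories still land inside the fixed hash space; no new index is created at test time.
    from pyspark.ml.feature import HashingTF

    hasher = HashingTF(inputCol="cats", outputCol="features", numFeatures=256)

    train = spark.createDataFrame([(["cat=foo"],), (["cat=bar"],)], ["cats"])
    test = spark.createDataFrame([(["cat=never_seen_before"],)], ["cats"])

    hasher.transform(train).show(truncate=False)
    hasher.transform(test).show(truncate=False)   # still a 256-dim sparse vector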

Re: Spark join and large temp files

2016-08-11 Thread Ben Teeuwen
When you read both ‘a’ and ‘b', can you try repartitioning both by column ‘id’? If you .cache() and .count() to force a shuffle, it'll push the records that will be joined to the same executors. So: a = spark.read.parquet('path_to_table_a').repartition('id').cache() a.count() b = spark.read.pa
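
Spelled out as a runnable sketch (the message is cut off after the read of ‘b'; the paths are placeholders and the join key ‘id’ follows the advice above):

    # Repartition both sides by the join key, cache, and force a count so the shuffle happens up front.
    a = spark.read.parquet("path_to_table_a").repartition("id").cache()
    a.count()

    b = spark.read.parquet("path_to_table_b").repartition("id").cache()
    b.count()

    joined = a.join(b, on="id")   # co-partitioned, cached inputs reduce spill during the join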

Re: registering udf to use in spark.sql('select...

2016-08-04 Thread Ben Teeuwen
, right? So I’m curious what to use instead. > On Aug 4, 2016, at 3:54 PM, Nicholas Chammas wrote: > Have you looked at pyspark.sql.functions.udf and the associated examples? > On Thu, Aug 4, 2016 at 9:10 AM, Ben Teeuwen wrote: > Hi, >

Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-04 Thread Ben Teeuwen
In [23]: hashed = hasher.transform(df)

In [24]: hashed.show()
+---+-------+-------------+----------------+
| id|feature|     features| features_vector|
+---+-------+-------------+----------------+
|  0|    foo|[feature=foo]|(256,[59],[1.0])|
|  1|

registering udf to use in spark.sql('select...

2016-08-04 Thread Ben Teeuwen
Hi, I’d like to use a UDF in pyspark 2.0. As in .. def squareIt(x): return x * x # register the function and define return type …. spark.sql("""select myUdf(adgroupid, 'extra_string_parameter') as function_result from df""") How can I register the function? I only see reg
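
One way this can be done in PySpark 2.0 is to register the function on the session so spark.sql can see it. A sketch, not the resolution from the thread: spark.udf.register is assumed here (sqlContext.registerFunction is the older spelling); the column name adgroupid comes from the question, the data is made up.

    # Register a Python function for use inside spark.sql().
    from pyspark.sql.types import LongType

    def square_it(x):
        return x * x

    spark.udf.register("squareIt", square_it, LongType())

    df = spark.createDataFrame([(3,), (4,)], ["adgroupid"])
    df.createOrReplaceTempView("df")

    spark.sql("SELECT squareIt(adgroupid) AS function_result FROM df").show()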

Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-04 Thread Ben Teeuwen
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39) at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecu

Re: how to debug spark app?

2016-08-04 Thread Ben Teeuwen
Related question: what are good profiling tools other than watching the application master alongside the running code? Are there things that can be logged during the run? If I have, say, 2 ways of accomplishing the same thing, and I want to learn about the time/memory/general resource blocking p
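
One low-tech option that sometimes answers this: force each variant with an action, time it, and then read the per-stage breakdown for those jobs in the Spark UI or event logs. A sketch, with two arbitrary stand-in variants:

    # Crude timing harness: an action forces full evaluation, so wall-clock times are comparable,
    # and the matching jobs show stage time, shuffle, and spill detail in the Spark UI.
    import time

    def timed(label, df):
        start = time.time()
        n = df.count()
        print("{}: {} rows in {:.1f}s".format(label, n, time.time() - start))

    base = spark.range(0, 10 * 1000 * 1000)
    timed("variant_a", base.selectExpr("id % 100 AS k").groupBy("k").count())
    timed("variant_b", base.selectExpr("id % 100 AS k").distinct())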

Re: How does MapWithStateRDD distribute the data

2016-08-03 Thread Ben Teeuwen
Did you check the executor logs to see whether the kafka offsets were pulled in evenly across the 4 executors? I recall a similar situation with such uneven balancing from a kafka stream, and ended up raising the amount of resources (RAM and cores). Then it nicely balanced out. I don’t understand t

OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-03 Thread Ben Teeuwen
Hi, I want to one hot encode a column containing 56 million distinct values. My dataset is 800m rows + 17 columns. I first apply a StringIndexer, but it already breaks there, giving an OOM java heap space error. I launch my app on YARN with: /opt/spark/2.0.0/bin/spark-shell --executor-memory 10G
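
For context, the pipeline the question describes looks roughly like this (column name illustrative; df stands in for the 800m-row DataFrame). The StringIndexer fit is the step that exhausts the heap, since it builds the full label-to-index mapping for the 56m distinct values:

    # StringIndexer + OneHotEncoder as described in the question (Spark 2.0 API).
    from pyspark.ml.feature import StringIndexer, OneHotEncoder

    indexer = StringIndexer(inputCol="high_cardinality_col", outputCol="col_idx")
    indexed = indexer.fit(df).transform(df)   # fit() is where the 56m-value dictionary is built

    encoder = OneHotEncoder(inputCol="col_idx", outputCol="col_ohe")
    encoded = encoder.transform(indexed)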

Materializing mapWithState .stateSnapshot() after ssc.stop

2016-07-28 Thread Ben Teeuwen
Hi all, I’ve posted a question regarding sessionizing events using scala and mapWithState at http://stackoverflow.com/questions/38541958/materialize-mapwithstate-statesnapshots-to-database-for-later-resume-of-spark-st