No auto decompress in Spark Java textFile function?

2015-09-08 Thread Chris Teoh
Hi Folks, I tried using Spark v1.2 on bz2 files in Java but the behaviour is different from the same textFile API call in Python and Scala. That being said, how do I go about reading .tar.bz2 files in Spark's Java API? Thanks in advance, Chris

Re: No auto decompress in Spark Java textFile function?

2015-09-09 Thread Chris Teoh
ed it on bz2 files. If > it isn't decompressing by default then what you have to do is to use the > sc.wholeTextFiles and then decompress each record (that being file) with > the corresponding codec. > > Thanks > Best Regards > > On Tue, Sep 8, 2015 at 6:49 PM, Chri
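
A rough sketch of that idea in Scala (the same approach works from the Java API): Hadoop's codec layer decompresses the .bz2 stream but does not unpack the tar archive inside it, so each archive is read whole and its entries extracted manually. This is a sketch only, assuming Apache Commons Compress is on the classpath, the archive members are plain text, and the input path is made up for illustration.

    import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
    import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream

    // Each .tar.bz2 is opened as one binary blob; its entries are read sequentially.
    val lines = sc.binaryFiles("/data/archives/*.tar.bz2").flatMap { case (_, stream) =>
      val tar = new TarArchiveInputStream(new BZip2CompressorInputStream(stream.open()))
      val entryLines = Iterator.continually(tar.getNextTarEntry)
        .takeWhile(_ != null)
        .filterNot(_.isDirectory)
        .flatMap(_ => scala.io.Source.fromInputStream(tar).getLines()) // reads only the current entry
        .toList
      tar.close()
      entryLines
    }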

Re: Convert RDD[Iterable[MyCaseClass]] to RDD[MyCaseClass]

2018-12-01 Thread Chris Teoh
Do you have the full code example? I think this would be similar to the mapPartitions code flow, something like flatMap(_.toList). I haven't tested this out yet, but that's what I'd try first. On Sat, 1 Dec 2018 at 01:02, James Starks wrote: > When processing data, I create an instance o

Re: Convert RDD[Iterable[MyCaseClass]] to RDD[MyCaseClass]

2018-12-01 Thread Chris Teoh
[2] at parallelize at <console>:28

scala> rddIt.flatMap(_.toList)
res4: org.apache.spark.rdd.RDD[MyClass] = MapPartitionsRDD[3] at flatMap at <console>:26

res4 is what you're looking for. On Sat, 1 Dec 2018 at 21:09, Chris Teoh wrote: > Do you have the full code example? > > I think this wo
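
A minimal self-contained version of the same approach; the case class and sample data below are made up for illustration.

    case class MyCaseClass(value: Int)

    val rddIt: org.apache.spark.rdd.RDD[Iterable[MyCaseClass]] = sc.parallelize(Seq(
      Iterable(MyCaseClass(1), MyCaseClass(2)),
      Iterable(MyCaseClass(3))
    ))

    // Flatten RDD[Iterable[MyCaseClass]] down to RDD[MyCaseClass]
    val flattened: org.apache.spark.rdd.RDD[MyCaseClass] = rddIt.flatMap(_.toList)
    flattened.collect()  // Array(MyCaseClass(1), MyCaseClass(2), MyCaseClass(3))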

Re: Spark2 DataFrameWriter.saveAsTable defaults to external table if path is provided

2019-02-13 Thread Chris Teoh
Hey there, Could you not just create a managed table using the DDL in Spark SQL and then write the data frame to the underlying folder, or use Spark SQL to do an insert? Alternatively, try CREATE TABLE AS SELECT. IIRC Hive creates managed tables this way. I've not confirmed this works but I thin
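
A hedged sketch of both suggestions; the database, table, and column names are made up, and it assumes a DataFrame `df` whose schema matches the target table.

    // 1) CREATE TABLE AS SELECT builds a managed table in one step:
    spark.sql("CREATE TABLE db.sales_managed USING PARQUET AS SELECT * FROM staging_sales")

    // 2) Or create the managed table with DDL first, then insert the DataFrame into it:
    spark.sql("CREATE TABLE db.sales_inserted (id INT, amount DOUBLE) USING PARQUET")
    df.write.mode("append").insertInto("db.sales_inserted")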

Re: Spark2 DataFrameWriter.saveAsTable defaults to external table if path is provided

2019-02-13 Thread Chris Teoh
external table. > > Is there any way to control/override this? > > Thanks, > Peter > > > On Wed, Feb 13, 2019, 13:09 Chris Teoh >> Hey there, >> >> Could you not just create a managed table using the DDL in Spark SQL and >> then written the data fram

Re: Map side join without broadcast

2019-06-29 Thread Chris Teoh
The closest thing I can think of here is to have both dataframes written out using buckets. Hive uses this technique for join optimisation, such that matching buckets from both datasets are read by the same mapper to achieve a map-side join. On Sat., 29 Jun. 2019, 9:10 pm jelmer, wrote: > I have
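
A sketch of the bucketed-table technique; the DataFrame names, bucket count, key, and table names are illustrative. Both sides need the same bucketing (and ideally sorting) on the join key for the shuffle to be avoided.

    ordersDf.write.bucketBy(64, "customer_id").sortBy("customer_id")
      .saveAsTable("db.orders_bucketed")
    customersDf.write.bucketBy(64, "customer_id").sortBy("customer_id")
      .saveAsTable("db.customers_bucketed")

    val joined = spark.table("db.orders_bucketed")
      .join(spark.table("db.customers_bucketed"), "customer_id")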

Re: Implementing Upsert logic Through Streaming

2019-06-29 Thread Chris Teoh
Not sure what your needs are here. If you can afford to wait, increase your micro-batch window to a long period of time, aggregate your data by key every micro-batch, and then apply those changes to the Oracle database. Since you're streaming from text files, there's no way to pre-partition your
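
A sketch of that pattern with Structured Streaming (Spark 2.4+). The path, trigger interval, JDBC settings, and dedup step are all illustrative; a real upsert would normally stage each batch and run a MERGE on the Oracle side.

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.streaming.Trigger

    val lines = spark.readStream.text("/landing/upserts")

    val query = lines.writeStream
      .trigger(Trigger.ProcessingTime("30 minutes"))      // wide micro-batch window
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        val changes = batch.dropDuplicates()              // stand-in for per-key aggregation
        changes.write
          .format("jdbc")
          .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCL")
          .option("dbtable", "STAGING_CHANGES")
          .mode("append")
          .save()
      }
      .start()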

Re: Implementing Upsert logic Through Streaming

2019-06-30 Thread Chris Teoh
Just thinking on this: if your needs can be addressed using batch instead of streaming, I think that is viable. A lambda architecture approach also seems like a possible solution. On Sun., 30 Jun. 2019, 9:54 am Chris Teoh, wrote: > Not sure what your needs are here. > > I

Re: Implementing Upsert logic Through Streaming

2019-07-01 Thread Chris Teoh
00 , 1234 > 2, 2000, 2234 > 3, 2000,3234 > 2, 2100,4234 > > Thanks > Sachit > > On Mon, 1 Jul 2019, 01:46 Chris Teoh, wrote: > >> Just thinking on this, if your needs can be addressed using batch instead >> of streaming, I think this is a viable solution. Using

Re: Map side join without broadcast

2019-07-01 Thread Chris Teoh
tial index so i end up with > one index in each partition. > > Then i read the words again but this time assign every partition to each > word and join it on the indices rdd by partition key. So effectively every > index will be queries > > Finally i merge the results from each in

Re: Learning Spark

2019-07-05 Thread Chris Teoh
Scala is better suited to data engineering work. It also has better integration with other components like HBase, Kafka, etc. Python is great for data scientists as there are more data science libraries available in Python. On Fri., 5 Jul. 2019, 7:40 pm Vikas Garg, wrote: > Is there any disadva

Re: Attempting to avoid a shuffle on join

2019-07-05 Thread Chris Teoh
Dataframes have a partitionBy function too. You can avoid a shuffle if one of your datasets is small enough to broadcast. On Thu., 4 Jul. 2019, 7:34 am Mkal, wrote: > Please keep in mind i'm fairly new to spark. > I have some spark code where i load two textfiles as datasets and after > some >
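
A short sketch of the broadcast approach; `largeDf`, `smallDf`, and the join key are assumptions for illustration.

    import org.apache.spark.sql.functions.broadcast

    // Hinting the small side makes Spark do a broadcast (map-side) join, skipping the shuffle.
    val joined = largeDf.join(broadcast(smallDf), Seq("id"))
    joined.explain()  // look for BroadcastHashJoin in the physical plan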

Re: Spark 2.3 Dataframe Grouby operation throws IllegalArgumentException on Large dataset

2019-07-24 Thread Chris Teoh
This might be a hint. Maybe invalid data? Caused by: java.lang.IllegalArgumentException: Missing required char ':' at 'struct>' On Wed., 24 Jul. 2019, 2:15 pm Balakumar iyer S, wrote: > Hi Bobby Evans, > > I apologise for the delayed response , yes you are right I missed out to > paste the com

Re: Reading configuration file in Spark Scala throws error

2019-08-03 Thread Chris Teoh
This seems to work:

    val printEntry = new java.util.function.Consumer[java.util.Map.Entry[String, com.typesafe.config.ConfigValue]] {
      override def accept(a: java.util.Map.Entry[String, com.typesafe.config.ConfigValue]): Unit = {
        println(a.getKey)
      }
    }

    conf.entrySet.iterator.forEachRemaining(printEntry)
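
A shorter alternative sketch: converting the Java set with JavaConverters avoids hand-writing the Consumer. This assumes `conf` is a com.typesafe.config.Config as in the snippet above.

    import scala.collection.JavaConverters._

    conf.entrySet.asScala.foreach(entry => println(entry.getKey))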

Re: [pyspark 2.4.3] small input csv ~3.4GB gets 40K tasks created

2019-08-30 Thread Chris Teoh
Look at your DAG. Are there lots of CSV files? Does your input CSV dataframe have lots of partitions to start with? Bear in mind cross join makes the dataset much larger so expect to have more tasks. On Fri, 30 Aug 2019 at 14:11, Rishi Shah wrote: > Hi All, > > I am scratching my head against th
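
A quick sketch for checking the partition count before the cross join; the path and the coalesce target are made up.

    val df = spark.read.option("header", "true").csv("/path/to/input")
    println(df.rdd.getNumPartitions)   // does this already run into the thousands?

    // If the input arrives as thousands of tiny files, compact it before the cross join:
    val compacted = df.coalesce(32)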

Re: Control Sqoop job from Spark job

2019-08-30 Thread Chris Teoh
I'd say this is an uncommon approach; could you use a workflow/scheduling system to call Sqoop outside of Spark? Spark is usually multiprocess distributed, so putting this Sqoop job in the Spark code seems to imply you want to run Sqoop first, then Spark. If you're really insistent on this, call

Re: Control Sqoop job from Spark job

2019-09-02 Thread Chris Teoh
aunched based on dataframe values in spark job. Certainly it can be > isolated and broken. > > On Sat, Aug 31, 2019 at 8:07 AM Chris Teoh wrote: > >> I'd say this is an uncommon approach, could you use a workflow/scheduling >> system to call Sqoop outside of Spark? Spark is usua

Re: OOM Error

2019-09-07 Thread Chris Teoh
Hi Ankit, Without looking at the Spark UI and the stages/DAG, I'm guessing you're running on the default number of Spark shuffle partitions. If you're seeing a lot of shuffle spill, you likely have to increase the number of shuffle partitions to accommodate the huge shuffle size. I hope that helps C
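
A sketch of that change; 2000 is an illustrative value, not a recommendation, and it needs to be set before the heavy groupBy/window stage runs.

    // Default is 200; raise it so each shuffle partition is small enough to avoid spilling/OOM.
    spark.conf.set("spark.sql.shuffle.partitions", "2000")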

Re: OOM Error

2019-09-07 Thread Chris Teoh
if it would help if I repartition the data by > the fields I am using in group by and window operations? > > Best Regards > Ankit Khettry > > On Sat, 7 Sep, 2019, 1:05 PM Chris Teoh, wrote: > >> Hi Ankit, >> >> Without looking at the Spark UI and the stages/DA

Re: OOM Error

2019-09-07 Thread Chris Teoh
Khettry > > On Sat, Sep 7, 2019 at 2:56 PM Chris Teoh wrote: > >> You can try, consider processing each partition separately if your data >> is heavily skewed when you partition it. >> >> On Sat, 7 Sep 2019, 7:19 pm Ankit Khettry, >> wrote: >> >&g

Re: Request more yarn vcores than executors

2019-12-08 Thread Chris Teoh
I thought --executor-cores is the same as the other argument. If anything, just set --executor-cores to something greater than 1 and don't set the other one you mentioned. You'll then get a greater number of cores per executor, so you can take on more simultaneous tasks per executor. On Sun, 8 Dec 2019,
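
A sketch of the equivalent settings in code (the same as passing --executor-cores and --num-executors to spark-submit); the values here are illustrative only.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("multi-core-executors")
      .config("spark.executor.cores", "5")      // cores (YARN vcores) per executor
      .config("spark.executor.instances", "10") // number of executors
      .getOrCreate()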

Re: Request more yarn vcores than executors

2019-12-08 Thread Chris Teoh
> you can take on more simultaneous tasks per executor > > > That is exactly what I want to avoid. that nature of the task makes it > difficult to parallelise over many partitions. Ideally i'd have 1 executor > per task with 10+ cores assigned to each executor > > On

Re: Identify bottleneck

2019-12-18 Thread Chris Teoh
Please look at the Spark UI and confirm you are indeed getting more than 1 partition in your dataframe. Compressed text files (e.g. gzip) are not splittable, so you may just be doing all the work in a single partition. If that is the case, it may be worthwhile considering calling the repartition method to distrib
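
A sketch of that check and the repartition; the path and partition count are made up, and it assumes the input really is a single non-splittable file (e.g. one gzip).

    val raw = spark.read.text("/path/to/input.txt.gz")
    println(raw.rdd.getNumPartitions)   // confirm what the Spark UI suggests

    // Spread the downstream work across the cluster after the single-partition read.
    val spread = raw.repartition(64)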

Re: Identify bottleneck

2019-12-19 Thread Chris Teoh
; Also, > the framework allows executing all the modifications at the same time as > one big request (but I won't paste it here, it would not be really relevant > > > -- > *From: *"Antoine DUBOIS" > *To: *"Enrico Minack" > *Cc

Re: Identify bottleneck

2019-12-19 Thread Chris Teoh
As far as I'm aware it isn't any better. The logic all gets processed by the same engine so to confirm, compare the DAGs generated from both approaches and see if they're identical. On Fri, 20 Dec 2019, 8:56 am ayan guha, wrote: > Quick question: Why is it better to use one sql vs multiple withC
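
A sketch of that comparison; it assumes a DataFrame `df` with numeric columns `a` and `b`, which are made up for illustration.

    import org.apache.spark.sql.functions.col

    val viaWithColumn = df.withColumn("a2", col("a") + 1).withColumn("b2", col("b") * 2)
    val viaSelectExpr = df.selectExpr("*", "a + 1 AS a2", "b * 2 AS b2")

    // If the physical plans match, the two styles cost the same at runtime.
    viaWithColumn.explain()
    viaSelectExpr.explain()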

Re: Out of memory HDFS Multiple Cluster Write

2019-12-20 Thread Chris Teoh
spark.sql.shuffle.partitions might be a start. Is there a difference between the number of partitions when the parquet is read and spark.sql.shuffle.partitions? Is the read partition count much higher than spark.sql.shuffle.partitions? On Fri, 20 Dec 2019, 7:34 pm Ruijing Li, wrote: > Hi all, > > I have encountered a stra

Re: Out of memory HDFS Multiple Cluster Write

2019-12-20 Thread Chris Teoh
drop off. > > We keep hdfs block 128Mb > > On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh wrote: > >> spark.sql.shuffle.partitions might be a start. >> >> Is there a difference in the number of partitions when the parquet is >> read to spark.sql.shuffle.partitions

Re: Out of memory HDFS Multiple Cluster Write

2019-12-21 Thread Chris Teoh
iling stage, when spark reads the 20 >> files each 132M, it comes out to be 40 partitions. >> >> >> >> On Fri, Dec 20, 2019 at 4:40 PM Chris Teoh wrote: >> >>> If you're using Spark SQL, that configuration setting causes a shuffle >>> if the number

Re: Out of memory HDFS Read and Write

2019-12-22 Thread Chris Teoh
tioning output to more > partitions, 40 precisely (when it failed, we partitioned the output to 20 > after logic is finished but before writing to HDFS) have made the > read&write stage succeed. > > Not understanding how spark read&write stage can experience OOM issues. > Ho

Re: Spark Executor OOMs when writing Parquet

2020-01-17 Thread Chris Teoh
high. > 15000 partitions still failed. I am trying 3 partitions now. There is > still some disk spill but it is not that high. > > Thanks, > > Arwin > > -- > *From:* Chris Teoh > *Sent:* January 17, 2020 7:32 PM > *To:* Arwin Tio > *Cc:*

Re: Does explode lead to more usage of memory

2020-01-18 Thread Chris Teoh
I think it does mean more memory usage but consider how big your arrays are. Think about your use case requirements and whether it makes sense to use arrays. Also it may be preferable to explode if the arrays are very large. I'd say exploding arrays will make the data more splittable, having the ar

Re: Does explode lead to more usage of memory

2020-01-19 Thread Chris Teoh
Depends on the use case. If you have to join, you're saving a join and a shuffle by having it already in an array. If you explode, at least sort within partitions to get predicate pushdown when you read the data next time. On Sun, 19 Jan 2020, 1:19 pm Jörn Franke, wrote: > Why not two tab
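
A sketch of the explode-then-sort idea; the column names and output path are assumptions. Sorting within partitions keeps the Parquet row-group statistics tight on the sorted column, which is what enables the pushdown/data-skipping on the next read.

    import org.apache.spark.sql.functions.{col, explode}

    val exploded = df.select(col("id"), explode(col("items")).as("item"))

    exploded.sortWithinPartitions("item")
      .write.mode("overwrite").parquet("/warehouse/exploded_items")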

Re: RESTful Operations

2020-01-19 Thread Chris Teoh
Maybe something like Livy, otherwise roll your own REST API and have it start a Spark job. On Mon, 20 Jan 2020 at 06:55, wrote: > I am new to Spark. The task I want to accomplish is let client send http > requests, then spark process that request for further operations. However > searching Spark

Re: Submitting job with external dependencies to pyspark

2020-01-27 Thread Chris Teoh
Use --py-files See https://spark.apache.org/docs/latest/submitting-applications.html#bundling-your-applications-dependencies I hope that helps. On Tue, 28 Jan 2020, 9:46 am Tharindu Mathew, wrote: > Hi, > > Newbie to pyspark/spark here. > > I'm trying to submit a job to pyspark with a dependen

Re: Submitting job with external dependencies to pyspark

2020-01-28 Thread Chris Teoh
s really helpful. Thanks! I actually solved my problem using by > creating a venv and using the venv flags. Wondering now how to submit the > data as an archive? Any idea? > > On Mon, Jan 27, 2020, 9:25 PM Chris Teoh wrote: > >> Use --py-files >> >> See >>

Re: Best way to read batch from Kafka and Offsets

2020-02-03 Thread Chris Teoh
Kafka can keep track of the offsets you've seen (in a separate topic, based on your consumer group), but it is usually best effort and you're probably better off also keeping track of your offsets yourself. If the producer resends a message you would have to dedupe it, as you've most likely already seen it, h
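
A sketch of a batch Kafka read with explicitly managed offsets; the brokers, topic, and offset JSON are illustrative, and in practice the offsets would be loaded from and saved to your own store between runs.

    val batch = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "events")
      .option("startingOffsets", """{"events":{"0":100,"1":150}}""")
      .option("endingOffsets", """{"events":{"0":200,"1":250}}""")
      .load()
    // key/value arrive as binary; cast them before use, then persist the end offsets you processed.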

Re: Best way to read batch from Kafka and Offsets

2020-02-03 Thread Chris Teoh
ince kafka message is immutable, so a message will get sent with > a different offset instead of the same offset. > > So spark when reading from kafka is acting as a least once consumer? Why > does spark not do checkpointing for batch read of kafka? > > On Mon, Feb 3, 2020 at 1:36

Re: Apache Arrow support for Apache Spark

2020-02-17 Thread Chris Teoh
1. I'd also consider how you're structuring the data before applying the join; naively doing the join could be expensive, so a bit of data preparation may be necessary to improve join performance. Try to get a baseline as well. Arrow would help improve it.
2. Try storing it back as Parquet bu