Re: java.lang.OutOfMemoryError: PermGen space

2015-06-24 Thread Srikanth
That worked. Thanks! I wonder what changed in 1.4 to cause this. It wouldn't work with anything less than 256m for a simple piece of code. 1.3.1 used to work with the default (64m, I think). Srikanth On Wed, Jun 24, 2015 at 12:47 PM, Roberto Coluccio < roberto.coluc...@gmail.com> wrote:
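
For anyone landing here with the same error: the usual fix on Spark 1.4 / Java 7 is to raise the permanent-generation size on the driver (and, if needed, the executors). A minimal sketch of the command-line form; the 256m value is simply what worked in this thread, and the flag is a no-op on Java 8, where PermGen no longer exists:

    spark-shell --driver-java-options "-XX:MaxPermSize=256m" \
      --conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=256m"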

Re: Spark 1.4 RDD to DF fails with toDF()

2015-06-26 Thread Srikanth
irror; >> at WebLogAnalysis$.readWebLogFiles(WebLogAnalysis.scala:38) >> at WebLogAnalysis$.main(WebLogAnalysis.scala:62) >> at WebLogAnalysis.main(WebLogAnalysis.scala) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at

Re: Spark 1.4 RDD to DF fails with toDF()

2015-06-29 Thread Srikanth
My error was related to Scala version. Upon further reading, I realized that it takes some effort to get Spark working with Scala 2.11. I've reverted to using 2.10 and moved past that error. Now I hit the issue you mentioned. Waiting for 1.4.1. Srikanth On Fri, Jun 26, 2015 at 9:10 AM, Ro
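
For reference, a minimal build.sbt sketch of the "revert to Scala 2.10" setup described above; version numbers are illustrative, the point being that the pre-built Spark 1.4 artifacts were published against Scala 2.10:

    scalaVersion := "2.10.4"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.4.0" % "provided",
      "org.apache.spark" %% "spark-sql"  % "1.4.0" % "provided"
    )

(toDF() additionally needs import sqlContext.implicits._ in scope.)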

BroadcastHashJoin when RDD is not cached

2015-07-01 Thread Srikanth
DF(date#20) AS upperTime#23,ip#18,scalaUDF(date#20) AS lowerTime#22] PhysicalRDD [ip#18,emailId#19,date#20], MapPartitionsRDD[12] at rddToDataFrameHolder at DataSourceReader.scala:41 Srikanth

Re: BroadcastHashJoin when RDD is not cached

2015-07-02 Thread Srikanth
core/src/main/scala/org/apache/spark/sql/functions.scala#L581 > ) > > On Wed, Jul 1, 2015 at 8:30 AM, Srikanth wrote: > >> Hello, >> >> >> >> I have a straight forward use case of joining a large table with a >> smaller table. The small table is
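
The functions.scala link quoted above points at the broadcast hint. A minimal sketch of using it to force a BroadcastHashJoin even when the small side is not cached; the DataFrame and column names are invented:

    import org.apache.spark.sql.functions.broadcast

    // Tell the planner the small side fits in memory, regardless of
    // spark.sql.autoBroadcastJoinThreshold or whether it is cached.
    val joined = largeDF.join(broadcast(smallDF), largeDF("ip") === smallDF("ip"))
    joined.explain()   // plan should now show BroadcastHashJoin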

Re: How do we control output part files created by Spark job?

2015-07-07 Thread Srikanth
Did you do yourRdd.coalesce(6).saveAsTextFile() or yourRdd.coalesce(6); yourRdd.saveAsTextFile() ? Srikanth On Tue, Jul 7, 2015 at 12:59 PM, Umesh Kacha wrote: > Hi I tried both approaches using df.repartition(6) and df.coalesce(6) it > d
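
The distinction matters because coalesce() returns a new RDD rather than modifying the one it is called on; only the returned RDD has the reduced partition count. A quick sketch with placeholder paths:

    // No effect on the write: the coalesced RDD is discarded.
    yourRdd.coalesce(6)
    yourRdd.saveAsTextFile("hdfs:///out/many-part-files")

    // Works: the coalesced RDD is the one being saved.
    yourRdd.coalesce(6).saveAsTextFile("hdfs:///out/six-part-files")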

Re: Parallelizing multiple RDD / DataFrame creation in Spark

2015-07-08 Thread Srikanth
Your tableLoad() APIs are not actions. A file will be read fully only when an action is performed. If the action is something like table1.join(table2), then I think both files will be read in parallel. Can you try that and look at the execution plan? In 1.4 this is shown in the Spark UI. Srikanth On
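
A small sketch of the point above, assuming Parquet inputs (Parquet only touches footers at read() time; a format that infers its schema, such as JSON, will do an extra scan up front). Paths and the join column are invented:

    val table1 = sqlContext.read.parquet("hdfs:///data/table1")   // lazy: only metadata is read here
    val table2 = sqlContext.read.parquet("hdfs:///data/table2")   // lazy
    val joined = table1.join(table2, "id")                        // still lazy
    joined.count()                                                // action: both scans run as part of this one job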

Re: How do we control output part files created by Spark job?

2015-07-10 Thread Srikanth
Is there a join involved in your sql? Have a look at spark.sql.shuffle.partitions? Srikanth On Wed, Jul 8, 2015 at 1:29 AM, Umesh Kacha wrote: > Hi Srikant thanks for the response. I have the following code: > > hiveContext.sql("insert into... ").coalesce(6) > > Abo
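
For context, spark.sql.shuffle.partitions (default 200) sets how many partitions a SQL shuffle (a join or group by) produces, and that determines the number of part files when the write follows the shuffle. A minimal sketch with an invented query; the coalesce variant avoids changing the global setting:

    hiveContext.setConf("spark.sql.shuffle.partitions", "6")

    // or, leaving the setting alone, shrink just before writing:
    hiveContext.sql("select key, count(*) as cnt from logs group by key")
      .coalesce(6)
      .write.parquet("hdfs:///out/counts")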

Re: How do we control output part files created by Spark job?

2015-07-11 Thread Srikanth
> I think reducing shuffle partitions will slow down my group by query of > hiveContext, or won't it slow it down? Please guide. > > On Sat, Jul 11, 2015 at 7:41 AM, Srikanth wrote: > >> Is there a join involved in your sql? >> Have a look at spark.sql.shuffle.partitions? >

cache() VS cacheTable()

2015-07-13 Thread Srikanth
che is way less than RDD cache. I thought this difference is due to columnar format used by dataframe. As per the statement in the book, cache size should be similar. Srikanth
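
For context, the two paths being compared, as a sketch (names invented): caching through SQL uses the in-memory columnar store, which compresses per column, while caching the underlying RDD stores deserialized Java row objects and is typically much larger:

    // Columnar, compressed in-memory cache:
    df.registerTempTable("events")
    sqlContext.cacheTable("events")   // df.cache() on a DataFrame goes through the same columnar cache

    // Plain RDD cache of the same data: deserialized objects, usually a much bigger footprint.
    df.rdd.cache()
    df.rdd.count()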

HiveThriftServer2.startWithContext error with registerTempTable

2015-07-13 Thread Srikanth
nc(HiveSessionImpl.java:218) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) I'm able to read the other table ("my_table") from Beeline though. Any suggestions on how to overcome this? This is with the Spark 1.4 pre-built version. Spark-shell was started with --packages to pass spark-csv. Srikanth
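
For readers searching for the same setup, the pattern being attempted is roughly the following (a sketch; table and path names are invented):

    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

    val hiveContext = new HiveContext(sc)
    val df = hiveContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .load("hdfs:///data/my_table.csv")

    df.registerTempTable("temp_table")
    // Expose the context's temp tables over JDBC/ODBC so Beeline can query them.
    HiveThriftServer2.startWithContext(hiveContext)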

Re: HiveThriftServer2.startWithContext error with registerTempTable

2015-07-15 Thread Srikanth
Hello, Re-sending this to see if I'm second time lucky! I've not managed to move past this error. Srikanth On Mon, Jul 13, 2015 at 9:14 PM, Srikanth wrote: > Hello, > > I want to expose result of Spark computation to external tools. I plan to > do this with Thrift se

Re: HiveThriftServer2.startWithContext error with registerTempTable

2015-07-16 Thread Srikanth
flect.NativeMethodAccessorImpl.invoke0(Native Method) Srikanth On Thu, Jul 16, 2015 at 12:44 AM, Cheng, Hao wrote: > Have you ever tried querying “select * from temp_table” from the spark > shell? Or can you try the option --jars while starting the spark shell? > > > >

Rebalancing when adding kafka partitions

2016-07-22 Thread Srikanth
Hello, I'd like to understand how Spark Streaming (direct) would handle Kafka partition addition. Will a running job be aware of new partitions and read from them, given that it uses Kafka APIs to query offsets and offsets are handled internally? Srikanth

Re: Rebalancing when adding kafka partitions

2016-07-22 Thread Srikanth
uld pick up new partitions as they are > added. > > On Fri, Jul 22, 2016 at 11:29 AM, Srikanth wrote: > > Hello, > > > > I'd like to understand how Spark Streaming(direct) would handle Kafka > > partition addition? > > Will a running job be aware of

Re: Rebalancing when adding kafka partitions

2016-07-22 Thread Srikanth
Yeah, that's what I thought. We need to redefine, not just restart. Thanks for the info! I do see the usage of subscribe[K,V] in your DStreams example. Looks simple but it's not very obvious how it works :-) I'll watch out for the docs and ScalaDoc. Srikanth On Fri, Jul 22, 2016 at 2:1
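
For readers following the thread, the Subscribe-based direct stream mentioned above looks roughly like this with the 0-10 integration (a sketch; broker list, group id and topic names are placeholders):

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010._

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "my-stream-group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array("events"), kafkaParams)
    )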

Re: Rebalancing when adding kafka partitions

2016-08-12 Thread Srikanth
ions} partitions.") Should I be setting some parameter/config? Is the doc for new integ available? Thanks, Srikanth On Fri, Jul 22, 2016 at 2:15 PM, Cody Koeninger wrote: > No, restarting from a checkpoint won't do it, you need to re-define the > stream. > > Here's

Re: Rebalancing when adding kafka partitions

2016-08-16 Thread Srikanth
tern pattern to subscribe to > * @param kafkaParams Kafka Who does the new partition discovery? The underlying Kafka consumer or spark-streaming-kafka-0-10-assembly?? Srikanth On Fri, Aug 12, 2016 at 5:15 PM, Cody Koeninger wrote: > Hrrm, that's interesting. Did you try with su

Broadcast join on multiple dataframes

2016-01-28 Thread Srikanth
id-orgs.txt),false,,, : : : +- Scan JSONRelation[creative_id#131L,creative_name#132,concept_id#129L,concept_name#130] InputPaths: file:/shared/data/t1_meta/t1_meta_creative.jsonl : : +- Scan JSONRelation[description#142,id#143L,name#144] InputPaths: file:/shared/data/t1_meta/t1_meta_exchange.jsonl : +- ConvertToUnsafe : +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/technology_key.txt),false, +- ConvertToUnsafe +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/browser_languages.osv),false Srikanth

Re: Broadcast join on multiple dataframes

2016-01-29 Thread Srikanth
Michael, Output of DF.queryExecution is saved to https://www.dropbox.com/s/1vizuwpswza1e3x/plan.txt?dl=0 I don't see anything in this to suggest a switch in strategy. Hopefully you find this helpful. Srikanth On Thu, Jan 28, 2016 at 4:43 PM, Michael Armbrust wrote: > Can you pro

Re: Broadcast join on multiple dataframes

2016-02-04 Thread Srikanth
Hello, Any pointers on what is causing the optimizer to convert broadcast to shuffle join? This join is with a file that is just 4kb in size. Complete plan --> https://www.dropbox.com/s/apuomw1dg0t1jtc/plan_with_select.txt?dl=0 DAG from UI --> https://www.dropbox.com/s/4xc9d0rdkx2fun8/DAG_with_se
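
Two things worth checking when the planner falls back from a broadcast to a shuffle join: the estimated size of the small relation versus spark.sql.autoBroadcastJoinThreshold (10 MB by default), and the explicit hint. A minimal sketch; the 50 MB figure and DataFrame names are arbitrary:

    // Raise the auto-broadcast threshold (value is in bytes):
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)

    // Or bypass the size estimate entirely with the hint:
    import org.apache.spark.sql.functions.broadcast
    val joined = bigDF.join(broadcast(tinyDF), bigDF("id") === tinyDF("id"))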

spark-csv partitionBy

2016-02-09 Thread Srikanth
icks.spark.csv").save(s"hdfs:///output/id=$id/") }) This approach doesn't scale well, especially since the no. of unique IDs can be between 500-700, and adding a second partition column will make this even worse. Wondering if anyone has an efficient work around? Srikanth
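
For anyone hitting this later: the per-id loop can be replaced by DataFrameWriter.partitionBy once the data source supports it (built in for Parquet, and for CSV from Spark 2.0 where the csv reader/writer moved into Spark itself). A sketch with invented names:

    // One write; Spark lays the output out as .../id=<value>/part-*
    df.write
      .partitionBy("id")
      .option("header", "false")
      .csv("hdfs:///output")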

Streaming with broadcast joins

2016-02-17 Thread Srikanth
Hello, I have a streaming use case where I plan to keep a dataset broadcasted and cached on each executor. Every micro batch in streaming will create a DF out of the RDD and join the batch. The below code will perform the broadcast operation for each RDD. Is there a way to broadcast it just once?

Re: Streaming with broadcast joins

2016-02-18 Thread Srikanth
that did not provide this behavior. Srikanth On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu wrote: > You should be able to broadcast that data frame using sc.broadcast and > join against it. > > On Wed, 17 Feb 2016, 21:13 Srikanth wrote: > >> Hello, >> >> I have

Re: Streaming with broadcast joins

2016-02-18 Thread Srikanth
a/output/streaming/"+timestamp) } On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu wrote: > Can you paste the code where you use sc.broadcast ? > > On Thu, Feb 18, 2016 at 5:32 PM Srikanth wrote: > >> Sebastian, >> >> I was able to broadcast using sql

Re: Streaming with broadcast joins

2016-02-19 Thread Srikanth
u mean? did it fail? it didnt broadcast? > > On Thu, Feb 18, 2016 at 11:43 PM Srikanth wrote: > >> Code with SQL broadcast hint. This worked and I was able to see that >> broadcastjoin was performed. >> >> val testDF = sqlContext.read.format("com.databricks.

Re: listening to recursive folder structures in s3 using pyspark streaming (textFileStream)

2016-02-19 Thread Srikanth
Apparently you can pass comma separated folders. Try the suggestion given here --> http://stackoverflow.com/questions/29426246/spark-streaming-textfilestream-not-supporting-wildcards Let me know if this helps Srikanth On Wed, Feb 17, 2016 at 5:47 PM, Shixiong(Ryan) Zhu wrote: > textFile

Re: Streaming with broadcast joins

2016-02-19 Thread Srikanth
Hmmm..OK. Srikanth On Fri, Feb 19, 2016 at 10:20 AM, Sebastian Piu wrote: > I don't have the code with me now, and I ended moving everything to RDD in > the end and using map operations to do some lookups, i.e. instead of > broadcasting a Dataframe I ended broadcasting a Map >

Re: Streaming with broadcast joins

2016-02-19 Thread Srikanth
don't have the code with me now, and I ended moving everything to RDD in > the end and using map operations to do some lookups, i.e. instead of > broadcasting a Dataframe I ended broadcasting a Map > > > On Fri, Feb 19, 2016 at 11:39 AM Srikanth wrote: > >> It didn
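
A rough sketch of the approach described above: broadcast a plain Map once, then reference it inside the streaming operations instead of joining against a DataFrame every batch. The lookup file, its two-column layout, and the assumption that the stream carries (key, value) pairs are all invented for illustration:

    // Build the lookup once on the driver and broadcast it.
    val metaMap: Map[String, String] = sqlContext.read
      .format("com.databricks.spark.csv")
      .load("hdfs:///meta/lookup.csv")
      .map(r => r.getString(0) -> r.getString(1))
      .collect()
      .toMap
    val metaBc = sc.broadcast(metaMap)

    // Every micro-batch reuses the same broadcast value; nothing is re-broadcast per RDD.
    val enriched = stream.map { case (key, value) =>
      (key, value, metaBc.value.getOrElse(key, "unknown"))
    }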

Re: Streaming with broadcast joins

2016-02-20 Thread Srikanth
Sebastian, *Update:-* This is not possible. Probably will remain this way for the foreseeable future. https://issues.apache.org/jira/browse/SPARK-3863 Srikanth On Fri, Feb 19, 2016 at 10:20 AM, Sebastian Piu wrote: > I don't have the code with me now, and I ended moving everything t

RowId unique key for Dataframes

2015-07-21 Thread Srikanth
queId. sqlContext.textFile(file). > zipWithUniqueId(). > map(case(d, i)=>i.toString + delimiter + d). > map(_.split(delimiter)). > map(s=>caseclass(...)) .toDF().select("field1", "field2") It's a bit hacky. Is there an easier way to do this on dataframes and use spark-csv? Srikanth

Re: RowId unique key for Dataframes

2015-07-21 Thread Srikanth
Will work. Thanks! zipWithUniqueId() doesn't guarantee continuous ID either. Srikanth On Tue, Jul 21, 2015 at 9:48 PM, Burak Yavuz wrote: > Would monotonicallyIncreasingId > <https://github.com/apache/spark/blob/d4c7a7a3642a74ad40093c96c4bf45a62a470605/sql/core/src/main/scala/or
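
For reference, the suggestion above in code form (a sketch; file and column names invented). The generated ids are unique and increasing but not consecutive, the same caveat as zipWithUniqueId; in Spark 2.x the function is spelled monotonically_increasing_id():

    import org.apache.spark.sql.functions.monotonicallyIncreasingId

    val withId = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("delimiter", "|")
      .load("hdfs:///data/file.csv")
      .withColumn("rowId", monotonicallyIncreasingId())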

spark.deploy.spreadOut core allocation

2015-07-22 Thread Srikanth
was used with 6 cores. Is this a bug? This is with Spark 1.4. Srikanth

Re: spark.deploy.spreadOut core allocation

2015-07-22 Thread Srikanth
Cool. Thanks! Srikanth On Wed, Jul 22, 2015 at 3:12 PM, Andrew Or wrote: > Hi Srikanth, > > I was able to reproduce the issue by setting `spark.cores.max` to a number > greater than the number of cores on a worker. I've filed SPARK-9260 which I > believe is already be

ShuffledHashJoin instead of CartesianProduct

2015-07-22 Thread Srikanth
").join(ELAccountDF.as("EL").filter("EmailAddress != > ''")).where($"SF.Email" === $"EL.EmailAddress") > > val matchId = > phoneMatch.unionAll(emailMatch.unionAll(faxMatch.unionAll(mobileMatch))) > matchId.cache().registerTempTable("matchId") Is there a more elegant way to do this? On a related note, has anyone worked on record linkage using Bloom Filters, Levenshtein distance, etc in Spark? Srikanth

spark-csv number of partitions

2015-07-28 Thread Srikanth
Hello, I'm using spark-csv instead of sc.textFile() to work with CSV files. How can I set the no. of partitions that will be created when reading a CSV? Basically an equivalent for minPartitions in textFile(): val myrdd = sc.textFile("my.csv",24) Srikanth
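
spark-csv does not appear to expose a minPartitions option, so a common workaround (an assumption on my part, not from this thread) is to repartition right after the load, accepting the extra shuffle:

    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .load("my.csv")
      .repartition(24)   // costs a shuffle, but gives the desired parallelism downstream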

How does DataFrame except work?

2015-08-03 Thread Srikanth
it do auto broadcast if second dataframe is small enough? Srikanth

Estimate size of Dataframe programatically

2015-08-07 Thread Srikanth
f RDD. With dataframes, it's tricky due to columnar storage. How do we do it? On a related note, I see the size of the RDD object to be ~60MB. Is that the footprint of the RDD in the driver JVM? scala> val temp = sc.parallelize(Array(1,2,3,4,5,6)) scala> SizeEstimator.estimate(temp) res13: Long = 69507320 Srikanth

Re: Estimate size of Dataframe programatically

2015-08-10 Thread Srikanth
w how spark.sql.autoBroadcastJoinThreshold estimates the size of a dataframe. Is it going to broadcast when the columnar storage size is less than 10 MB? Srikanth On Fri, Aug 7, 2015 at 2:51 PM, Ted Yu wrote: > Have you tried calling SizeEstimator.estimate() on a DataFrame ? > > I did the following in R
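
Two hedged ways to get at a DataFrame's size: Catalyst's own estimate, which is what the autoBroadcastJoinThreshold comparison uses (an internal API that has moved between versions; statistics became stats around Spark 2.2), or SizeEstimator on materialized rows for small data:

    // Catalyst's size estimate, in bytes (internal/developer API, subject to change):
    val estimatedBytes = df.queryExecution.analyzed.statistics.sizeInBytes

    // Or measure collected rows on the driver (only sensible for small DataFrames):
    import org.apache.spark.util.SizeEstimator
    val measuredBytes = SizeEstimator.estimate(df.collect())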

Dataframe collect() work but count() fails

2015-08-26 Thread Srikanth
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:140) > at > org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:124) > at > org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277) Any idea what is wrong here? Srikanth

Spark 2.0 with Kafka 0.10 exception

2016-08-23 Thread Srikanth
Sometimes a few batches are scheduled and run fine. Then I get this error. kafkacat is able to fetch from this topic continuously. Full exception is here -- https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767 Srikanth

Re: Spark 2.0 with Kafka 0.10 exception

2016-08-24 Thread Srikanth
Thanks Cody. Setting poll timeout helped. Our network is fine but brokers are not fully provisioned in test cluster. But there isn't enough load to max out on broker capacity. Curious that kafkacat running on the same node doesn't have any issues. Srikanth On Tue, Aug 23, 2016 at 9:5
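
For reference, the poll timeout in question is set through the Spark conf; the 10-second value below is arbitrary:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("kafka-010-stream")
      // How long the cached Kafka consumer's poll() waits before the task gives up:
      .set("spark.streaming.kafka.consumer.poll.ms", "10000")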

Reset auto.offset.reset in Kafka 0.10 integ

2016-09-02 Thread Srikanth
ps://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala#L160 How to force it to restart in this case (fully aware of potential data loss)? Srikanth

Re: Reset auto.offset.reset in Kafka 0.10 integ

2016-09-06 Thread Srikanth
> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth wrote: > > You are right. I got confused as its all part of same log when running > from > > IDE. > > I was looking for a good guide to read to understand the this integ. > > > > I'm not managing offset on my own.

Re: Reset auto.offset.reset in Kafka 0.10 integ

2016-09-07 Thread Srikanth
> If your retention is so low that retention gets expired in between > when the driver created a batch with a given starting offset, and when > an executor starts to process that batch, you're going to have > problems. > > On Tue, Sep 6, 2016 at 2:30 PM, Srikanth wrote

Re: Reset auto.offset.reset in Kafka 0.10 integ

2016-09-07 Thread Srikanth
; - you start that job back up, and it errors because the last committed > offset is no longer available > - you think that instead of erroring, the job should silently restart > based on the value of auto.offset.reset > > Is that accurate? > > > On Wed, Sep 7, 2016 at 10:44 AM,

Re: Spark 2.0 with Kafka 0.10 exception

2016-09-07 Thread Srikanth
Executor: Finished task 1.1 in stage 138.0 (TID 7854). 1103 bytes result sent to driver On Wed, Aug 24, 2016 at 2:13 PM, Srikanth wrote: > Thanks Cody. Setting poll timeout helped. > Our network is fine but brokers are not fully provisioned in test cluster. > But there isn't enough lo

Re: Spark 2.0 with Kafka 0.10 exception

2016-09-07 Thread Srikanth
rce) at java.lang.Thread.run(Unknown Source) On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger wrote: > you could try setting > > spark.streaming.kafka.consumer.cache.initialCapacity > > spark.streaming.kafka.consumer.cache.maxCapacity > > to 1 > > On Wed, Sep 7, 2016 at 2

Re: Spark 2.0 with Kafka 0.10 exception

2016-09-07 Thread Srikanth
Yea, disabling cache was not going to be my permanent solution either. I was going to ask how big an overhead is that? It happens intermittently and each time it happens retry is successful. Srikanth On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger wrote: > That's not what I would have

"Job duration" and "Processing time" don't match

2016-09-08 Thread Srikanth
FORMED", "delimiter" -> "\t", "header" -> "false")) .partitionBy("entityId", "regionId", "eventDate") .save(outputPath) Removing SaveMode.Append really speeds things up and also the mismatch between Job duration and processing time disappears. I'm not able to explain what is causing this though. Srikanth

Spark with S3 DirectOutputCommitter

2016-09-09 Thread Srikanth
fig("mapreduce.use.directfileoutputcommitter", "true") //.config("spark.sql.sources.outputCommitterClass", classOf[DirectOutputCommitter].getCanonicalName) .getOrCreate() Srikanth

Re: Spark with S3 DirectOutputCommitter

2016-09-12 Thread Srikanth
files, rename from _temporary is just not practical in S3. I guess we have to add another stage with S3Distcp?? Srikanth On Sun, Sep 11, 2016 at 2:34 PM, Steve Loughran wrote: > > > On 9 Sep 2016, at 21:54, Srikanth wrote: > > > > Hello, > > > > I'm trying t

Kafka 0.10 integ offset commit

2016-10-08 Thread Srikanth
source that can't be idempotent. As such the operations are assumed to be at-least-once. This seems to be one place where duplicates can be reduced. Srikanth

Re: Kafka 0.10 integ offset commit

2016-10-08 Thread Srikanth
dn't be an issue. > > On Sat, Oct 8, 2016 at 7:25 PM, Srikanth wrote: > > Hello, > > > > Spark streaming kafka 0.10 integ provides an option to commit offset to > > kafka using commitAsyn() API. > > This only records the offset commit request. The actual comm
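
For readers who land here, the commit pattern under discussion has this shape in the 0-10 integration (the processing step is a placeholder):

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // ... process rdd; output is at-least-once and may be replayed on failure ...

      // Only queues the commit request; it is actually sent to Kafka on a later
      // consumer poll, i.e. when a subsequent batch runs.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }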

Re: Kafka 0.10 integ offset commit

2016-10-09 Thread Srikanth
eally important to you, it should > be pretty straightforward for you to hack on it to allow it at your > own risk. There is a check for concurrent access in the consumer, so > worst case scenario you should get an exception. > > On Sat, Oct 8, 2016 at 9:18 PM, Srikanth wrote: >

Re: Spark 2.0 with Kafka 0.10 exception

2016-10-19 Thread Srikanth
want > to do with the new kafka consumer. > > > As far as the original issue, are you seeing those polling errors > intermittently, or consistently? From your description, it sounds > like retry is working correctly. > > > On Wed, Sep 7, 2016 at 2:37 PM, Srikanth wrote:

Re: Spark 2.0 with Kafka 0.10 exception

2016-10-20 Thread Srikanth
r related > configs? > > On Wed, Oct 19, 2016 at 12:22 PM, Srikanth wrote: > > Bringing this thread back as I'm seeing this exception on a production > kafka > > cluster. > > > > I have two Spark streaming apps reading the same topic. App1 has batch > &g

Re: Spark 2.0 with Kafka 0.10 exception

2016-10-21 Thread Srikanth
in the docs. > > On Thu, Oct 20, 2016 at 12:13 PM, Srikanth wrote: > > Yeah, setting those params helped. > > > > On Wed, Oct 19, 2016 at 1:32 PM, Cody Koeninger > wrote: > >> > >> 60 seconds for a batch is above the default settings in kafka related > &

Exception in spark streaming + kafka direct app

2017-02-07 Thread Srikanth
? Regards, Srikanth

Re: Exception in spark streaming + kafka direct app

2017-02-07 Thread Srikanth
time. So will the driver fail and exit in such cases? I've seen drivers exit after a job has hit max retry attempts. This is different though, right? Srikanth On Tue, Feb 7, 2017 at 5:25 PM, Tathagata Das wrote: > Does restarting after a few minutes solve the problem? Could be a > tran

Spark streaming + kafka error with json library

2017-03-29 Thread Srikanth
:\Users\stati\.ivy2\cache\org.scala-lang.modules\scala-parser-combinators_2.11\bundles\scala-parser-combinators_2.11-1.0.4.jar:scala/util/parsing/combinator/ImplicitConversions$$anonfun$flatten2$1.class DependencyTree didn't show spark-streaming-kafka-0-10-assembly pulling json4s-native. Any idea how to resolve this? I'm using spark version 2.1.0 Thanks, Srikanth

Re: Spark streaming + kafka error with json library

2017-03-30 Thread Srikanth
Thanks for the tip. That worked. When would one use the assembly? On Wed, Mar 29, 2017 at 7:13 PM, Tathagata Das wrote: > Try depending on "spark-streaming-kafka-0-10_2.11" (not the assembly) > > On Wed, Mar 29, 2017 at 9:59 AM, Srikanth wrote: > >> Hello, >>
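
For reference, the dependency change that resolved this, as a build.sbt line (version matching the 2.1.0 mentioned above). The assembly artifact bundles its own copies of third-party classes, which is the kind of conflict seen here, so for an sbt/maven build the plain artifact is the one to depend on:

    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"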

Spark streaming app leaking memory?

2017-05-16 Thread Srikanth
] 17/05/16 15:12:02 WARN Executor: 1 block locks were not released by TID = 7807: [rdd_1_39] I notice that "Managed memory leak" logs are not seen when I use G1GC. Srikanth
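
For completeness, the collector is switched through the executor JVM options; a sketch (this only changes the garbage collector, it does not by itself fix a leak):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")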

Task failure to read input files

2018-04-13 Thread Srikanth
library to print the full file name when such failures happen? So that I can then manually check if the file is indeed corrupted. Thanks, Srikanth
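
Two hedged options for tracking down which file a task chokes on (assumptions, not from this thread; format, path and session name are invented): tag every row with its source file, or have the reader skip unreadable files:

    import org.apache.spark.sql.functions.input_file_name

    // Carry the originating file path along with each row.
    val withSource = spark.read.parquet("hdfs:///input")
      .withColumn("srcFile", input_file_name())

    // Or (Spark 2.x) skip corrupt files instead of failing the task:
    spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")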

Adding additional jars to distributed cache (yarn-client)

2015-09-07 Thread Srikanth Sundarrajan
would just add the SPARK_JAR and APP_JAR. Am wondering what is the best way to add additional files to Distributed cache and also have them appear in the classpath for ExecutorLauncher. Thanks Srikanth Sundarrajan

Spark ML DAG Pipelines

2017-09-07 Thread Srikanth Sampath
Hi Spark Experts, Can someone point me to some examples for non-linear (DAG) ML pipelines. That would be of great help. Thanks much in advance -Srikanth
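
One way to express a non-linear pipeline, as a sketch: Pipeline.setStages still takes a flat, topologically ordered array, and the DAG emerges from how the stages' input and output columns reference each other (column and stage names are invented):

    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.feature.{HashingTF, Tokenizer, VectorAssembler}

    // Branch 1: text -> words -> term-frequency vector
    val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("tf")

    // Branch 2 is an existing numeric column; the assembler merges both branches.
    val assembler = new VectorAssembler()
      .setInputCols(Array("tf", "price"))
      .setOutputCol("features")

    val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, assembler))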

Re: Core allocation is scattered

2019-07-25 Thread Srikanth Sriram
de1=16 cores > and node 2=4 cores . but cores are allocated like node1=2 node > =1-node 14=1 like that. Is there any conf property i need to > change. I know with dynamic allocation we can use below but without dynamic > allocation is there any? > --conf "spark.dynamicAllocation.maxExecutors=2" > > > Thanks > Amit > -- Regards, Srikanth Sriram
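
The standalone scheduler behaviour described above is governed by spark.deploy.spreadOut, set on the master: true (the default) spreads an application's cores across as many workers as possible, false packs them onto as few workers as possible. A sketch of the master-side setting:

    # in the master's conf/spark-env.sh
    SPARK_MASTER_OPTS="-Dspark.deploy.spreadOut=false"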