That worked. Thanks!
I wonder what changed in 1.4 to cause this. It wouldn't work with anything
less than 256m for a simple piece of code.
1.3.1 used to work with the default (64m, I think).
Srikanth
On Wed, Jun 24, 2015 at 12:47 PM, Roberto Coluccio <
roberto.coluc...@gmail.com> wrote:
>> at WebLogAnalysis$.readWebLogFiles(WebLogAnalysis.scala:38)
>> at WebLogAnalysis$.main(WebLogAnalysis.scala:62)
>> at WebLogAnalysis.main(WebLogAnalysis.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
My error was related to Scala version. Upon further reading, I realized
that it takes some effort to get Spark working with Scala 2.11.
I've reverted to using 2.10 and moved past that error. Now I hit the issue
you mentioned. Waiting for 1.4.1.
Srikanth
On Fri, Jun 26, 2015 at 9:10 AM, Roberto Coluccio wrote:
...scalaUDF(date#20) AS upperTime#23,ip#18,scalaUDF(date#20) AS lowerTime#22]
PhysicalRDD [ip#18,emailId#19,date#20], MapPartitionsRDD[12] at rddToDataFrameHolder at DataSourceReader.scala:41
Srikanth
core/src/main/scala/org/apache/spark/sql/functions.scala#L581
> )
>
> On Wed, Jul 1, 2015 at 8:30 AM, Srikanth wrote:
>
>> Hello,
>>
>>
>>
>> I have a straightforward use case of joining a large table with a
>> smaller table. The small table is
Did you do
yourRdd.coalesce(6).saveAsTextFile()
or
yourRdd.coalesce(6)
yourRdd.saveAsTextFile()
?
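The reason I ask: coalesce() doesn't modify the RDD in place, it returns a new one
that has to be used, roughly like this (the output path is made up):

// coalesce() returns a new RDD; the result must be chained or assigned
val coalesced = yourRdd.coalesce(6)
coalesced.saveAsTextFile("hdfs:///tmp/output")   // path is illustrative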
Srikanth
On Tue, Jul 7, 2015 at 12:59 PM, Umesh Kacha wrote:
> Hi, I tried both approaches, using df.repartition(6) and df.coalesce(6); it
> d
Your tableLoad() APIs are not actions. The file will be read fully only when an
action is performed.
If the action is something like table1.join(table2), then I think both
files will be read in parallel.
Can you try that and look at the execution plan? In 1.4 this is also shown in the
Spark UI.
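Something along these lines, as a rough sketch (paths and column names are made up):

// Nothing is read until an action runs
val table1 = sqlContext.read.format("com.databricks.spark.csv").load("hdfs:///data/table1.csv")
val table2 = sqlContext.read.format("com.databricks.spark.csv").load("hdfs:///data/table2.csv")
val joined = table1.join(table2, table1("id") === table2("id"))
joined.explain()   // prints the plan without reading the files
joined.count()     // the action; this is when both files are actually read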
Srikanth
On
Is there a join involved in your SQL?
Have a look at spark.sql.shuffle.partitions.
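If there is, you can tune it like this (the value is just an example; the default is 200):

sqlContext.setConf("spark.sql.shuffle.partitions", "6")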
Srikanth
On Wed, Jul 8, 2015 at 1:29 AM, Umesh Kacha wrote:
> Hi Srikanth, thanks for the response. I have the following code:
>
> hiveContext.sql("insert into... ").coalesce(6)
>
> Abo
> I think reducing shuffle partitions will slow down my hiveContext group-by
> query, or will it not? Please guide.
>
> On Sat, Jul 11, 2015 at 7:41 AM, Srikanth wrote:
>
>> Is there a join involved in your SQL?
>> Have a look at spark.sql.shuffle.partitions.
>
...DataFrame cache is way less than RDD cache. I thought
this difference is due to the columnar format used by DataFrames. As per the
statement in the book, the cache sizes should be similar.
Srikanth
nc(HiveSessionImpl.java:218)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
I'm able to read the other table("my_table") from Beeline though.
Any suggestions on how to overcome this?
This is with the Spark 1.4 pre-built version. spark-shell was started with
--packages to pull in spark-csv.
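For reference, the launch command was roughly along these lines (the spark-csv version here is illustrative):

spark-shell --packages com.databricks:spark-csv_2.10:1.2.0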
Srikanth
Hello,
Re-sending this to see if I'm second time lucky!
I've not managed to move past this error.
Srikanth
On Mon, Jul 13, 2015 at 9:14 PM, Srikanth wrote:
> Hello,
>
> I want to expose the results of Spark computations to external tools. I plan to
> do this with the Thrift server.
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
Srikanth
On Thu, Jul 16, 2015 at 12:44 AM, Cheng, Hao wrote:
> Have you ever tried querying “select * from temp_table” from the spark
> shell? Or can you try the option --jars while starting the spark shell?
Hello,
I'd like to understand how Spark Streaming (direct stream) handles the addition
of Kafka partitions.
Will a running job become aware of new partitions and read from them, given that
it uses the Kafka APIs to query offsets and offsets are handled internally?
Srikanth
> ...should pick up new partitions as they are
> added.
>
> On Fri, Jul 22, 2016 at 11:29 AM, Srikanth wrote:
> > Hello,
> >
> > I'd like to understand how Spark Streaming(direct) would handle Kafka
> > partition addition?
> > Will a running job be aware of
Yeah, that's what I thought. We need to redefine the stream, not just restart it.
Thanks for the info!
I do see the usage of Subscribe[K,V] in your DStreams example.
It looks simple, but it's not very obvious how it works :-)
I'll watch out for the docs and ScalaDoc.
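For reference, the usage I'm referring to looks roughly like this (broker, topic and group id are made up; ssc is the StreamingContext):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",          // illustrative
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-group",                       // illustrative
  "auto.offset.reset" -> "latest"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Seq("mytopic"), kafkaParams)
)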
Srikanth
On Fri, Jul 22, 2016 at 2:1
ions} partitions.")
Should I be setting some parameter/config? Is the doc for the new integration
available?
Thanks,
Srikanth
On Fri, Jul 22, 2016 at 2:15 PM, Cody Koeninger wrote:
> No, restarting from a checkpoint won't do it, you need to re-define the
> stream.
>
> Here's
> * @param pattern pattern to subscribe to
> * @param kafkaParams Kafka
Who does the new partition discovery? The underlying Kafka consumer or
spark-streaming-kafka-0-10-assembly?
Srikanth
On Fri, Aug 12, 2016 at 5:15 PM, Cody Koeninger wrote:
> Hrrm, that's interesting. Did you try with su
...id-orgs.txt),false,,,
: : : +- Scan JSONRelation[creative_id#131L,creative_name#132,concept_id#129L,concept_name#130] InputPaths: file:/shared/data/t1_meta/t1_meta_creative.jsonl
: : +- Scan JSONRelation[description#142,id#143L,name#144] InputPaths: file:/shared/data/t1_meta/t1_meta_exchange.jsonl
: +- ConvertToUnsafe
: +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/technology_key.txt),false,
+- ConvertToUnsafe
+- Scan CsvRelation(,Some(file:///shared/data/t1_meta/browser_languages.osv),false
Srikanth
Michael,
Output of DF.queryExecution is saved to
https://www.dropbox.com/s/1vizuwpswza1e3x/plan.txt?dl=0
I don't see anything in this to suggest a switch in strategy. Hopefully you
find this helpful.
Srikanth
On Thu, Jan 28, 2016 at 4:43 PM, Michael Armbrust
wrote:
> Can you pro
Hello,
Any pointers on what is causing the optimizer to convert a broadcast join into a
shuffle join?
This join is with a file that is just 4 KB in size.
Complete plan -->
https://www.dropbox.com/s/apuomw1dg0t1jtc/plan_with_select.txt?dl=0
DAG from UI -->
https://www.dropbox.com/s/4xc9d0rdkx2fun8/DAG_with_se
...format("com.databricks.spark.csv").save(s"hdfs:///output/id=$id/")
})
This approach doesn't scale well, especially since the number of unique IDs can be
between 500 and 700.
And adding a second partition column will make this even worse.
Wondering if anyone has an efficient workaround?
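What I'd hope for is a single partitioned write instead of a per-ID loop, roughly like this (assuming the CSV writer supports partitionBy; path and column are illustrative):

df.write
  .format("com.databricks.spark.csv")
  .option("header", "false")
  .partitionBy("id")              // one directory per id=<value> under the output path
  .save("hdfs:///output/")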
Srikanth
Hello,
I have a streaming use case where I plan to keep a dataset broadcasted and
cached on each executor.
Every micro batch in streaming will create a DataFrame out of the RDD and join it
with this broadcasted dataset.
The code below performs the broadcast operation for each RDD. Is there
a way to broadcast it just once?
that did not provide this behavior.
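What I'm after is something along these lines, where the small dataset is collected and broadcast once as a plain Map rather than re-broadcast per batch (all names are made up):

// Collect the small lookup table to the driver and broadcast it once
val lookupMap: Map[String, String] =
  metaDF.collect().map(r => r.getString(0) -> r.getString(1)).toMap
val lookupBc = ssc.sparkContext.broadcast(lookupMap)

// Reuse the same broadcast value in every micro batch
stream.foreachRDD { rdd =>
  val joined = rdd.map { case (k, v) => (k, v, lookupBc.value.getOrElse(k, "n/a")) }
  joined.count()  // placeholder action
}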
Srikanth
On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu
wrote:
> You should be able to broadcast that data frame using sc.broadcast and
> join against it.
>
> On Wed, 17 Feb 2016, 21:13 Srikanth wrote:
>
>> Hello,
>>
>> I have
a/output/streaming/"+timestamp)
}
On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu
wrote:
> Can you paste the code where you use sc.broadcast ?
>
> On Thu, Feb 18, 2016 at 5:32 PM Srikanth wrote:
>
>> Sebastian,
>>
>> I was able to broadcast using sql
...you mean? Did it fail? It didn't broadcast?
>
> On Thu, Feb 18, 2016 at 11:43 PM Srikanth wrote:
>
>> Code with the SQL broadcast hint. This worked and I was able to see that
>> a broadcast join was performed.
>>
>> val testDF = sqlContext.read.format("com.databricks.
Apparently you can pass comma separated folders.
Try the suggestion given here -->
http://stackoverflow.com/questions/29426246/spark-streaming-textfilestream-not-supporting-wildcards
Let me know if this helps
Srikanth
On Wed, Feb 17, 2016 at 5:47 PM, Shixiong(Ryan) Zhu wrote:
> textFile
Hmmm..OK.
Srikanth
On Fri, Feb 19, 2016 at 10:20 AM, Sebastian Piu
wrote:
> I don't have the code with me now, and I ended up moving everything to RDDs in
> the end, using map operations to do some lookups, i.e. instead of
> broadcasting a DataFrame I ended up broadcasting a Map
>
> On Fri, Feb 19, 2016 at 11:39 AM Srikanth wrote:
>
>> It didn
Sebastian,
Update: this is not possible, and will probably remain this way for the
foreseeable future.
https://issues.apache.org/jira/browse/SPARK-3863
Srikanth
On Fri, Feb 19, 2016 at 10:20 AM, Sebastian Piu
wrote:
> I don't have the code with me now, and I ended moving everything t
queId.
sc.textFile(file).
  zipWithUniqueId().
  map { case (d, i) => i.toString + delimiter + d }.
  map(_.split(delimiter)).
  map(s => CaseClass(...)).   // placeholder case class
  toDF().select("field1", "field2")
It's a bit hacky. Is there an easier way to do this with DataFrames and
spark-csv?
Srikanth
Will work. Thanks!
zipWithUniqueId() doesn't guarantee continuous IDs either.
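For the archives, the suggestion amounts to something like this (the column name is made up; the generated IDs are unique and increasing, but not consecutive):

import org.apache.spark.sql.functions.monotonicallyIncreasingId

val withId = df.withColumn("rowId", monotonicallyIncreasingId())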
Srikanth
On Tue, Jul 21, 2015 at 9:48 PM, Burak Yavuz wrote:
> Would monotonicallyIncreasingId
> <https://github.com/apache/spark/blob/d4c7a7a3642a74ad40093c96c4bf45a62a470605/sql/core/src/main/scala/or
was used with 6 cores.
Is this a bug? This is with Spark 1.4.
[image: Inline image 1]
Srikanth
Cool. Thanks!
Srikanth
On Wed, Jul 22, 2015 at 3:12 PM, Andrew Or wrote:
> Hi Srikanth,
>
> I was able to reproduce the issue by setting `spark.cores.max` to a number
> greater than the number of cores on a worker. I've filed SPARK-9260 which I
> believe is already be
").join(ELAccountDF.as("EL").filter("EmailAddress !=
> ''")).where($"SF.Email" === $"EL.EmailAddress")
>
> val matchId =
> phoneMatch.unionAll(emailMatch.unionAll(faxMatch.unionAll(mobileMatch)))
> matchId.cache().registerTempTable("matchId")
Is there a more elegant way to do this?
On a related note, has anyone worked on record linkage using Bloom Filters,
Levenshtein distance, etc. in Spark?
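For what it's worth, newer Spark versions (1.5+) have a built-in Levenshtein expression; a rough sketch with made-up DataFrame and column names:

import sqlContext.implicits._
import org.apache.spark.sql.functions.levenshtein

// Keep candidate pairs whose names are within edit distance 2
val fuzzyMatch = sfDF.as("SF")
  .join(elDF.as("EL"), levenshtein($"SF.Name", $"EL.Name") <= 2)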
Srikanth
Hello,
I'm using spark-csv instead of sc.textFile() to work with CSV files.
How can I set the number of partitions that will be created when reading a CSV?
Basically an equivalent of minPartitions in textFile():
val myrdd = sc.textFile("my.csv", 24)
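The only workaround I know of is to repartition right after the read, something like this (assuming spark-csv; the number and options are illustrative):

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("my.csv")
  .repartition(24)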
Srikanth
...does it auto-broadcast if the second DataFrame is small enough?
Srikanth
...of an RDD. With DataFrames, it's tricky due to columnar storage.
How do we do it?
On a related note, I see the size of the RDD object to be ~60 MB. Is that the
footprint of the RDD in the driver JVM?
scala> import org.apache.spark.util.SizeEstimator
scala> val temp = sc.parallelize(Array(1,2,3,4,5,6))
scala> SizeEstimator.estimate(temp)
res13: Long = 69507320
Srikanth
...know how spark.sql.autoBroadcastJoinThreshold estimates the
size of a DataFrame. Is it going to broadcast when the columnar storage size is
less than 10 MB?
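For context, the threshold itself is set like this (10 MB is the default; -1 disables auto broadcast):

sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)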
Srikanth
On Fri, Aug 7, 2015 at 2:51 PM, Ted Yu wrote:
> Have you tried calling SizeEstimator.estimate() on a DataFrame ?
>
> I did the following in R
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:140)
> at
> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:124)
> at
> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)
Any idea what is wrong here?
Srikanth
Sometimes a few batches are scheduled and
run fine. Then I get this error.
kafkacat is able to fetch from this topic continuously.
Full exception is here --
https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767
Srikanth
Thanks Cody. Setting poll timeout helped.
Our network is fine but brokers are not fully provisioned in test cluster.
But there isn't enough load to max out on broker capacity.
Curious that kafkacat running on the same node doesn't have any issues.
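For the archives, the setting in question (the value here is illustrative):

// Set on the SparkConf before creating the streaming context
sparkConf.set("spark.streaming.kafka.consumer.poll.ms", "10000")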
Srikanth
On Tue, Aug 23, 2016 at 9:5
https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala#L160
How to force it to restart in this case (fully aware of potential data
loss)?
Srikanth
> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth wrote:
> > You are right. I got confused as it's all part of the same log when running
> from
> > IDE.
> > I was looking for a good guide to read to understand this integration.
> >
> > I'm not managing offset on my own.
> If your retention is so low that retention gets expired in between
> when the driver created a batch with a given starting offset, and when
> an executor starts to process that batch, you're going to have
> problems.
>
> On Tue, Sep 6, 2016 at 2:30 PM, Srikanth wrote
> - you start that job back up, and it errors because the last committed
> offset is no longer available
> - you think that instead of erroring, the job should silently restart
> based on the value of auto.offset.reset
>
> Is that accurate?
>
>
> On Wed, Sep 7, 2016 at 10:44 AM,
Executor: Finished task 1.1 in stage 138.0 (TID
7854). 1103 bytes result sent to driver
On Wed, Aug 24, 2016 at 2:13 PM, Srikanth wrote:
> Thanks Cody. Setting poll timeout helped.
> Our network is fine but brokers are not fully provisioned in test cluster.
> But there isn't enough lo
rce)
at java.lang.Thread.run(Unknown Source)
On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger wrote:
> you could try setting
>
> spark.streaming.kafka.consumer.cache.initialCapacity
>
> spark.streaming.kafka.consumer.cache.maxCapacity
>
> to 1
>
> On Wed, Sep 7, 2016 at 2
Yeah, disabling the cache was not going to be my permanent solution either.
I was going to ask: how big an overhead is that?
It happens intermittently, and each time it happens the retry is successful.
Srikanth
On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger wrote:
> That's not what I would have
FORMED", "delimiter" -> "\t",
"header" -> "false"))
.partitionBy("entityId", "regionId", "eventDate")
.save(outputPath)
Removing SaveMode.Append really speeds things up, and the mismatch
between job duration and processing time also disappears.
I'm not able to explain what is causing this, though.
Srikanth
fig("mapreduce.use.directfileoutputcommitter", "true")
//.config("spark.sql.sources.outputCommitterClass",
classOf[DirectOutputCommitter].getCanonicalName)
.getOrCreate()
Srikanth
...files, the rename from _temporary is just not practical on S3.
I guess we have to add another stage with S3DistCp?
Srikanth
On Sun, Sep 11, 2016 at 2:34 PM, Steve Loughran
wrote:
>
> > On 9 Sep 2016, at 21:54, Srikanth wrote:
> >
> > Hello,
> >
> > I'm trying t
...source that can't be idempotent. As such, the
operations are assumed to be at-least-once.
This seems to be one place where duplicates can be reduced.
Srikanth
dn't be an issue.
>
> On Sat, Oct 8, 2016 at 7:25 PM, Srikanth wrote:
> > Hello,
> >
> > Spark streaming Kafka 0.10 integration provides an option to commit offsets to
> > Kafka using the commitAsync() API.
> > This only records the offset commit request. The actual comm
eally important to you, it should
> be pretty straightforward for you to hack on it to allow it at your
> own risk. There is a check for concurrent access in the consumer, so
> worst case scenario you should get an exception.
>
> On Sat, Oct 8, 2016 at 9:18 PM, Srikanth wrote:
>
want
> to do with the new kafka consumer.
>
>
> As far as the original issue, are you seeing those polling errors
> intermittently, or consistently? From your description, it sounds
> like retry is working correctly.
>
>
> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth wrote:
r related
> configs?
>
> On Wed, Oct 19, 2016 at 12:22 PM, Srikanth wrote:
> > Bringing this thread back as I'm seeing this exception on a production
> kafka
> > cluster.
> >
> > I have two Spark streaming apps reading the same topic. App1 has batch
in the docs.
>
> On Thu, Oct 20, 2016 at 12:13 PM, Srikanth wrote:
> > Yeah, setting those params helped.
> >
> > On Wed, Oct 19, 2016 at 1:32 PM, Cody Koeninger
> wrote:
> >>
> >> 60 seconds for a batch is above the default settings in kafka related
?
Regards,
Srikanth
...time. So will the driver fail and exit in such cases?
I've seen drivers exit after a job has hit max retry attempts. This is
different though, right?
Srikanth
On Tue, Feb 7, 2017 at 5:25 PM, Tathagata Das
wrote:
> Does restarting after a few minutes solve the problem? Could be a
> tran
C:\Users\stati\.ivy2\cache\org.scala-lang.modules\scala-parser-combinators_2.11\bundles\scala-parser-combinators_2.11-1.0.4.jar:scala/util/parsing/combinator/ImplicitConversions$$anonfun$flatten2$1.class
DependencyTree didn't show spark-streaming-kafka-0-10-assembly pulling
json4s-native.
Any idea how to resolve this? I'm using Spark version 2.1.0.
Thanks,
Srikanth
Thanks for the tip. That worked. When would one use the assembly?
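For anyone searching later, the dependency in question looks something like this in sbt (the version is illustrative):

// Depend on the regular artifact rather than the -assembly one
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"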
On Wed, Mar 29, 2017 at 7:13 PM, Tathagata Das
wrote:
> Try depending on "spark-streaming-kafka-0-10_2.11" (not the assembly)
>
> On Wed, Mar 29, 2017 at 9:59 AM, Srikanth wrote:
>
>> Hello,
>>
]
17/05/16 15:12:02 WARN Executor: 1 block locks were not released by TID = 7807:
[rdd_1_39]
I notice that "Managed memory leak" logs are not seen when I use G1GC.
Srikanth
library to print the full file name
when such failures happen? So that I can then manually check if the file is
indeed corrupted.
Thanks,
Srikanth
...would just add the SPARK_JAR and APP_JAR. I am wondering what is the best way to
add additional files to the distributed cache and also have them appear on the
classpath for ExecutorLauncher.
Thanks
Srikanth Sundarrajan
Hi Spark Experts,
Can someone point me to some examples of non-linear (DAG) ML pipelines?
That would be of great help.
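To make the question concrete, here is the kind of shape I mean: stages are declared as a flat list, but the columns form a DAG where two feature branches merge (all column names are made up):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, StringIndexer, Tokenizer, VectorAssembler}

// Branch 1: text -> tokens -> term frequencies
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("textFeatures")

// Branch 2: categorical column -> index
val indexer = new StringIndexer().setInputCol("category").setOutputCol("categoryIndex")

// The two branches merge here, forming a DAG through column dependencies
val assembler = new VectorAssembler()
  .setInputCols(Array("textFeatures", "categoryIndex"))
  .setOutputCol("features")

val lr = new LogisticRegression().setFeaturesCol("features").setLabelCol("label")

// Stages are still given as a flat, topologically ordered list
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, indexer, assembler, lr))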
Thanks much in advance
-Srikanth
> ...node1=16 cores
> and node2=4 cores, but cores are allocated like node1=2, node2=1, ..., node14=1,
> like that. Is there any conf property I need to
> change? I know with dynamic allocation we can use the below, but without dynamic
> allocation is there any?
> --conf "spark.dynamicAllocation.maxExecutors=2"
>
>
> Thanks
> Amit
>
--
Regards,
Srikanth Sriram