createDirectStream and Stats

2015-06-18 Thread Tim Smith
Hi,

I just switched from "createStream" to the "createDirectStream" API for
kafka and while things otherwise seem happy, the first thing I noticed is
that stream/receiver stats are gone from the Spark UI :( Those stats were
very handy for keeping an eye on health of the app.

What's the best way to re-create those in the Spark UI? Maintain
Accumulators? Would be really nice to get back receiver-like stats even
though I understand that "createDirectStream" is a receiver-less design.

Thanks,

Tim


Re: createDirectStream and Stats

2015-06-18 Thread Tim Smith
Thanks for the super-fast response, TD :)

I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera,
are you listening? :D





On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
wrote:

> Are you using Spark 1.3.x ? That explains. This issue has been fixed in
> Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome
> stats. :)
>
> On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith  wrote:
>
>> Hi,
>>
>> I just switched from "createStream" to the "createDirectStream" API for
>> kafka and while things otherwise seem happy, the first thing I noticed is
>> that stream/receiver stats are gone from the Spark UI :( Those stats were
>> very handy for keeping an eye on health of the app.
>>
>> What's the best way to re-create those in the Spark UI? Maintain
>> Accumulators? Would be really nice to get back receiver-like stats even
>> though I understand that "createDirectStream" is a receiver-less design.
>>
>> Thanks,
>>
>> Tim
>>
>>
>>
>


Re: createDirectStream and Stats

2015-06-19 Thread Tim Smith
Update on performance of the new API: the new code using the
createDirectStream API ran overnight and when I checked the app state in
the morning, there were massive scheduling delays :(

Not sure why and haven't investigated a whole lot. For now, switched back
to the createStream API build of my app. Yes, for the record, this is with
CDH 5.4.1 and Spark 1.3.



On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith  wrote:

> Thanks for the super-fast response, TD :)
>
> I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera,
> are you listening? :D
>
>
>
>
>
> On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Are you using Spark 1.3.x ? That explains. This issue has been fixed in
>> Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome
>> stats. :)
>>
>> On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith  wrote:
>>
>>> Hi,
>>>
>>> I just switched from "createStream" to the "createDirectStream" API for
>>> kafka and while things otherwise seem happy, the first thing I noticed is
>>> that stream/receiver stats are gone from the Spark UI :( Those stats were
>>> very handy for keeping an eye on health of the app.
>>>
>>> What's the best way to re-create those in the Spark UI? Maintain
>>> Accumulators? Would be really nice to get back receiver-like stats even
>>> though I understand that "createDirectStream" is a receiver-less design.
>>>
>>> Thanks,
>>>
>>> Tim
>>>
>>>
>>>
>>
>


Re: createDirectStream and Stats

2015-06-19 Thread Tim Smith
Essentially, I went from:
k = createStream .
val dataout = k.map(x=>myFunc(x._2,someParams))
dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
myOutputFunc.write(rec) })

To:
kIn = createDirectStream .
k = kIn.repartition(numberOfExecutors) //since #kafka partitions <
#spark-executors
val dataout = k.map(x=>myFunc(x._2,someParams))
dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
myOutputFunc.write(rec) })

With the new API, the app starts up and works fine for a while but I guess
starts to deteriorate after a while. With the existing API "createStream",
the app does deteriorate but over a much longer period, hours vs days.






On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das  wrote:

> Yes, please tell us what operation are you using.
>
> TD
>
> On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger 
> wrote:
>
>> Is there any more info you can provide / relevant code?
>>
>> On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith  wrote:
>>
>>> Update on performance of the new API: the new code using the
>>> createDirectStream API ran overnight and when I checked the app state in
>>> the morning, there were massive scheduling delays :(
>>>
>>> Not sure why and haven't investigated a whole lot. For now, switched
>>> back to the createStream API build of my app. Yes, for the record, this is
>>> with CDH 5.4.1 and Spark 1.3.
>>>
>>>
>>>
>>> On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith  wrote:
>>>
>>>> Thanks for the super-fast response, TD :)
>>>>
>>>> I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4.
>>>> Cloudera, are you listening? :D
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das <
>>>> tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> Are you using Spark 1.3.x ? That explains. This issue has been fixed
>>>>> in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome
>>>>> stats. :)
>>>>>
>>>>> On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith  wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I just switched from "createStream" to the "createDirectStream" API
>>>>>> for kafka and while things otherwise seem happy, the first thing I 
>>>>>> noticed
>>>>>> is that stream/receiver stats are gone from the Spark UI :( Those stats
>>>>>> were very handy for keeping an eye on health of the app.
>>>>>>
>>>>>> What's the best way to re-create those in the Spark UI? Maintain
>>>>>> Accumulators? Would be really nice to get back receiver-like stats even
>>>>>> though I understand that "createDirectStream" is a receiver-less design.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Tim
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: createDirectStream and Stats

2015-06-19 Thread Tim Smith
On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das  wrote:

> Also, can you find from the spark UI the break up of the stages in each
> batch's jobs, and find which stage is taking more time after a while?
>

Sure, will try to debug/troubleshoot. Are there enhancements to this
specific API between 1.3 and 1.4 that can substantially change it's
behaviour?


> On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger 
> wrote:
>
>> when you say your old version was
>>
>> k = createStream .
>>
>> were you manually creating multiple receivers?  Because otherwise you're
>> only using one receiver on one executor...
>>
>
Yes, sorry, the earlier/stable version was more like:
kInStreams = (1 to n).map{_ => KafkaUtils.createStream  // n
being the number of kafka partitions, 1 receiver per partition
val k = ssc.union(kInStreams)
val dataout = k.map(x=>myFunc(x._2,someParams))
dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
myOutputFunc.write(rec) })

Thanks,

Tim





>
>> If that's the case I'd try direct stream without the repartitioning.
>>
>>
>> On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith  wrote:
>>
>>> Essentially, I went from:
>>> k = createStream .
>>> val dataout = k.map(x=>myFunc(x._2,someParams))
>>> dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
>>> myOutputFunc.write(rec) })
>>>
>>> To:
>>> kIn = createDirectStream .
>>> k = kIn.repartition(numberOfExecutors) //since #kafka partitions <
>>> #spark-executors
>>> val dataout = k.map(x=>myFunc(x._2,someParams))
>>> dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
>>> myOutputFunc.write(rec) })
>>>
>>> With the new API, the app starts up and works fine for a while but I
>>> guess starts to deteriorate after a while. With the existing API
>>> "createStream", the app does deteriorate but over a much longer period,
>>> hours vs days.
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das 
>>> wrote:
>>>
>>>> Yes, please tell us what operation are you using.
>>>>
>>>> TD
>>>>
>>>> On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger 
>>>> wrote:
>>>>
>>>>> Is there any more info you can provide / relevant code?
>>>>>
>>>>> On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith  wrote:
>>>>>
>>>>>> Update on performance of the new API: the new code using the
>>>>>> createDirectStream API ran overnight and when I checked the app state in
>>>>>> the morning, there were massive scheduling delays :(
>>>>>>
>>>>>> Not sure why and haven't investigated a whole lot. For now, switched
>>>>>> back to the createStream API build of my app. Yes, for the record, this 
>>>>>> is
>>>>>> with CDH 5.4.1 and Spark 1.3.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith  wrote:
>>>>>>
>>>>>>> Thanks for the super-fast response, TD :)
>>>>>>>
>>>>>>> I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4.
>>>>>>> Cloudera, are you listening? :D
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das <
>>>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Are you using Spark 1.3.x ? That explains. This issue has been
>>>>>>>> fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more
>>>>>>>> awesome stats. :)
>>>>>>>>
>>>>>>>> On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I just switched from "createStream" to the "createDirectStream"
>>>>>>>>> API for kafka and while things otherwise seem happy, the first thing I
>>>>>>>>> noticed is that stream/receiver stats are gone from the Spark UI :( 
>>>>>>>>> Those
>>>>>>>>> stats were very handy for keeping an eye on health of the app.
>>>>>>>>>
>>>>>>>>> What's the best way to re-create those in the Spark UI? Maintain
>>>>>>>>> Accumulators? Would be really nice to get back receiver-like stats 
>>>>>>>>> even
>>>>>>>>> though I understand that "createDirectStream" is a receiver-less 
>>>>>>>>> design.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Tim
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: createDirectStream and Stats

2015-06-19 Thread Tim Smith
I did try without repartition, initially, but that was even more horrible
because instead of the allocated 100 executors, only 30 (which is the
number of kafka partitions) would have to do the work. The "MyFunc" is a
CPU bound task so adding more memory per executor wouldn't help and I saw
that each of the 30 executors was only using one thread/core on each Spark
box. I could go and play with threading in MyFunc but I don't want to mess
with threading with all the parallelism already involved and I don't think
in-app threading outside of what the framework does is really desirable.

With repartition, there is shuffle involved, but at least the computation
load spreads across all 100 executors instead of just 30.




On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger  wrote:

> If that's the case, you're still only using as many read executors as
> there are kafka partitions.
>
> I'd remove the repartition. If you weren't doing any shuffles in the old
> job, and are doing a shuffle in the new job, it's not really comparable.
>
> On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith  wrote:
>
>> On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das 
>> wrote:
>>
>>> Also, can you find from the spark UI the break up of the stages in each
>>> batch's jobs, and find which stage is taking more time after a while?
>>>
>>
>> Sure, will try to debug/troubleshoot. Are there enhancements to this
>> specific API between 1.3 and 1.4 that can substantially change it's
>> behaviour?
>>
>>
>>> On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger 
>>> wrote:
>>>
>>>> when you say your old version was
>>>>
>>>> k = createStream .
>>>>
>>>> were you manually creating multiple receivers?  Because otherwise
>>>> you're only using one receiver on one executor...
>>>>
>>>
>> Yes, sorry, the earlier/stable version was more like:
>> kInStreams = (1 to n).map{_ => KafkaUtils.createStream  // n
>> being the number of kafka partitions, 1 receiver per partition
>> val k = ssc.union(kInStreams)
>> val dataout = k.map(x=>myFunc(x._2,someParams))
>> dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
>> myOutputFunc.write(rec) })
>>
>> Thanks,
>>
>> Tim
>>
>>
>>
>>
>>
>>>
>>>> If that's the case I'd try direct stream without the repartitioning.
>>>>
>>>>
>>>> On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith  wrote:
>>>>
>>>>> Essentially, I went from:
>>>>> k = createStream .
>>>>> val dataout = k.map(x=>myFunc(x._2,someParams))
>>>>> dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
>>>>> myOutputFunc.write(rec) })
>>>>>
>>>>> To:
>>>>> kIn = createDirectStream .
>>>>> k = kIn.repartition(numberOfExecutors) //since #kafka partitions <
>>>>> #spark-executors
>>>>> val dataout = k.map(x=>myFunc(x._2,someParams))
>>>>> dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
>>>>> myOutputFunc.write(rec) })
>>>>>
>>>>> With the new API, the app starts up and works fine for a while but I
>>>>> guess starts to deteriorate after a while. With the existing API
>>>>> "createStream", the app does deteriorate but over a much longer period,
>>>>> hours vs days.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das 
>>>>> wrote:
>>>>>
>>>>>> Yes, please tell us what operation are you using.
>>>>>>
>>>>>> TD
>>>>>>
>>>>>> On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger 
>>>>>> wrote:
>>>>>>
>>>>>>> Is there any more info you can provide / relevant code?
>>>>>>>
>>>>>>> On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Update on performance of the new API: the new code using the
>>>>>>>> createDirectStream API ran overnight and when I checked the app state 
>>>>>>>> in
>>>>>>>> the morning, there were massive scheduling delays :(
>>>>>>>>
&g

Re: Spark REST Job server feedback?

2015-10-08 Thread Tim Smith
I am curious too - any comparison between the two. Looks like one is
Datastax sponsored and the other is Cloudera. Other than that, any
major/core differences in design/approach?

Thanks,

Tim


On Mon, Sep 28, 2015 at 8:32 AM, Ramirez Quetzal  wrote:

> Anyone has feedback on using Hue / Spark Job Server REST servers?
>
>
> http://gethue.com/how-to-use-the-livy-spark-rest-job-server-for-interactive-spark/
>
> https://github.com/spark-jobserver/spark-jobserver
>
> Many thanks,
>
> Rami
>



-- 

--
Thanks,

Tim


Controlling output fileSize in SparkSQL

2015-07-27 Thread Tim Smith
Hi,

I am using Spark 1.3 (CDH 5.4.4). What's the recipe for setting a minimum
output file size when writing out from SparkSQL? So far, I have tried:
--x-
import sqlContext.implicits._
sc.hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache",true)
sc.hadoopConfiguration.setLong("fs.local.block.size",1073741824)
sc.hadoopConfiguration.setLong("dfs.blocksize",1073741824)
sqlContext.sql("SET spark.sql.shuffle.partitions=2")
val df = sqlContext.jsonFile("hdfs://nameservice1/user/joe/samplejson/*")
df.saveAsParquetFile("hdfs://nameservice1/user/joe/data/reduceFiles-Parquet")
--x-

But my output still isn't aggregated into 1+GB files.

Thanks,

- Siddhartha


Alter table fails to find table

2015-09-02 Thread Tim Smith
Spark 1.3.0 (CDH 5.4.4)

scala> sqlContext.sql("SHOW TABLES").collect
res18: Array[org.apache.spark.sql.Row] = Array([allactivitydata,true],
[sample_07,false], [sample_08,false])

sqlContext.sql("SELECT COUNT(*) from allactivitydata").collect
res19: Array[org.apache.spark.sql.Row] = Array([1227230])

scala> sqlContext.sql("ALTER TABLE allactivitydata ADD COLUMNS (test
STRING)");
15/09/03 04:23:16 INFO ParseDriver: Parsing command: ALTER TABLE
allactivitydata ADD COLUMNS (test STRING)
15/09/03 04:23:16 INFO ParseDriver: Parse Completed
15/09/03 04:23:16 INFO PerfLogger: 
15/09/03 04:23:16 INFO PerfLogger: 
15/09/03 04:23:16 INFO PerfLogger: 
15/09/03 04:23:16 INFO PerfLogger: 
15/09/03 04:23:16 INFO ParseDriver: Parsing command: ALTER TABLE
allactivitydata ADD COLUMNS (test STRING)
15/09/03 04:23:16 INFO ParseDriver: Parse Completed
15/09/03 04:23:16 INFO PerfLogger: 
15/09/03 04:23:16 INFO PerfLogger: 
15/09/03 04:23:16 ERROR Driver: FAILED: SemanticException [Error 10001]:
Table not found default.allactivitydata
org.apache.hadoop.hive.ql.parse.SemanticException: Table not found
default.allactivitydata
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.getTable(BaseSemanticAnalyzer.java:1332)
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.getTable(BaseSemanticAnalyzer.java:1315)
at
org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.addInputsOutputsAlterTable(DDLSemanticAnalyzer.java:1387)
at
org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.addInputsOutputsAlterTable(DDLSemanticAnalyzer.java:1374)
at
org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.analyzeAlterTableModifyCols(DDLSemanticAnalyzer.java:2611)
at
org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.analyzeInternal(DDLSemanticAnalyzer.java:255)
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:222)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:423)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:307)
at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1112)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1160)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1049)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1039)
at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:308)
at
org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:280)
at
org.apache.spark.sql.hive.execution.HiveNativeCommand.run(HiveNativeCommand.scala:33)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:55)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:55)
at
org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:65)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1088)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1088)
at org.apache.spark.sql.DataFrame.(DataFrame.scala:147)
at org.apache.spark.sql.DataFrame.(DataFrame.scala:130)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:92)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:32)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:37)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:39)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:41)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:43)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:45)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:47)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:49)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:53)
at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:55)
at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC.(:57)
at $line83.$read$$iwC$$iwC$$iwC$$iwC.(:59)
at $line83.$read$$iwC$$iwC$$iwC.(:61)
at $line83.$read$$iwC$$iwC.(:63)
at $line83.$read$$iwC.(:65)
at $line83.$read.(:67)
at $line83.$read$.(:71)
at $line83.$read$.()
at $line83.$eval$.(:7)
at $line83.$eval$.()
at $line83.$eval.$print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at
org.apache.spark.repl.Sp

Consuming AWS Cloudwatch logs from Kinesis into Spark

2017-04-05 Thread Tim Smith
I am sharing this code snippet since I spent quite some time figuring it
out and I couldn't find any examples online. Between the Kinesis
documentation, tutorial on AWS site and other code snippets on the
Internet, I was confused about structure/format of the messages that Spark
fetches from Kinesis - base64 encoded, json, gzipped - which one first and
what order.

I tested this on EMR-5.4.0, Amazon Hadoop 2.7.3 and Spark 2.1.0. Hope it
helps others googling for similar info. I tried using Structured Streaming
but (1) it's in Alpha and (2) despite including what I thought were all the
dependencies, it complained of not finding DataSource.Kinesis. You probably
do not need all the libs but I am just too lazy to redact ones you don't
require for the snippet below :)

import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD
import java.util.Base64
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.explode
import org.apache.commons.math3.stat.descriptive._
import java.io.File
import java.net.InetAddress
import scala.util.control.NonFatal
import org.apache.spark.SparkFiles
import org.apache.spark.sql.SaveMode
import java.util.Properties;
import org.json4s._
import org.json4s.jackson.JsonMethods._
import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
import java.util.zip.{GZIPOutputStream, GZIPInputStream}
import scala.util.Try


//sc.setLogLevel("INFO")

val ssc = new StreamingContext(sc, Seconds(30))

val kinesisStreams = (0 until 2).map { i => KinesisUtils.createStream(ssc,
"myApp", "cloudwatchlogs",
"https://kinesis.us-east-1.amazonaws.com","us-east-1";,
InitialPositionInStream.LATEST , Seconds(30),
StorageLevel.MEMORY_AND_DISK_2,"myId","mySecret") }

val unionStreams = ssc.union(kinesisStreams)

unionStreams.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
  if(rdd.count() > 0) {
  val json = rdd.map(input => {
  val inputStream = new GZIPInputStream(new ByteArrayInputStream(input))
  val record = scala.io.Source.fromInputStream(inputStream).mkString
  compact(render(parse(record)))
  })

  val df = spark.sqlContext.read.json(json)
  val preDF =
df.select($"logGroup",explode($"logEvents").as("events_flat"))
  val penDF = preDF.select($"logGroup",$"events_flat.extractedFields")
  val finalDF =
penDF.select($"logGroup".as("cluster"),$"extractedFields.*")
  finalDF.printSchema()
  finalDF.show()
 }
})

ssc.start



--
Thanks,

Tim


Re: Assigning a unique row ID

2017-04-07 Thread Tim Smith
http://stackoverflow.com/questions/37231616/add-a-new-column-to-a-dataframe-new-column-i-want-it-to-be-a-uuid-generator


On Fri, Apr 7, 2017 at 3:56 PM, Everett Anderson 
wrote:

> Hi,
>
> What's the best way to assign a truly unique row ID (rather than a hash)
> to a DataFrame/Dataset?
>
> I originally thought that functions.monotonically_increasing_id would do
> this, but it seems to have a rather unfortunate property that if you add it
> as a column to table A and then derive tables X, Y, Z and save those, the
> row ID values in X, Y, and Z may end up different. I assume this is because
> it delays the actual computation to the point where each of those tables is
> computed.
>
>


-- 

--
Thanks,

Tim


Initialize Gaussian Mixture Model using Spark ML dataframe API

2017-04-27 Thread Tim Smith
Hi,

I am trying to figure out the API to initialize a gaussian mixture model
using either centroids created by K-means or previously calculated GMM
model (I am aware that you can "save" a model and "load" in later but I am
not interested in saving a model to a filesystem).

The Spark MLlib API lets you do this using SetInitialModel
https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.mllib.clustering.GaussianMixture

However, I cannot figure out how to do this using Spark ML API. Can anyone
please point me in the right direction? I've tried reading the Spark ML
code and was wondering if the "set" call lets you do that?

--
Thanks,

Tim


Re: Initialize Gaussian Mixture Model using Spark ML dataframe API

2017-05-02 Thread Tim Smith
Yes, I noticed these open issues, both with KMeans and GMM:
https://issues.apache.org/jira/browse/SPARK-13025

Thanks,

Tim


On Mon, May 1, 2017 at 9:01 PM, Yanbo Liang  wrote:

> Hi Tim,
>
> Spark ML API doesn't support set initial model for GMM currently. I wish
> we can get this feature in Spark 2.3.
>
> Thanks
> Yanbo
>
> On Fri, Apr 28, 2017 at 1:46 AM, Tim Smith  wrote:
>
>> Hi,
>>
>> I am trying to figure out the API to initialize a gaussian mixture model
>> using either centroids created by K-means or previously calculated GMM
>> model (I am aware that you can "save" a model and "load" in later but I am
>> not interested in saving a model to a filesystem).
>>
>> The Spark MLlib API lets you do this using SetInitialModel
>> https://spark.apache.org/docs/2.1.0/api/scala/index.html#org
>> .apache.spark.mllib.clustering.GaussianMixture
>>
>> However, I cannot figure out how to do this using Spark ML API. Can
>> anyone please point me in the right direction? I've tried reading the Spark
>> ML code and was wondering if the "set" call lets you do that?
>>
>> --
>> Thanks,
>>
>> Tim
>>
>
>


-- 

--
Thanks,

Tim


Re: [Spark Streaming] Dynamic Broadcast Variable Update

2017-05-02 Thread Tim Smith
One, I think, you should take this to the spark developer list.

Two, I suspect broadcast variables aren't the best solution for the use
case, you describe. Maybe an in-memory data/object/file store like tachyon
is a better fit.

Thanks,

Tim


On Tue, May 2, 2017 at 11:56 AM, Nipun Arora 
wrote:

> Hi All,
>
> To support our Spark Streaming based anomaly detection tool, we have made
> a patch in Spark 1.6.2 to dynamically update broadcast variables.
>
> I'll first explain our use-case, which I believe should be common to
> several people using Spark Streaming applications. Broadcast variables are
> often used to store values "machine learning models", which can then be
> used on streaming data to "test" and get the desired results (for our case
> anomalies). Unfortunately, in the current spark, broadcast variables are
> final and can only be initialized once before the initialization of the
> streaming context. Hence, if a new model is learned the streaming system
> cannot be updated without shutting down the application, broadcasting
> again, and restarting the application. Our goal was to re-broadcast
> variables without requiring a downtime of the streaming service.
>
> The key to this implementation is a live re-broadcastVariable() interface,
> which can be triggered in between micro-batch executions, without any
> re-boot required for the streaming application. At a high level the task is
> done by re-fetching broadcast variable information from the spark driver,
> and then re-distribute it to the workers. The micro-batch execution is
> blocked while the update is made, by taking a lock on the execution. We
> have already tested this in our prototype deployment of our anomaly
> detection service and can successfully re-broadcast the broadcast variables
> with no downtime.
>
> We would like to integrate these changes in spark, can anyone please let
> me know the process of submitting patches/ new features to spark. Also. I
> understand that the current version of Spark is 2.1. However, our changes
> have been done and tested on Spark 1.6.2, will this be a problem?
>
> Thanks
> Nipun
>



-- 

--
Thanks,

Tim


Streaming scheduling delay

2015-02-11 Thread Tim Smith
On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
streaming app that consumes data from Kafka and writes it back to Kafka
(different topic). My big problem has been Total Delay. While execution
time is usually https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Could this be an issue with the driver being a bottlneck? All the executors
posting their logs/stats to the driver?

Thanks,

Tim


Re: Streaming scheduling delay

2015-02-11 Thread Tim Smith
Just read the thread "Are these numbers abnormal for spark streaming?" and
I think I am seeing similar results - that is - increasing the window seems
to be the trick here. I will have to monitor for a few hours/days before I
can conclude (there are so many knobs/dials).



On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith  wrote:

> On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
> streaming app that consumes data from Kafka and writes it back to Kafka
> (different topic). My big problem has been Total Delay. While execution
> time is usually  minutes to hours(s) (keeps going up).
>
> For a little while, I thought I had solved the issue by bumping up the
> driver memory. Then I expanded my Kafka cluster to add more nodes and the
> issue came up again. I tried a few things to smoke out the issue and
> something tells me the driver is the bottleneck again:
>
> 1) From my app, I took out the entire write-out-to-kafka piece. Sure
> enough, execution, scheduling delay and hence total delay fell to sub
> second. This assured me that whatever processing I do before writing back
> to kafka isn't the bottleneck.
>
> 2) In my app, I had RDD persistence set at different points but my code
> wasn't really re-using any RDDs so I took out all explicit persist()
> statements. And added, "spar...unpersist" to "true" in the context. After
> this, it doesn't seem to matter how much memory I give my executor, the
> total delay seems to be in the same range. I tried per executor memory from
> 2G to 12G with no change in total delay so executors aren't memory starved.
> Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB
> used when per executor memory is set to 2GB, for example.
>
> 3) Input rate in the kafka consumer restricts spikes in incoming data.
>
> 4) Tried FIFO and FAIR but didn't make any difference.
>
> 5) Adding executors beyond a certain points seems useless (I guess excess
> ones just sit idle).
>
> At any given point in time, the SparkUI shows only one batch pending
> processing. So with just one batch pending processing, why would the
> scheduling delay run into minutes/hours if execution time is within the
> batch window duration? There aren't any failed stages or jobs.
>
> Right now, I have 100 executors ( i have tried setting executors from
> 50-150), each with 2GB and 4 cores and the driver running with 16GB. There
> are 5 kafka receivers and each incoming stream is split into 40 partitions.
> Per receiver, input rate is restricted to 2 messages per second.
>
> Can anyone help me with clues or areas to look into, for troubleshooting
> the issue?
>
> One nugget I found buried in the code says:
> "The scheduler delay includes the network delay to send the task to the
> worker machine and to send back the result (but not the time to fetch the
> task result, if it needed to be fetched from the block manager on the
> worker)."
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
>
> Could this be an issue with the driver being a bottlneck? All the
> executors posting their logs/stats to the driver?
>
> Thanks,
>
> Tim
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>


Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
Hi Gerard,

Great write-up and really good guidance in there.

I have to be honest, I don't know why but setting # of partitions for each
dStream to a low number (5-10) just causes the app to choke/crash. Setting
it to 20 gets the app going but with not so great delays. Bump it up to 30
and I start winning the war where processing time is consistently below
batch time window (20 seconds) except for a batch every few batches where
the compute time spikes 10x the usual.

Following your guide, I took out some "logInfo" statements I had in the app
but didn't seem to make much difference :(

With a higher time window (20 seconds), I got the app to run stably for a
few hours but then ran into the dreaded "java.lang.Exception: Could not
compute split, block input-0-1423761240800 not found". Wonder if I need to
add RDD persistence back?

Also, I am reaching out to Virdata with some ProServ inquiries.

Thanks





On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas  wrote:

> Hi Tim,
>
> From this: " There are 5 kafka receivers and each incoming stream is
> split into 40 partitions"  I suspect that you're creating too many tasks
> for Spark to process on time.
> Could you try some of the 'knobs' I describe here to see if that would
> help?
>
> http://www.virdata.com/tuning-spark/
>
> -kr, Gerard.
>
> On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith  wrote:
>
>> Just read the thread "Are these numbers abnormal for spark streaming?"
>> and I think I am seeing similar results - that is - increasing the window
>> seems to be the trick here. I will have to monitor for a few hours/days
>> before I can conclude (there are so many knobs/dials).
>>
>>
>>
>> On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith  wrote:
>>
>>> On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
>>> streaming app that consumes data from Kafka and writes it back to Kafka
>>> (different topic). My big problem has been Total Delay. While execution
>>> time is usually >> minutes to hours(s) (keeps going up).
>>>
>>> For a little while, I thought I had solved the issue by bumping up the
>>> driver memory. Then I expanded my Kafka cluster to add more nodes and the
>>> issue came up again. I tried a few things to smoke out the issue and
>>> something tells me the driver is the bottleneck again:
>>>
>>> 1) From my app, I took out the entire write-out-to-kafka piece. Sure
>>> enough, execution, scheduling delay and hence total delay fell to sub
>>> second. This assured me that whatever processing I do before writing back
>>> to kafka isn't the bottleneck.
>>>
>>> 2) In my app, I had RDD persistence set at different points but my code
>>> wasn't really re-using any RDDs so I took out all explicit persist()
>>> statements. And added, "spar...unpersist" to "true" in the context. After
>>> this, it doesn't seem to matter how much memory I give my executor, the
>>> total delay seems to be in the same range. I tried per executor memory from
>>> 2G to 12G with no change in total delay so executors aren't memory starved.
>>> Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB
>>> used when per executor memory is set to 2GB, for example.
>>>
>>> 3) Input rate in the kafka consumer restricts spikes in incoming data.
>>>
>>> 4) Tried FIFO and FAIR but didn't make any difference.
>>>
>>> 5) Adding executors beyond a certain points seems useless (I guess
>>> excess ones just sit idle).
>>>
>>> At any given point in time, the SparkUI shows only one batch pending
>>> processing. So with just one batch pending processing, why would the
>>> scheduling delay run into minutes/hours if execution time is within the
>>> batch window duration? There aren't any failed stages or jobs.
>>>
>>> Right now, I have 100 executors ( i have tried setting executors from
>>> 50-150), each with 2GB and 4 cores and the driver running with 16GB. There
>>> are 5 kafka receivers and each incoming stream is split into 40 partitions.
>>> Per receiver, input rate is restricted to 2 messages per second.
>>>
>>> Can anyone help me with clues or areas to look into, for troubleshooting
>>> the issue?
>>>
>>> One nugget I found buried in the code says:
>>> "The scheduler delay includes the network delay to send the task to the
>>> worker machine and to send back the result (but not the time to fetch the
>>> task result, if it needed to be fetched from the block manager on the
>>> worker)."
>>>
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
>>>
>>> Could this be an issue with the driver being a bottlneck? All the
>>> executors posting their logs/stats to the driver?
>>>
>>> Thanks,
>>>
>>> Tim
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
ava.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 55]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.56 in stage 16291.0
(TID 1042754) on executor nodedn1-17-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 56]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.57 in stage 16291.0
(TID 1042758) on executor nodedn1-17-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 57]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.58 in stage 16291.0
(TID 1042762) on executor nodedn1-12-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 58]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.59 in stage 16291.0
(TID 1042766) on executor nodedn1-23-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 59]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.60 in stage 16291.0
(TID 1042774) on executor nodedn1-20-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 60]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.61 in stage 16291.0
(TID 1042779) on executor nodedn1-13-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 61]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.62 in stage 16291.0
(TID 1042789) on executor nodedn1-20-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 62]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.63 in stage 16291.0
(TID 1042793) on executor nodedn1-15-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 63]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 54
in stage 16291.0 failed 64 times, most recent failure: Lost task 54.63 in
stage 16291.0 (TID 1042793, nodedn1-15-acme.com): java.lang.Exception:
Could not compute split, block input-4-1423758372200 not found
Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 54 in stage 16291.0 failed 64 times, most recent
failure: Lost task 54.63 in stage 16291.0 (TID 1042793, nodedn1-15-acme.com):
java.lang.Exception: Could not compute split, block input-4-1423758372200
not found


Thanks for looking into it.





On Thu, Feb 12, 2015 at 8:10 PM, Tathagata Das  wrote:

> Hey Tim,
>
> Let me get the key points.
> 1. If you are not writing back to Kafka, the delay is stable? That is,
> instead of "foreachRDD { // write to kafka }"  if you do "dstream.count",
> then the delay is stable. Right?
> 2. If so, then Kafka is the bottleneck. Is the number of partitions, that
> you spoke of the in the second mail, that determines the parallelism in
> writes? Is it stable with 30 partitions?
>
> Regarding the block exception, could you give me a trace of info level
> logging that leads to this error? Basically I want trace the lifecycle of
> the block.
>
> TD
>
>
>
> On Thu, Feb 12, 2015 at 6:29 PM, Tim Smith  wrote:
>
>> Hi Gerard,
>>
>> Great write-up and really good guidance in there.
>>
>> I have to be honest, I don't know why but setting # of partitions for
>> each dStream to a low number (5-10) just causes the app to choke/crash.
>> Setting it to 20 gets the app going but with not so great delays. Bump it
>> up to 30 and I start winning the war where processing time is consistently
>> below batch time window (20 seconds) except for a batch every few batches
>> where the compute time spikes 10x the usual.
>>
>> Following your guide, I took out some "logInfo" statements I had in the
>> app but didn't seem to make much difference :(
>>
>> With a higher time window (20 seconds), I got the app to run stably for a
>> few hours but then ran into the dreaded "java.lang.Exception: Could not
>> compute split, block input-0-1423761240800 not found". Wonder if I need to
>> add RDD persistence back?
>>
>> Also, I am reaching out to Virdata with some ProServ inquiries.
>>
>> Thanks
>>
>>
>>
>>
>>
>> On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas 
>> wrote:
>>
>>> Hi Tim,
>>>
>>> From this: " There are 5 kafka receivers and each incoming stream is
>>> split into 40 partitions"  I suspect that you're creating too many
>>> tasks for Spark to process on time.
>>> Could you try some of the 'knobs' I describe here to see if that would
>>> help?
>>>
>>> http://www.virdata.com/tuning-spark/
>>>

Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
I replaced the writeToKafka statements with a rdd.count() and sure enough,
I have a stable app with total delay well within the batch window (20
seconds). Here's the total delay lines from the driver log:
15/02/13 06:14:26 INFO JobScheduler: Total delay: 6.521 s for time
142380806 ms (execution: 6.404 s)
15/02/13 06:15:22 INFO JobScheduler: Total delay: 42.396 s for time
142380808 ms (execution: 42.338 s)
15/02/13 06:16:21 INFO JobScheduler: Total delay: 81.879 s for time
142380810 ms (execution: 59.483 s)
15/02/13 06:16:40 INFO JobScheduler: Total delay: 80.242 s for time
142380812 ms (execution: 18.363 s)
15/02/13 06:16:50 INFO JobScheduler: Total delay: 70.342 s for time
142380814 ms (execution: 10.100 s)
15/02/13 06:16:56 INFO JobScheduler: Total delay: 56.551 s for time
142380816 ms (execution: 6.209 s)
15/02/13 06:17:06 INFO JobScheduler: Total delay: 46.405 s for time
142380818 ms (execution: 9.854 s)
15/02/13 06:17:13 INFO JobScheduler: Total delay: 33.443 s for time
142380820 ms (execution: 7.038 s)
15/02/13 06:17:21 INFO JobScheduler: Total delay: 21.483 s for time
142380822 ms (execution: 8.039 s)
15/02/13 06:17:26 INFO JobScheduler: Total delay: 6.697 s for time
142380824 ms (execution: 5.213 s)
15/02/13 06:17:45 INFO JobScheduler: Total delay: 5.814 s for time
142380826 ms (execution: 5.767 s)
15/02/13 06:18:06 INFO JobScheduler: Total delay: 6.905 s for time
142380828 ms (execution: 6.858 s)
15/02/13 06:18:28 INFO JobScheduler: Total delay: 8.604 s for time
142380830 ms (execution: 8.556 s)
15/02/13 06:18:45 INFO JobScheduler: Total delay: 5.631 s for time
142380832 ms (execution: 5.583 s)
15/02/13 06:19:04 INFO JobScheduler: Total delay: 4.838 s for time
142380834 ms (execution: 4.791 s)
15/02/13 06:19:24 INFO JobScheduler: Total delay: 4.467 s for time
142380836 ms (execution: 4.422 s)
15/02/13 06:19:45 INFO JobScheduler: Total delay: 5.779 s for time
142380838 ms (execution: 5.733 s)
15/02/13 06:20:04 INFO JobScheduler: Total delay: 4.747 s for time
142380840 ms (execution: 4.701 s)
15/02/13 06:20:24 INFO JobScheduler: Total delay: 4.829 s for time
142380842 ms (execution: 4.782 s)
15/02/13 06:20:44 INFO JobScheduler: Total delay: 4.724 s for time
142380844 ms (execution: 4.678 s)
15/02/13 06:21:04 INFO JobScheduler: Total delay: 4.110 s for time
142380846 ms (execution: 4.064 s)
15/02/13 06:21:24 INFO JobScheduler: Total delay: 4.562 s for time
142380848 ms (execution: 4.514 s)
15/02/13 06:21:43 INFO JobScheduler: Total delay: 3.999 s for time
142380850 ms (execution: 3.954 s)
15/02/13 06:22:04 INFO JobScheduler: Total delay: 4.353 s for time
142380852 ms (execution: 4.309 s)
15/02/13 06:22:24 INFO JobScheduler: Total delay: 4.712 s for time
142380854 ms (execution: 4.667 s)
15/02/13 06:22:44 INFO JobScheduler: Total delay: 4.726 s for time
142380856 ms (execution: 4.681 s)
15/02/13 06:23:07 INFO JobScheduler: Total delay: 7.860 s for time
142380858 ms (execution: 7.816 s)
15/02/13 06:23:28 INFO JobScheduler: Total delay: 8.426 s for time
142380860 ms (execution: 8.383 s)
15/02/13 06:23:43 INFO JobScheduler: Total delay: 3.857 s for time
142380862 ms (execution: 3.814 s)
15/02/13 06:24:03 INFO JobScheduler: Total delay: 3.936 s for time
142380864 ms (execution: 3.892 s)
15/02/13 06:24:23 INFO JobScheduler: Total delay: 3.810 s for time
142380866 ms (execution: 3.767 s)
15/02/13 06:24:43 INFO JobScheduler: Total delay: 3.889 s for time
142380868 ms (execution: 3.845 s)
15/02/13 06:25:03 INFO JobScheduler: Total delay: 3.553 s for time
142380870 ms (execution: 3.510 s)
15/02/13 06:25:27 INFO JobScheduler: Total delay: 7.031 s for time
142380872 ms (execution: 6.989 s)
15/02/13 06:25:43 INFO JobScheduler: Total delay: 3.636 s for time
142380874 ms (execution: 3.594 s)
15/02/13 06:26:03 INFO JobScheduler: Total delay: 3.425 s for time
142380876 ms (execution: 3.383 s)
15/02/13 06:26:23 INFO JobScheduler: Total delay: 3.939 s for time
142380878 ms (execution: 3.897 s)
15/02/13 06:26:43 INFO JobScheduler: Total delay: 3.640 s for time
142380880 ms (execution: 3.596 s)
15/02/13 06:27:03 INFO JobScheduler: Total delay: 3.905 s for time
142380882 ms (execution: 3.861 s)
15/02/13 06:27:24 INFO JobScheduler: Total delay: 4.068 s for time
142380884 ms (execution: 4.026 s)




On Thu, Feb 12, 2015 at 9:54 PM, Tim Smith  wrote:

> TD - I will try count() and report back. Meanwhile, attached is the entire
> driver log that includes the error logs about missing blocks.
>
> Cody - Let me research a bit about how to do connection pooling. Sorry, I
> am not really a programmer. I did see the connection pooling advise in the
> Spark Streaming Programming guide as an optimization but wasn't sure how to
> implement it. But do you think it will have a significant impact on
> performance?
>
> Saisai - I think

Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
Hi Saisai,

If I understand correctly, you are suggesting that control parallelism by
having number of consumers/executors at least 1:1 for number of kafka
partitions. For example, if I have 50 partitions for a kafka topic then
either have:
- 25 or more executors, 25 receivers, each receiver set to 2 consumer
threads per topic, or,
- 50 or more executors, 50 receivers, each receiver set to 1 consumer
thread per topic

Actually, both executors and total consumers can be more than the number of
kafka partitions (some will probably sit idle).

But do away with dStream partitioning altogether.

Right?

Thanks,

- Tim




On Thu, Feb 12, 2015 at 11:03 PM, Saisai Shao 
wrote:

> Hi Tim,
>
> I think maybe you can try this way:
>
> create Receiver per executor and specify thread for each topic large than
> 1, and the total number of consumer thread will be: total consumer =
> (receiver number) * (thread number), and make sure this total consumer is
> less than or equal to Kafka partition number. In this case, I think the
> parallelism is enough, received blocks are distributed to each executor. So
> you don't need to repartition to increase the parallelism.
>
> Besides for Kafka's high-level API, Kafka partitions may not be equally
> distributed to all the receivers, so some tasks may process more data than
> other tasks. another way you can try DirectKafkaInputDStream in Spark 1.3,
> that will be more balanced because each Kafka partition mapping to Spark
> partition.
>
>
> Besides "set partition count to 1 for each dStream" means
> dstream.repartition(1) ? If so I think it will still introduce shuffle and
> move all the data into one partition.
>
> Thanks
> Saisai
>
> 2015-02-13 13:54 GMT+08:00 Tim Smith :
>
>> TD - I will try count() and report back. Meanwhile, attached is the
>> entire driver log that includes the error logs about missing blocks.
>>
>> Cody - Let me research a bit about how to do connection pooling. Sorry, I
>> am not really a programmer. I did see the connection pooling advise in the
>> Spark Streaming Programming guide as an optimization but wasn't sure how to
>> implement it. But do you think it will have a significant impact on
>> performance?
>>
>> Saisai - I think, ideally, I'd rather not do any dStream partitioning.
>> Instead have 1 receiver for each kafka partition (so in this case 23
>> receivers for 23 kafka partitions) and then have as many or more executors
>> to handle processing of the dStreams. Right? Trouble is, I tried this
>> approach and didn't work. Even If I set 23 receivers, and set partition
>> count to 1 for each dStream (effectively, no stream splitting), my
>> performance is extremely poor/laggy. Should I modify my code to remove
>> dStream partitioning altogether and then try setting as many receivers as
>> kafka partitions?
>>
>>
>>
>>
>>
>> On Thu, Feb 12, 2015 at 9:45 PM, Saisai Shao 
>> wrote:
>>
>>> Hi Tim,
>>>
>>> I think this code will still introduce shuffle even when you call
>>> repartition on each input stream. Actually this style of implementation
>>> will generate more jobs (job per each input stream) than union into one
>>> stream as called DStream.union(), and union normally has no special
>>> overhead as I understood.
>>>
>>> Also as Cody said, creating Producer per partition could be a potential
>>> overhead, producer pool or sharing the Producer for one executor might be
>>> better :).
>>>
>>>
>>>  // Process stream from each receiver separately
>>>  // (do not attempt to merge all streams and then re-partition,
>>> this causes un-necessary and high amount of shuffle in the job)
>>>  for (k <- kInStreams)
>>> {
>>>  // Re-partition stream from each receiver across all
>>> compute nodes to spread out processing load and allows per partition
>>> processing
>>>  // and, set persistence level to spill to disk along
>>> with serialization
>>>  val kInMsgParts =
>>> k.repartition(otherConf("dStreamPartitions").toInt).
>>>
>>> 2015-02-13 13:27 GMT+08:00 Cody Koeninger :
>>>
>>>> outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
>>>>
>>>>  val writer = new
>>>> KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
>>>>
>>>>  writer.output(rec)
>>>>
>>>> }) )
>>>>
>>>>
>>

Re: Spark Streaming output cannot be used as input?

2015-02-18 Thread Tim Smith
+1 for writing the Spark output to Kafka. You can then hang off multiple
compute/storage framework from kafka. I am using a similar pipeline to feed
ElasticSearch and HDFS in parallel. Allows modularity, you can take down
ElasticSearch or HDFS for maintenance without losing (except for some edge
cases) data.

You can even pipeline other Spark streaming apps off kafka to modularize
your processing pipeline so you don't have one single big Spark app doing
all the processing.



On Wed, Feb 18, 2015 at 3:34 PM, Jose Fernandez  wrote:

>  Thanks for the advice folks, it is much appreciated. This seems like a
> pretty unfortunate design flaw. My team was surprised by it.
>
>
>
> I’m going to drop the two-step process and do it all in a single step
> until we get Kafka online.
>
>
>
> *From:* Sean Owen [mailto:so...@cloudera.com]
> *Sent:* Wednesday, February 18, 2015 1:53 AM
> *To:* Emre Sevinc
> *Cc:* Jose Fernandez; user@spark.apache.org
> *Subject:* Re: Spark Streaming output cannot be used as input?
>
>
>
> To clarify, sometimes in the world of Hadoop people freely refer to an
> output 'file' when it's really a directory containing 'part-*' files which
> are pieces of the file. It's imprecise but that's the meaning. I think the
> scaladoc may be referring to 'the path to the file, which includes this
> parent dir, is generated ...' In an inherently distributed system, you want
> to distributed writes and reads, so big "files" are really made of logical
> files within a directory.
>
>
>
> There is a JIRA open to support nested dirs which has been languishing:
> https://issues.apache.org/jira/browse/SPARK-3586
>
> I'm hoping to pursue that again with help from tdas after 1.3.
>
> That's probably the best solution.
>
>
>
> An alternative is to not use the file system as a sort of message queue,
> and instead use something like Kafka. It has a lot of other benefits but
> maybe it's not feasible to add this to your architecture.
>
>
>
> You can merge the files with HDFS APIs without much trouble. The dirs will
> be named consistently according to time and are something you can also
> query for.
>
>
>
> Making 1 partition has implications for parallelism of your job.
>
>
>
> Emre, I think I see what you're getting at but you have the map +
> materialize pattern which i think doesn't have the right guarantees about
> re-execution. Why not foreachRDD?
>
>
>
> Yes you can also consider collecting the whole RDD in foreachRDD and doing
> what you like, including writing to one file. But that would only work if
> the data is always small in each RDD.
>
>
>
>
>   
>
>
>   SDL PLC confidential, all rights reserved. If you are not the intended
> recipient of this mail SDL requests and requires that you delete it without
> acting upon or copying any of its contents, and we further request that you
> advise us.
>
> SDL PLC is a public limited company registered in England and Wales.
> Registered number: 02675207.
> Registered address: Globe House, Clivemont Road, Maidenhead, Berkshire SL6
> 7DY, UK.
>
> On Wed, Feb 18, 2015 at 8:50 AM, Emre Sevinc 
> wrote:
>
> Hello Jose,
>
> We've hit the same issue a couple of months ago. It is possible to write
> directly to files instead of creating directories, but it is not
> straightforward, and I haven't seen any clear demonstration in books,
> tutorials, etc.
>
> We do something like:
>
> SparkConf sparkConf = new SparkConf().setAppName(appName);
> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new
> Duration(batchInterval));
> JavaDStream stream = MyModuleApp.initializeJob(ssc);
> MyModuleApp.process(stream);
>
>
>
> And then in the process method:
>
> @Override public void process(JavaDStream inStream) {
>
>
>
> JavaDStream json = inStream.map(new 
> MyModuleWorker(jsonSchemaName, validatedJSONoutputDir, 
> rejectedJSONoutputDir));
>
> forceOutput(json);
>
>   }
>
>  This, in turn, calls the following (I've removed the irrelevant lines to 
> focus on writing):
>
>
> public class MyModuleWorker implements Function {
>
>   public String call(String json) {
>
>
> // process the data and then write it
>
> writeJSON(json, validatedJSONoutputDir_);
>
>   }
>
>
>
> }
>
> And the writeJSON method is:
>
> public static final void writeJSON(String json, String jsonDirPath) throws 
> IOException {
>
> String jsonFileName = jsonDirPath + "/" + UUID.randomUUID().toString() + 
> ".json.tmp";
>
> URI uri = URI.create(jsonFileName);
>
> Configuration conf = new Configuration();
>
> FileSystem fileSystem = FileSystem.get(uri, conf);
>
> FSDataOutputStream out = fileSystem.create(new Path(uri));
>
> out.write(json.getBytes(StandardCharsets.UTF_8));
>
> out.close();
>
>
>
> fileSystem.rename(new Path(uri),
>
> new Path(URI.create(jsonDirPath + "/" + 
> UUID.randomUUID().toString() + ".json")));
>
>
>
>   }
>
>
>
> Using a similar technique you might be able to achieve your objective.
>

How to diagnose "could not compute split" errors and failed jobs?

2015-02-19 Thread Tim Smith
My streaming app runs fine for a few hours and then starts spewing "Could
not compute split, block input-xx-xxx not found" errors. After this,
jobs start to fail and batches start to pile up.

My question isn't so much about why this error but rather, how do I trace
what leads to this error? I am using disk+memory for storage so shouldn't
be a case of data loss resulting from memory overrun.

15/02/18 22:04:49 ERROR JobScheduler: Error running job streaming job
142429705 ms.28
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3
in stage 247644.0 failed 64 times, most recent failure: Lost task 3.63 in
stage 247644.0 (TID 3705290, node-dn1-16-test.abcdefg.com):
java.lang.Exception: Could not compute split, block input-28-1424297042500
not found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thanks,

Tim


Accumulator in SparkUI for streaming

2015-02-20 Thread Tim Smith
On Spark 1.2:

I am trying to capture # records read from a kafka topic:

val inRecords = ssc.sparkContext.accumulator(0, "InRecords")

..

kInStreams.foreach( k =>
{

 k.foreachRDD ( rdd =>  inRecords += rdd.count().toInt  )
 inRecords.value


Question is how do I get the accumulator to show up in the UI? I tried
"inRecords.value" but that didn't help. Pretty sure it isn't showing up in
Stage metrics.

What's the trick here? collect?

Thanks,

Tim


Re: Accumulator in SparkUI for streaming

2015-02-28 Thread Tim Smith
So somehow Spark Streaming doesn't support display of named accumulators in
the WebUI?


On Tue, Feb 24, 2015 at 7:58 AM, Petar Zecevic 
wrote:

>
> Interesting. Accumulators are shown on Web UI if you are using the
> ordinary SparkContext (Spark 1.2). It just has to be named (and that's what
> you did).
>
> scala> val acc = sc.accumulator(0, "test accumulator")
> acc: org.apache.spark.Accumulator[Int] = 0
> scala> val rdd = sc.parallelize(1 to 1000)
> rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at :12
> scala> rdd.foreach(x => acc += 1)
> scala> acc.value
> res1: Int = 1000
>
> The Stage details page shows:
>
>
>
>
> On 20.2.2015. 9:25, Tim Smith wrote:
>
>  On Spark 1.2:
>
>  I am trying to capture # records read from a kafka topic:
>
>  val inRecords = ssc.sparkContext.accumulator(0, "InRecords")
>
>  ..
>
>  kInStreams.foreach( k =>
> {
>
>   k.foreachRDD ( rdd =>  inRecords += rdd.count().toInt  )
>   inRecords.value
>
>
>  Question is how do I get the accumulator to show up in the UI? I tried
> "inRecords.value" but that didn't help. Pretty sure it isn't showing up in
> Stage metrics.
>
>  What's the trick here? collect?
>
>  Thanks,
>
>  Tim
>
>
>


Updating shared data structure between executors

2014-08-19 Thread Tim Smith
Hi,

I am writing some Scala code to normalize a stream of logs using an
input configuration file (multiple regex patterns). To avoid
re-starting the job, I can read in a new config file using fileStream
and then turn the config file to a map. But I am unsure about how to
update a shared map (since broadcast vars cannot be updated)?

Any help or pointers will be appreciated.

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Kafka stream receiver stops input

2014-08-27 Thread Tim Smith
Hi,

I have Spark (1.0.0 on CDH5) running with Kafka 0.8.1.1.

I have a streaming jobs that reads from a kafka topic and writes
output to another kafka topic. The job starts fine but after a while
the input stream stops getting any data. I think these messages show
no incoming data on the stream:
14/08/28 00:42:15 INFO ReceiverTracker: Stream 0 received 0 blocks

I run the job as:
spark-submit --class logStreamNormalizer --master yarn
log-stream-normalizer_2.10-1.0.jar --jars
spark-streaming-kafka_2.10-1.0.2.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
--executor-memory 6G --spark.cleaner.ttl 60 --executor-cores 4

As soon as I start the job, I see an error like:

14/08/28 00:50:59 INFO BlockManagerInfo: Added input-0-1409187056800
in memory on node6-acme.com:39418 (size: 83.3 MB, free: 3.1 GB)
Exception in thread "pool-1-thread-7" java.lang.OutOfMemoryError: Java
heap space
at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at org.apache.spark.storage.BlockMessage.set(BlockMessage.scala:85)
at org.apache.spark.storage.BlockMessage$.fromByteBuffer(BlockMessage.scala:176)
at org.apache.spark.storage.BlockMessageArray.set(BlockMessageArray.scala:63)
at 
org.apache.spark.storage.BlockMessageArray$.fromBufferMessage(BlockMessageArray.scala:109)
at 
org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:42)
at 
org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
at 
org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
at 
org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:662)
at 
org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:504)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

But not sure if that is the cause because even after that OOM message,
I see data coming in:
14/08/28 00:51:00 INFO ReceiverTracker: Stream 0 received 6 blocks

Appreciate any pointers or suggestions to troubleshoot the issue.

Thanks

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



DStream repartitioning, performance tuning processing

2014-08-28 Thread Tim Smith
Hi,

In my streaming app, I receive from kafka where I have tried setting the
partitions when calling "createStream" or later, by calling repartition -
in both cases, the number of nodes running the tasks seems to be stubbornly
stuck at 2. Since I have 11 nodes in my cluster, I was hoping to use more
nodes.

I am starting the job as:
nohup spark-submit --class logStreamNormalizer --master yarn
log-stream-normalizer_2.10-1.0.jar --jars
spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
--executor-memory 30G --spark.cleaner.ttl 60 --executor-cores 8
--num-executors 8 >normRunLog-6.log 2>normRunLogError-6.log & echo $! >
run-6.pid

My main code is:
 val sparkConf = new SparkConf().setAppName("SparkKafkaTest")
 val ssc = new StreamingContext(sparkConf,Seconds(5))
 val kInMsg =
KafkaUtils.createStream(ssc,"node-nn1-1:2181/zk_kafka","normApp",Map("rawunstruct"
-> 16))

 val propsMap = Map("metadata.broker.list" ->
"node-dn1-6:9092,node-dn1-7:9092,node-dn1-8:9092", "serializer.class" ->
"kafka.serializer.StringEncoder", "producer.type" -> "async",
"request.required.acks" -> "1")
 val to_topic = """normStruct"""
 val writer = new KafkaOutputService(to_topic, propsMap)


 if (!configMap.keySet.isEmpty)
 {
  //kInMsg.repartition(8)
  val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
  outdata.foreachRDD((rdd,time) => { rdd.foreach(rec => {
writer.output(rec) }) } )
 }

 ssc.start()
 ssc.awaitTermination()

In terms of total delay, with a 5 second batch, the delays usually stay
under 5 seconds, but sometimes jump to ~10 seconds. As a performance tuning
question, does this mean, I can reduce my cleaner ttl from 60 to say 25
(still more than double of the peak delay)?

Thanks

Tim


Failed to run runJob at ReceiverTracker.scala

2014-08-28 Thread Tim Smith
Hi,

Have a Spark-1.0.0 (CDH5) streaming job reading from kafka that died with:

14/08/28 22:28:15 INFO DAGScheduler: Failed to run runJob at
ReceiverTracker.scala:275
Exception in thread "Thread-59" 14/08/28 22:28:15 INFO
YarnClientClusterScheduler: Cancelling stage 2
14/08/28 22:28:15 INFO DAGScheduler: Executor lost: 5 (epoch 4)
14/08/28 22:28:15 INFO BlockManagerMasterActor: Trying to remove executor 5
from BlockManagerMaster.
14/08/28 22:28:15 INFO BlockManagerMaster: Removed 5 successfully in
removeExecutor
org.apache.spark.SparkException: Job aborted due to stage failure: Task
2.0:0 failed 4 times, most recent failure: TID 6481 on host
node-dn1-1.ops.sfdc.net failed for unknown reason
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Any insights into this error?

Thanks,

Tim


Re: Failed to run runJob at ReceiverTracker.scala

2014-08-28 Thread Tim Smith
Appeared after running for a while. I re-ran the job and this time, it
crashed with:
14/08/29 00:18:50 WARN ReceiverTracker: Error reported by receiver for
stream 0: Error in block pushing thread - java.net.SocketException: Too
many open files

Shouldn't the failed receiver get re-spawned on a different worker?



On Thu, Aug 28, 2014 at 4:12 PM, Tathagata Das 
wrote:

> Do you see this error right in the beginning or after running for sometime?
>
> The root cause seems to be that somehow your Spark executors got killed,
> which killed receivers and caused further errors. Please try to take a look
> at the executor logs of the lost executor to find what is the root cause
> that caused the executor to fail.
>
> TD
>
>
> On Thu, Aug 28, 2014 at 3:54 PM, Tim Smith  wrote:
>
>> Hi,
>>
>> Have a Spark-1.0.0 (CDH5) streaming job reading from kafka that died with:
>>
>> 14/08/28 22:28:15 INFO DAGScheduler: Failed to run runJob at
>> ReceiverTracker.scala:275
>> Exception in thread "Thread-59" 14/08/28 22:28:15 INFO
>> YarnClientClusterScheduler: Cancelling stage 2
>> 14/08/28 22:28:15 INFO DAGScheduler: Executor lost: 5 (epoch 4)
>> 14/08/28 22:28:15 INFO BlockManagerMasterActor: Trying to remove executor
>> 5 from BlockManagerMaster.
>> 14/08/28 22:28:15 INFO BlockManagerMaster: Removed 5 successfully in
>> removeExecutor
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 2.0:0 failed 4 times, most recent failure: TID 6481 on host
>> node-dn1-1.ops.sfdc.net failed for unknown reason
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>> at scala.Option.foreach(Option.scala:236)
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>> Any insights into this error?
>>
>> Thanks,
>>
>> Tim
>>
>>
>


Re: DStream repartitioning, performance tuning processing

2014-08-28 Thread Tim Smith
TD - Apologies, didn't realize I was replying to you instead of the list.

What does "numPartitions" refer to when calling createStream? I read an
earlier thread that seemed to suggest that numPartitions translates to
partitions created on the Spark side?
http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E

Actually, I re-tried with 64 numPartitions in createStream and that didn't
work. I will manually set "repartition" to 64/128 and see how that goes.

Thanks.




On Thu, Aug 28, 2014 at 5:42 PM, Tathagata Das 
wrote:

> Having 16 partitions in KafkaUtils.createStream does not translate to the
> RDDs in Spark / Spark Streaming having 16 partitions. Repartition is the
> best way to distribute the received data between all the nodes, as long as
> there are sufficient number of partitions (try setting it to 2x the number
> cores given to the application).
>
> Yeah, in 1.0.0, ttl should be unnecessary.
>
>
>
> On Thu, Aug 28, 2014 at 5:17 PM, Tim Smith  wrote:
>
>> On Thu, Aug 28, 2014 at 4:19 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> If you are repartitioning to 8 partitions, and your node happen to have
>>> at least 4 cores each, its possible that all 8 partitions are assigned to
>>> only 2 nodes. Try increasing the number of partitions. Also make sure you
>>> have executors (allocated by YARN) running on more than two nodes if you
>>> want to use all 11 nodes in your yarn cluster.
>>>
>>
>> If you look at the code, I commented out the manual re-partitioning to 8.
>> Instead, I am created 16 partitions when I call createStream. But I will
>> increase the partitions to, say, 64 and see if I get better parallelism.
>>
>>
>>>
>>> If you are using Spark 1.x, then you dont need to set the ttl for
>>> running Spark Streaming. In case you are using older version, why do you
>>> want to reduce it? You could reduce it, but it does increase the risk of
>>> the premature cleaning, if once in a while things get delayed by 20
>>> seconds. I dont see much harm in keeping the ttl at 60 seconds (a bit of
>>> extra garbage shouldnt hurt performance).
>>>
>>>
>> I am running 1.0.0 (CDH5) so ttl setting is redundant? But you are right,
>> unless I have memory issues, more aggressive pruning won't help.
>>
>> Thanks,
>>
>> Tim
>>
>>
>>
>>
>>>  TD
>>>
>>>
>>> On Thu, Aug 28, 2014 at 3:16 PM, Tim Smith  wrote:
>>>
>>>> Hi,
>>>>
>>>> In my streaming app, I receive from kafka where I have tried setting
>>>> the partitions when calling "createStream" or later, by calling repartition
>>>> - in both cases, the number of nodes running the tasks seems to be
>>>> stubbornly stuck at 2. Since I have 11 nodes in my cluster, I was hoping to
>>>> use more nodes.
>>>>
>>>> I am starting the job as:
>>>> nohup spark-submit --class logStreamNormalizer --master yarn
>>>> log-stream-normalizer_2.10-1.0.jar --jars
>>>> spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
>>>> --executor-memory 30G --spark.cleaner.ttl 60 --executor-cores 8
>>>> --num-executors 8 >normRunLog-6.log 2>normRunLogError-6.log & echo $! >
>>>> run-6.pid
>>>>
>>>> My main code is:
>>>>  val sparkConf = new SparkConf().setAppName("SparkKafkaTest")
>>>>  val ssc = new StreamingContext(sparkConf,Seconds(5))
>>>>  val kInMsg =
>>>> KafkaUtils.createStream(ssc,"node-nn1-1:2181/zk_kafka","normApp",Map("rawunstruct"
>>>> -> 16))
>>>>
>>>>  val propsMap = Map("metadata.broker.list" ->
>>>> "node-dn1-6:9092,node-dn1-7:9092,node-dn1-8:9092", "serializer.class" ->
>>>> "kafka.serializer.StringEncoder", "producer.type" -> "async",
>>>> "request.required.acks" -> "1")
>>>>  val to_topic = """normStruct"""
>>>>  val writer = new KafkaOutputService(to_topic, propsMap)
>>>>
>>>>
>>>>  if (!configMap.keySet.isEmpty)
>>>>  {
>>>>   //kInMsg.repartition(8)
>>>>   val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
>>>>   outdata.foreachRDD((rdd,time) => { rdd.foreach(rec => {
>>>> writer.output(rec) }) } )
>>>>  }
>>>>
>>>>  ssc.start()
>>>>  ssc.awaitTermination()
>>>>
>>>> In terms of total delay, with a 5 second batch, the delays usually stay
>>>> under 5 seconds, but sometimes jump to ~10 seconds. As a performance tuning
>>>> question, does this mean, I can reduce my cleaner ttl from 60 to say 25
>>>> (still more than double of the peak delay)?
>>>>
>>>> Thanks
>>>>
>>>> Tim
>>>>
>>>>
>>>
>>
>


Re: Failed to run runJob at ReceiverTracker.scala

2014-08-28 Thread Tim Smith
I upped the ulimit to 128k files on all nodes. Job crashed again with
"DAGScheduler: Failed to run runJob at ReceiverTracker.scala:275".
Couldn't get the logs because I killed the job and looks like yarn
wipe the container logs (not sure why it wipes the logs under
/var/log/hadoop-yarn/container). Next time, I will grab the logs while
the job is still active/zombie.

So is there a limit on how many times a receiver is re-spawned?

Thanks,

Tim


On Thu, Aug 28, 2014 at 10:06 PM, Tathagata Das
 wrote:
> It did. It got failed and respawned 4 times.
> In this case, the too many open files is a sign that you need increase the
> system-wide limit of open files.
> Try adding ulimit -n 16000 to your conf/spark-env.sh.
>
> TD
>
>
> On Thu, Aug 28, 2014 at 5:29 PM, Tim Smith  wrote:
>>
>> Appeared after running for a while. I re-ran the job and this time, it
>> crashed with:
>> 14/08/29 00:18:50 WARN ReceiverTracker: Error reported by receiver for
>> stream 0: Error in block pushing thread - java.net.SocketException: Too many
>> open files
>>
>> Shouldn't the failed receiver get re-spawned on a different worker?
>>
>>
>>
>> On Thu, Aug 28, 2014 at 4:12 PM, Tathagata Das
>>  wrote:
>>>
>>> Do you see this error right in the beginning or after running for
>>> sometime?
>>>
>>> The root cause seems to be that somehow your Spark executors got killed,
>>> which killed receivers and caused further errors. Please try to take a look
>>> at the executor logs of the lost executor to find what is the root cause
>>> that caused the executor to fail.
>>>
>>> TD
>>>
>>>
>>> On Thu, Aug 28, 2014 at 3:54 PM, Tim Smith  wrote:
>>>>
>>>> Hi,
>>>>
>>>> Have a Spark-1.0.0 (CDH5) streaming job reading from kafka that died
>>>> with:
>>>>
>>>> 14/08/28 22:28:15 INFO DAGScheduler: Failed to run runJob at
>>>> ReceiverTracker.scala:275
>>>> Exception in thread "Thread-59" 14/08/28 22:28:15 INFO
>>>> YarnClientClusterScheduler: Cancelling stage 2
>>>> 14/08/28 22:28:15 INFO DAGScheduler: Executor lost: 5 (epoch 4)
>>>> 14/08/28 22:28:15 INFO BlockManagerMasterActor: Trying to remove
>>>> executor 5 from BlockManagerMaster.
>>>> 14/08/28 22:28:15 INFO BlockManagerMaster: Removed 5 successfully in
>>>> removeExecutor
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>>> 2.0:0 failed 4 times, most recent failure: TID 6481 on host
>>>> node-dn1-1.ops.sfdc.net failed for unknown reason
>>>> Driver stacktrace:
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>>>> at
>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>> at
>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>>>> at scala.Option.foreach(Option.scala:236)
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
>>>> at
>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>> at
>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>>
>>>> Any insights into this error?
>>>>
>>>> Thanks,
>>>>
>>>> Tim
>>>>
>>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DStream repartitioning, performance tuning processing

2014-08-28 Thread Tim Smith
I set partitions to 64:

//
 kInMsg.repartition(64)
 val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
//

Still see all activity only on the two nodes that seem to be receiving
from Kafka.

On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith  wrote:
> TD - Apologies, didn't realize I was replying to you instead of the list.
>
> What does "numPartitions" refer to when calling createStream? I read an
> earlier thread that seemed to suggest that numPartitions translates to
> partitions created on the Spark side?
> http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E
>
> Actually, I re-tried with 64 numPartitions in createStream and that didn't
> work. I will manually set "repartition" to 64/128 and see how that goes.
>
> Thanks.
>
>
>
>
> On Thu, Aug 28, 2014 at 5:42 PM, Tathagata Das 
> wrote:
>>
>> Having 16 partitions in KafkaUtils.createStream does not translate to the
>> RDDs in Spark / Spark Streaming having 16 partitions. Repartition is the
>> best way to distribute the received data between all the nodes, as long as
>> there are sufficient number of partitions (try setting it to 2x the number
>> cores given to the application).
>>
>> Yeah, in 1.0.0, ttl should be unnecessary.
>>
>>
>>
>> On Thu, Aug 28, 2014 at 5:17 PM, Tim Smith  wrote:
>>>
>>> On Thu, Aug 28, 2014 at 4:19 PM, Tathagata Das
>>>  wrote:
>>>>
>>>> If you are repartitioning to 8 partitions, and your node happen to have
>>>> at least 4 cores each, its possible that all 8 partitions are assigned to
>>>> only 2 nodes. Try increasing the number of partitions. Also make sure you
>>>> have executors (allocated by YARN) running on more than two nodes if you
>>>> want to use all 11 nodes in your yarn cluster.
>>>
>>>
>>> If you look at the code, I commented out the manual re-partitioning to 8.
>>> Instead, I am created 16 partitions when I call createStream. But I will
>>> increase the partitions to, say, 64 and see if I get better parallelism.
>>>
>>>>
>>>>
>>>> If you are using Spark 1.x, then you dont need to set the ttl for
>>>> running Spark Streaming. In case you are using older version, why do you
>>>> want to reduce it? You could reduce it, but it does increase the risk of 
>>>> the
>>>> premature cleaning, if once in a while things get delayed by 20 seconds. I
>>>> dont see much harm in keeping the ttl at 60 seconds (a bit of extra garbage
>>>> shouldnt hurt performance).
>>>>
>>>
>>> I am running 1.0.0 (CDH5) so ttl setting is redundant? But you are right,
>>> unless I have memory issues, more aggressive pruning won't help.
>>>
>>> Thanks,
>>>
>>> Tim
>>>
>>>
>>>
>>>>
>>>> TD
>>>>
>>>>
>>>> On Thu, Aug 28, 2014 at 3:16 PM, Tim Smith  wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> In my streaming app, I receive from kafka where I have tried setting
>>>>> the partitions when calling "createStream" or later, by calling 
>>>>> repartition
>>>>> - in both cases, the number of nodes running the tasks seems to be
>>>>> stubbornly stuck at 2. Since I have 11 nodes in my cluster, I was hoping 
>>>>> to
>>>>> use more nodes.
>>>>>
>>>>> I am starting the job as:
>>>>> nohup spark-submit --class logStreamNormalizer --master yarn
>>>>> log-stream-normalizer_2.10-1.0.jar --jars
>>>>> spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
>>>>> --executor-memory 30G --spark.cleaner.ttl 60 --executor-cores 8
>>>>> --num-executors 8 >normRunLog-6.log 2>normRunLogError-6.log & echo $! >
>>>>> run-6.pid
>>>>>
>>>>> My main code is:
>>>>>  val sparkConf = new SparkConf().setAppName("SparkKafkaTest")
>>>>>  val ssc = new StreamingContext(sparkConf,Seconds(5))
>>>>>  val kInMsg =
>>>>> KafkaUtils.createStream(ssc,"node-nn1-1:2181/zk_kafka","normApp",Map("rawunstruct"
>>>>> -> 16))
>>>>>
>>>>>  val propsMap = Map("metadata.broker.list" ->

Re: DStream repartitioning, performance tuning processing

2014-08-29 Thread Tim Smith
I wrote a long post about how I arrived here but in a nutshell I don't see
evidence of re-partitioning and workload distribution across the cluster.
My new fangled way of starting the job is:

run=`date +"%m-%d-%YT%T"`; \
nohup spark-submit --class logStreamNormalizer \
--master yarn log-stream-normalizer_2.10-1.0.jar \
--jars
spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
\
--driver-memory 8G \
--executor-memory 30G \
--executor-cores 16 \
--num-executors 8 \
--spark.serializer org.apache.spark.serializer.KryoSerializer \
--spark.rdd.compress true \
--spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
--spark.akka.threads 16 \
--spark.task.maxFailures 64 \
--spark.scheduler.mode FAIR \
>logs/normRunLog-$run.log \
2>logs/normRunLogError-$run.log & \
echo $! > logs/run-$run.pid

Since the job spits out lots of logs, here is how I am trying to determine
if any tasks got assigned to non-local executors.
$ grep TID logs/normRunLogError-08-29-2014T18\:28\:32.log  | grep Starting
| grep -v NODE_LOCAL | grep -v PROCESS_LOCAL

Yields no lines.

If I look at resource pool usage in YARN, this app is assigned 252.5GB of
memory, 128 VCores and 9 containers. Am I missing something here?

Thanks,

Tim







On Thu, Aug 28, 2014 at 11:55 PM, Tim Smith  wrote:

> I set partitions to 64:
>
> //
>  kInMsg.repartition(64)
>  val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
> //
>
> Still see all activity only on the two nodes that seem to be receiving
> from Kafka.
>
> On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith  wrote:
> > TD - Apologies, didn't realize I was replying to you instead of the list.
> >
> > What does "numPartitions" refer to when calling createStream? I read an
> > earlier thread that seemed to suggest that numPartitions translates to
> > partitions created on the Spark side?
> >
> http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E
> >
> > Actually, I re-tried with 64 numPartitions in createStream and that
> didn't
> > work. I will manually set "repartition" to 64/128 and see how that goes.
> >
> > Thanks.
> >
> >
> >
> >
> > On Thu, Aug 28, 2014 at 5:42 PM, Tathagata Das <
> tathagata.das1...@gmail.com>
> > wrote:
> >>
> >> Having 16 partitions in KafkaUtils.createStream does not translate to
> the
> >> RDDs in Spark / Spark Streaming having 16 partitions. Repartition is the
> >> best way to distribute the received data between all the nodes, as long
> as
> >> there are sufficient number of partitions (try setting it to 2x the
> number
> >> cores given to the application).
> >>
> >> Yeah, in 1.0.0, ttl should be unnecessary.
> >>
> >>
> >>
> >> On Thu, Aug 28, 2014 at 5:17 PM, Tim Smith  wrote:
> >>>
> >>> On Thu, Aug 28, 2014 at 4:19 PM, Tathagata Das
> >>>  wrote:
> >>>>
> >>>> If you are repartitioning to 8 partitions, and your node happen to
> have
> >>>> at least 4 cores each, its possible that all 8 partitions are
> assigned to
> >>>> only 2 nodes. Try increasing the number of partitions. Also make sure
> you
> >>>> have executors (allocated by YARN) running on more than two nodes if
> you
> >>>> want to use all 11 nodes in your yarn cluster.
> >>>
> >>>
> >>> If you look at the code, I commented out the manual re-partitioning to
> 8.
> >>> Instead, I am created 16 partitions when I call createStream. But I
> will
> >>> increase the partitions to, say, 64 and see if I get better
> parallelism.
> >>>
> >>>>
> >>>>
> >>>> If you are using Spark 1.x, then you dont need to set the ttl for
> >>>> running Spark Streaming. In case you are using older version, why do
> you
> >>>> want to reduce it? You could reduce it, but it does increase the risk
> of the
> >>>> premature cleaning, if once in a while things get delayed by 20
> seconds. I
> >>>> dont see much harm in keeping the ttl at 60 seconds (a bit of extra
> garbage
> >>>> shouldnt hurt performance).
> >>>>
> >>>
> >>> I am running 1.0.0 (CDH5) so ttl setting is redundant? But you are
> right,
> >>> unless I have memory issues, more aggressive pruning won't help.
> >>>
> >>> Thanks,
> >>&

Re: DStream repartitioning, performance tuning processing

2014-08-29 Thread Tim Smith
Crash again. On the driver, logs say:
14/08/29 19:04:55 INFO BlockManagerMaster: Removed 7 successfully in
removeExecutor
org.apache.spark.SparkException: Job aborted due to stage failure: Task
2.0:0 failed 4 times, most recent failure: TID 6383 on host
node-dn1-2-acme.com failed for unknown reason
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


I go look at OS on node-dn1-2 and container logs for TID6383 but find
nothing.
# grep 6383 stderr
14/08/29 18:52:51 INFO CoarseGrainedExecutorBackend: Got assigned task 6383
14/08/29 18:52:51 INFO Executor: Running task ID 6383

However, last message on the container is timestamped "19:04:51" that tells
me the executor was killed for some reason right before the driver noticed
that executor/task failure.

How come my task failed only after 4 times although my config says failure
threshold is 64?








On Fri, Aug 29, 2014 at 12:00 PM, Tim Smith  wrote:

> I wrote a long post about how I arrived here but in a nutshell I don't see
> evidence of re-partitioning and workload distribution across the cluster.
> My new fangled way of starting the job is:
>
> run=`date +"%m-%d-%YT%T"`; \
> nohup spark-submit --class logStreamNormalizer \
> --master yarn log-stream-normalizer_2.10-1.0.jar \
> --jars
> spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
> \
> --driver-memory 8G \
> --executor-memory 30G \
> --executor-cores 16 \
> --num-executors 8 \
> --spark.serializer org.apache.spark.serializer.KryoSerializer \
> --spark.rdd.compress true \
> --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
> --spark.akka.threads 16 \
> --spark.task.maxFailures 64 \
> --spark.scheduler.mode FAIR \
> >logs/normRunLog-$run.log \
> 2>logs/normRunLogError-$run.log & \
> echo $! > logs/run-$run.pid
>
> Since the job spits out lots of logs, here is how I am trying to determine
> if any tasks got assigned to non-local executors.
> $ grep TID logs/normRunLogError-08-29-2014T18\:28\:32.log  | grep Starting
> | grep -v NODE_LOCAL | grep -v PROCESS_LOCAL
>
> Yields no lines.
>
> If I look at resource pool usage in YARN, this app is assigned 252.5GB of
> memory, 128 VCores and 9 containers. Am I missing something here?
>
> Thanks,
>
> Tim
>
>
>
>
>
>
>
> On Thu, Aug 28, 2014 at 11:55 PM, Tim Smith  wrote:
>
>> I set partitions to 64:
>>
>> //
>>  kInMsg.repartition(64)
>>  val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
>> //
>>
>> Still see all activity only on the two nodes that seem to be receiving
>> from Kafka.
>>
>> On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith  wrote:
>> > TD - Apologies, didn't realize I was replying to you instead of the
>> list.
>> >
>> > What does "numPartitions" refer to when calling createStream? I read an
>> > earlier thread that seemed to suggest that numPartitions translates to
>> > partitions created o

Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Tim Smith
Good to see I am not the only one who cannot get incoming Dstreams to
repartition. I tried repartition(512) but still no luck - the app
stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
release notes for 1.0.1 and 1.0.2, I don't see anything that says this
was an issue and has been fixed.

How do I debug the repartition() statement to see what's the flow
after the job hits that statement?


On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat  wrote:
> Chris,
>
> I did the Dstream.repartition mentioned in the document on parallelism in
> receiving, as well as set "spark.default.parallelism" and it still uses only
> 2 nodes in my cluster.  I notice there is another email thread on the same
> topic:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html
>
> My code is in Java and here is what I have:
>
>JavaPairReceiverInputDStream messages =
>
> KafkaUtils.createStream(ssc, zkQuorum,
> "cse-job-play-consumer", kafkaTopicMap);
>
> JavaPairDStream newMessages =
> messages.repartition(partitionSize);// partitionSize=30
>
> JavaDStream lines = newMessages.map(new
> Function, String>() {
> ...
>
> public String call(Tuple2 tuple2) {
>   return tuple2._2();
> }
>   });
>
> JavaDStream words = lines.flatMap(new
> MetricsComputeFunction()
> );
>
> JavaPairDStream wordCounts = words.mapToPair(
> new PairFunction() {
>...
> }
> );
>
>  wordCounts.foreachRDD(new Function,
> Void>() {...});
>
> Thanks,
> Bharat
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Tim Smith
I create my DStream very simply as:
val kInMsg =
KafkaUtils.createStream(ssc,"zkhost1:2181/zk_kafka","testApp",Map("rawunstruct"
-> 8))
.
.
eventually, before I operate on the DStream, I repartition it:
kInMsg.repartition(512)

Are you saying that ^^ repartition doesn't split by dstream into multiple
smaller streams? Should I manually create multiple Dstreams like this?:
val kInputs = (1 to 10).map {_=> KafkaUtils.createStream()}

Then I apply some custom logic to it as:
val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap)) //where
normalizeLog takes a String and Map of regex and returns a string

In my case, I think I have traced the issue to the receiver executor being
killed by Yarn:
14/08/29 22:46:30 ERROR YarnClientClusterScheduler: Lost executor 1 on
node-dn1-4-acme.com: remote Akka client disassociated

This be the root cause?
http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
https://issues.apache.org/jira/browse/SPARK-2121





On Fri, Aug 29, 2014 at 3:28 PM, Sean Owen  wrote:

> Are you using multiple Dstreams? repartitioning does not affect how
> many receivers you have. It's on 2 nodes for each receiver. You need
> multiple partitions in the queue, each consumed by a DStream, if you
> mean to parallelize consuming the queue.
>
> On Fri, Aug 29, 2014 at 11:08 PM, Tim Smith  wrote:
> > Good to see I am not the only one who cannot get incoming Dstreams to
> > repartition. I tried repartition(512) but still no luck - the app
> > stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
> > release notes for 1.0.1 and 1.0.2, I don't see anything that says this
> > was an issue and has been fixed.
> >
> > How do I debug the repartition() statement to see what's the flow
> > after the job hits that statement?
> >
> >
> > On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat 
> wrote:
> >> Chris,
> >>
> >> I did the Dstream.repartition mentioned in the document on parallelism
> in
> >> receiving, as well as set "spark.default.parallelism" and it still uses
> only
> >> 2 nodes in my cluster.  I notice there is another email thread on the
> same
> >> topic:
> >>
> >>
> http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html
> >>
> >> My code is in Java and here is what I have:
> >>
> >>JavaPairReceiverInputDStream messages =
> >>
> >> KafkaUtils.createStream(ssc, zkQuorum,
> >> "cse-job-play-consumer", kafkaTopicMap);
> >>
> >> JavaPairDStream newMessages =
> >> messages.repartition(partitionSize);// partitionSize=30
> >>
> >> JavaDStream lines = newMessages.map(new
> >> Function, String>() {
> >> ...
> >>
> >> public String call(Tuple2 tuple2) {
> >>   return tuple2._2();
> >> }
> >>   });
> >>
> >> JavaDStream words = lines.flatMap(new
> >> MetricsComputeFunction()
> >> );
> >>
> >> JavaPairDStream wordCounts = words.mapToPair(
> >> new PairFunction() {
> >>...
> >> }
> >> );
> >>
> >>  wordCounts.foreachRDD(new Function Integer>,
> >> Void>() {...});
> >>
> >> Thanks,
> >> Bharat
> >>
> >>
> >>
> >> --
> >> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
> >> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Tim Smith
Ok, so I did this:
val kInStreams = (1 to 10).map{_ =>
KafkaUtils.createStream(ssc,"zkhost1:2181/zk_kafka","testApp",Map("rawunstruct"
-> 1)) }
val kInMsg = ssc.union(kInStreams)
val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))

This has improved parallelism. Earlier I would only get a "Stream 0". Now I
have "Streams [0-9]". Of course, since the kafka topic has only three
partitions, only three of those streams are active but I am seeing more
blocks being pulled across the three streams total that what one was doing
earlier. Also, four nodes are actively processing tasks (vs only two
earlier) now which actually has me confused. If "Streams" are active only
on 3 nodes then how/why did a 4th node get work? If a 4th got work why
aren't more nodes getting work?






On Fri, Aug 29, 2014 at 4:11 PM, Tim Smith  wrote:

> I create my DStream very simply as:
> val kInMsg =
> KafkaUtils.createStream(ssc,"zkhost1:2181/zk_kafka","testApp",Map("rawunstruct"
> -> 8))
> .
> .
> eventually, before I operate on the DStream, I repartition it:
> kInMsg.repartition(512)
>
> Are you saying that ^^ repartition doesn't split by dstream into multiple
> smaller streams? Should I manually create multiple Dstreams like this?:
> val kInputs = (1 to 10).map {_=> KafkaUtils.createStream()}
>
> Then I apply some custom logic to it as:
> val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap)) //where
> normalizeLog takes a String and Map of regex and returns a string
>
> In my case, I think I have traced the issue to the receiver executor being
> killed by Yarn:
> 14/08/29 22:46:30 ERROR YarnClientClusterScheduler: Lost executor 1 on
> node-dn1-4-acme.com: remote Akka client disassociated
>
> This be the root cause?
>
> http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
> https://issues.apache.org/jira/browse/SPARK-2121
>
>
>
>
>
> On Fri, Aug 29, 2014 at 3:28 PM, Sean Owen  wrote:
>
>> Are you using multiple Dstreams? repartitioning does not affect how
>> many receivers you have. It's on 2 nodes for each receiver. You need
>> multiple partitions in the queue, each consumed by a DStream, if you
>> mean to parallelize consuming the queue.
>>
>> On Fri, Aug 29, 2014 at 11:08 PM, Tim Smith  wrote:
>> > Good to see I am not the only one who cannot get incoming Dstreams to
>> > repartition. I tried repartition(512) but still no luck - the app
>> > stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
>> > release notes for 1.0.1 and 1.0.2, I don't see anything that says this
>> > was an issue and has been fixed.
>> >
>> > How do I debug the repartition() statement to see what's the flow
>> > after the job hits that statement?
>> >
>> >
>> > On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat 
>> wrote:
>> >> Chris,
>> >>
>> >> I did the Dstream.repartition mentioned in the document on parallelism
>> in
>> >> receiving, as well as set "spark.default.parallelism" and it still
>> uses only
>> >> 2 nodes in my cluster.  I notice there is another email thread on the
>> same
>> >> topic:
>> >>
>> >>
>> http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html
>> >>
>> >> My code is in Java and here is what I have:
>> >>
>> >>JavaPairReceiverInputDStream messages =
>> >>
>> >> KafkaUtils.createStream(ssc, zkQuorum,
>> >> "cse-job-play-consumer", kafkaTopicMap);
>> >>
>> >> JavaPairDStream newMessages =
>> >> messages.repartition(partitionSize);// partitionSize=30
>> >>
>> >> JavaDStream lines = newMessages.map(new
>> >> Function, String>() {
>> >> ...
>> >>
>> >> public String call(Tuple2 tuple2) {
>> >>   return tuple2._2();
>> >> }
>> >>   });
>> >>
>> >> JavaDStream words = lines.flatMap(new
>> >> MetricsComputeFunction()
>> >> );
>> >>
>> >> JavaPairDStream wordCounts = words.mapToPair(
>> >> new PairFunction() {
>> >>...
>> >> }
>> >> );
>> >>
>> >>  wordCounts.foreachRDD(new Function> Integer>,
>> >> Void>() {...});
>> >>
>> >> Thanks,
>> >> Bharat
>> >>
>> >>
>> >>
>> >> --
>> >> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
>> >> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >>
>> >> -
>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: user-h...@spark.apache.org
>> >>
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>
>


Re: Low Level Kafka Consumer for Spark

2014-08-30 Thread Tim Smith
I'd be interested to understand this mechanism as well. But this is the
error recovery part of the equation. Consuming from Kafka has two aspects -
parallelism and error recovery and I am not sure how either works. For
error recovery, I would like to understand how:
- A failed receiver gets re-spawned. In 1.0.0, despite settings failed
tasks threshold to 64, my job aborts after 4 receiver task failures.
- Data loss recovery due to a failed receiver task/executor.


> For parallelism, I would expect a single createStream() to intelligently
map a receiver thread somewhere, one for each kafka partition, but in
different JVMs. Also, repartition() does not seem to work as advertised. A
repartition(512) should get nodes other than the receiver nodes to get some
RDDs to process. No?


On Sat, Aug 30, 2014 at 7:14 PM, Roger Hoover 
wrote:

> I have this same question.  Isn't there somewhere that the Kafka range
> metadata can be saved?  From my naive perspective, it seems like it should
> be very similar to HDFS lineage.  The original HDFS blocks are kept
> somewhere (in the driver?) so that if an RDD partition is lost, it can be
> recomputed.  In this case, all we need is the Kafka topic, partition, and
> offset range.
>
> Can someone enlighten us on why two copies of the RDD are needed (or some
> other mechanism like a WAL) for fault tolerance when using Kafka but not
> when reading from say HDFS?
>

>
> On Fri, Aug 29, 2014 at 8:58 AM, Jonathan Hodges 
> wrote:
>
>> 'this 2-node replication is mainly for failover in case the receiver
>> dies while data is in flight.  there's still chance for data loss as
>> there's no write ahead log on the hot path, but this is being addressed.'
>>
>> Can you comment a little on how this will be addressed, will there be a
>> durable WAL?  Is there a JIRA for tracking this effort?
>>
>> I am curious without WAL if you can avoid this data loss with explicit
>> management of Kafka offsets e.g. don't commit offset unless data is
>> replicated to multiple nodes or maybe not until processed.  The incoming
>> data will always be durably stored to disk in Kafka so can be replayed in
>> failure scenarios to avoid data loss if the offsets are managed properly.
>>
>>
>>
>>
>> On Thu, Aug 28, 2014 at 12:02 PM, Chris Fregly  wrote:
>>
>>> @bharat-
>>>
>>> overall, i've noticed a lot of confusion about how Spark Streaming
>>> scales - as well as how it handles failover and checkpointing, but we can
>>> discuss that separately.
>>>
>>> there's actually 2 dimensions to scaling here:  receiving and processing.
>>>
>>> *Receiving*
>>> receiving can be scaled out by submitting new DStreams/Receivers to the
>>> cluster as i've done in the Kinesis example.  in fact, i purposely chose to
>>> submit multiple receivers in my Kinesis example because i feel it should be
>>> the norm and not the exception - particularly for partitioned and
>>> checkpoint-capable streaming systems like Kafka and Kinesis.   it's the
>>> only way to scale.
>>>
>>> a side note here is that each receiver running in the cluster will
>>> immediately replicates to 1 other node for fault-tolerance of that specific
>>> receiver.  this is where the confusion lies.  this 2-node replication is
>>> mainly for failover in case the receiver dies while data is in flight.
>>>  there's still chance for data loss as there's no write ahead log on the
>>> hot path, but this is being addressed.
>>>
>>> this in mentioned in the docs here:
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>>>
>>> *Processing*
>>> once data is received, tasks are scheduled across the Spark cluster just
>>> like any other non-streaming task where you can specify the number of
>>> partitions for reduces, etc.  this is the part of scaling that is sometimes
>>> overlooked - probably because it "works just like regular Spark", but it is
>>> worth highlighting.
>>>
>>> Here's a blurb in the docs:
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing
>>>
>>> the other thing that's confusing with Spark Streaming is that in Scala,
>>> you need to explicitly
>>>
>>> import
>>> org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
>>>
>>> in order to pick up the implicits that allow DStream.reduceByKey and
>>> such (versus DStream.transform(rddBatch => rddBatch.reduceByKey())
>>>
>>> in other words, DStreams appear to be relatively featureless until you
>>> discover this implicit.  otherwise, you need to operate on the underlying
>>> RDD's explicitly which is not ideal.
>>>
>>> the Kinesis example referenced earlier in the thread uses the DStream
>>> implicits.
>>>
>>>
>>> side note to all of this - i've recently convinced my publisher for my
>>> upcoming book, Spark In Action, to let me jump ahead and write the Spark
>>> Streaming chapter ahead of other more well-understood libraries.  early
>>> release is in a month or so.  sign up  @ ht

Re: Publishing a transformed DStream to Kafka

2014-09-02 Thread Tim Smith
I'd be interested in finding the answer too. Right now, I do:

val kafkaOutMsgs = kafkInMessages.map(x=>myFunc(x._2,someParam))
kafkaOutMsgs.foreachRDD((rdd,time) => { rdd.foreach(rec => {
writer.output(rec) }) } ) //where writer.ouput is a method that takes a
string and writer is an instance of a producer class.





On Tue, Sep 2, 2014 at 10:12 AM, Massimiliano Tomassi  wrote:

> Hello all,
> after having applied several transformations to a DStream I'd like to
> publish all the elements in all the resulting RDDs to Kafka. What the best
> way to do that would be? Just using DStream.foreach and then RDD.foreach ?
> Is there any other built in utility for this use case?
>
> Thanks a lot,
> Max
>
> --
> 
> Massimiliano Tomassi
> 
> e-mail: max.toma...@gmail.com
> 
>


Re: Spark Streaming : Could not compute split, block not found

2014-09-02 Thread Tim Smith
I am seeing similar errors in my job's logs.

TD - Are you still waiting for debug logs? If yes, can you please let me
know how to generate debug logs? I am using Spark/Yarn and setting
"NodeManager" logs to "DEBUG" level doesn't seem to produce anything but
INFO logs.

Thanks,

Tim

>Aaah sorry, I should have been more clear. Can you give me INFO (DEBUG
>even better) level logs since the start of the program? I need to see
>how the cleaning up code is managing to delete the block.
>
>TD
>
>On Fri, Aug 1, 2014 at 10:26 PM, Kanwaldeep <[hidden email]
>
wrote:
>
> Here is the log file.
> streaming.gz
> <
http://apache-spark-user-list.1001560.n3.nabble.com/file/n11240/streaming.gz
>
>
> There are quite few AskTimeouts that have happening for about 2 minutes
and
> then followed by block not found errors.
>
> Thanks
> Kanwal


Re: Low Level Kafka Consumer for Spark

2014-09-08 Thread Tim Smith
Thanks TD. Someone already pointed out to me that /repartition(...)/ isn't
the right way. You have to /val partedStream = repartition(...)/. Would be
nice to have it fixed in the docs.




On Fri, Sep 5, 2014 at 10:44 AM, Tathagata Das 
wrote:

> Some thoughts on this thread to clarify the doubts.
>
> 1. Driver recovery: The current (1.1 to be released) does not recover the
> raw data that has been received but not processes. This is because when the
> driver dies, the executors die and so does the raw data that was stored in
> it. Only for HDFS, the data is not lost by driver recovery as the data is
> already present reliably in HDFS. This is something we want to fix by Spark
> 1.2 (3 month from now). Regarding recovery by replaying the data from
> Kafka, it is possible but tricky. Our goal is to provide strong guarantee,
> exactly-once semantics in all transformations. To guarantee this for all
> kinds of streaming computations stateful and not-stateful computations, it
> is requires that the data be replayed through Kafka in exactly same order,
> and the underlying blocks of data in Spark be regenerated in the exact way
> as it would have if there was no driver failure. This is quite tricky to
> implement, requires manipulation of zookeeper offsets, etc, that is hard to
> do with the high level consumer that KafkaUtil uses. Dibyendu's low level
> Kafka receiver may enable such approaches in the future. For now we
> definitely plan to solve the first problem very very soon.
>
> 3. Repartitioning: I am trying to understand the repartition issue. One
> common mistake I have seen is that developers repartition a stream but not
> use the repartitioned stream.
>
> WRONG:
> inputDstream.repartition(100)
> inputDstream.map(...).count().print()
>
> RIGHT:
> val repartitionedDStream = inputDStream.repartitoin(100)
> repartitionedDStream.map(...).count().print()
>
> Not sure if this helps solve the problem that you all the facing. I am
> going to add this to the stremaing programming guide to make sure this
> common mistake is avoided.
>
> TD
>
>
>
>
> On Wed, Sep 3, 2014 at 10:38 AM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi,
>>
>> Sorry for little delay . As discussed in this thread, I have modified the
>> Kafka-Spark-Consumer ( https://github.com/dibbhatt/kafka-spark-consumer)
>> code to have dedicated Receiver for every Topic Partition. You can see the
>> example howto create Union of these receivers
>> in consumer.kafka.client.Consumer.java .
>>
>> Thanks to Chris for suggesting this change.
>>
>> Regards,
>> Dibyendu
>>
>>
>> On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB 
>> wrote:
>>
>>> Just a comment on the recovery part.
>>>
>>> Is it correct to say that currently Spark Streaming recovery design does
>>> not
>>> consider re-computations (upon metadata lineage recovery) that depend on
>>> blocks of data of the received stream?
>>> https://issues.apache.org/jira/browse/SPARK-1647
>>>
>>> Just to illustrate a real use case (mine):
>>> - We have object states which have a Duration field per state which is
>>> incremented on every batch interval. Also this object state is reset to 0
>>> upon incoming state changing events. Let's supposed there is at least one
>>> event since the last data checkpoint. This will lead to inconsistency
>>> upon
>>> driver recovery: The Duration field will get incremented from the data
>>> checkpoint version until the recovery moment, but the state change event
>>> will never be re-processed...so in the end we have the old state with the
>>> wrong Duration value.
>>> To make things worst, let's imagine we're dumping the Duration increases
>>> somewhere...which means we're spreading the problem across our system.
>>> Re-computation awareness is something I've commented on another thread
>>> and
>>> rather treat it separately.
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205
>>>
>>> Re-computations do occur, but the only RDD's that are recovered are the
>>> ones
>>> from the data checkpoint. This is what we've seen. Is not enough by
>>> itself
>>> to ensure recovery of computed data and this partial recovery leads to
>>> inconsistency in some cases.
>>>
>>> Roger - I share the same question with you - I'm just not sure if the
>>> replicated data really gets persisted on every batch. The execution
>>> lineage
>>> is checkpointed, but if we have big chunks of data being consumed to
>>> Receiver node on let's say a second bases then having it persisted to
>>> HDFS
>>> every second could be a big challenge for keeping JVM performance - maybe
>>> that could be reason why it's not really implemented...assuming it isn't.
>>>
>>> Dibyendu had a great effort with the offset controlling code but the
>>> general
>>> state consistent recovery feels to me like another big issue to address.
>>>
>>> I plan on having a dive into the Streaming code and try to at least
>>> contribute wi

Re: How to scale more consumer to Kafka stream

2014-09-10 Thread Tim Smith
How are you creating your kafka streams in Spark?

If you have 10 partitions for a topic, you can call "createStream" ten
times to create 10 parallel receivers/executors and then use "union" to
combine all the dStreams.



On Wed, Sep 10, 2014 at 7:16 AM, richiesgr  wrote:

> Hi (my previous post as been used by someone else)
>
> I'm building a application the read from kafka stream event. In production
> we've 5 consumers that share 10 partitions.
> But on spark streaming kafka only 1 worker act as a consumer then
> distribute
> the tasks to workers so I can have only 1 machine acting as consumer but I
> need more because only 1 consumer means Lags.
>
> Do you've any idea what I can do ? Another point is interresting the master
> is not loaded at all I can get up more than 10 % CPU
>
> I've tried to increase the queued.max.message.chunks on the kafka client to
> read more records thinking it'll speed up the read but I only get
>
> ERROR consumer.ConsumerFetcherThread:
>
> [ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372],
> Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 73; ClientId:
>
> SparkEC2-ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372;
> ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [IA2,7] ->
> PartitionFetchInfo(929838589,1048576),[IA2,6] ->
> PartitionFetchInfo(929515796,1048576),[IA2,9] ->
> PartitionFetchInfo(929577946,1048576),[IA2,8] ->
> PartitionFetchInfo(930751599,1048576),[IA2,2] ->
> PartitionFetchInfo(926457704,1048576),[IA2,5] ->
> PartitionFetchInfo(930774385,1048576),[IA2,0] ->
> PartitionFetchInfo(929913213,1048576),[IA2,3] ->
> PartitionFetchInfo(929268891,1048576),[IA2,4] ->
> PartitionFetchInfo(929949877,1048576),[IA2,1] ->
> PartitionFetchInfo(930063114,1048576)
> java.lang.OutOfMemoryError: Java heap space
>
> Is someone have ideas ?
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-scale-more-consumer-to-Kafka-stream-tp13883.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Tim Smith
I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
the receivers die within an hour because Yarn kills the containers for high
memory usage. I set ttl.cleaner to 30 seconds but that didn't help. So I
don't think stale RDDs are an issue here. I did a "jmap -histo" on a couple
of running receiver processes and in a heap of 30G, roughly ~16G is taken
by "[B" which is byte arrays.

Still investigating more and would appreciate pointers for troubleshooting.
I have dumped the heap of a receiver and will try to go over it.




On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com> wrote:

> I somehow missed that parameter when I was reviewing the documentation,
> that should do the trick! Thank you!
>
> 2014-09-10 2:10 GMT+01:00 Shao, Saisai :
>
>  Hi Luis,
>>
>>
>>
>> The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
>> used to remove useless timeout streaming data, the difference is that
>> “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming
>> input data, but also Spark’s useless metadata; while
>> “spark.streaming.unpersist” is reference-based cleaning mechanism,
>> streaming data will be removed when out of slide duration.
>>
>>
>>
>> Both these two parameter can alleviate the memory occupation of Spark
>> Streaming. But if the data is flooded into Spark Streaming when start up
>> like your situation using Kafka, these two parameters cannot well mitigate
>> the problem. Actually you need to control the input data rate to not inject
>> so fast, you can try “spark.straming.receiver.maxRate” to control the
>> inject rate.
>>
>>
>>
>> Thanks
>>
>> Jerry
>>
>>
>>
>> *From:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
>> *Sent:* Wednesday, September 10, 2014 5:21 AM
>> *To:* user@spark.apache.org
>> *Subject:* spark.cleaner.ttl and spark.streaming.unpersist
>>
>>
>>
>> The executors of my spark streaming application are being killed due to
>> memory issues. The memory consumption is quite high on startup because is
>> the first run and there are quite a few events on the kafka queues that are
>> consumed at a rate of 100K events per sec.
>>
>> I wonder if it's recommended to use spark.cleaner.ttl and
>> spark.streaming.unpersist together to mitigate that problem. And I also
>> wonder if new RDD are being batched while a RDD is being processed.
>>
>> Regards,
>>
>> Luis
>>
>
>


Re: how to choose right DStream batch interval

2014-09-10 Thread Tim Smith
http://www.slideshare.net/spark-project/deep-divewithsparkstreaming-tathagatadassparkmeetup20130617

Slide 39 covers it.

On Tue, Sep 9, 2014 at 9:23 PM, qihong  wrote:

> Hi Mayur,
>
> Thanks for your response. I did write a simple test that set up a DStream
> with
> 5 batches; The batch duration is 1 second, and the 3rd batch will take
> extra
> 2 seconds, the output of the test shows that the 3rd batch causes backlog,
> and spark streaming does catch up on 4th and 5th batch (DStream.print
> was modified to output system time)
>
> ---
> Time: 1409959708000 ms, system time: 1409959708269
> ---
> 1155
> ---
> Time: 1409959709000 ms, system time: 1409959709033
> ---
> 2255
> delay 2000 ms
> ---
> Time: 140995971 ms, system time: 1409959712036
> ---
> 3355
> ---
> Time: 1409959711000 ms, system time: 1409959712059
> ---
> 4455
> ---
> Time: 1409959712000 ms, system time: 1409959712083
> ---
> 
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-choose-right-DStream-batch-interval-tp13578p13855.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark-streaming "Could not compute split" exception

2014-09-10 Thread Tim Smith
I had a similar issue and many others - all were basically symptoms for
yarn killing the container for high memory usage. Haven't gotten to root
cause yet.

On Tue, Sep 9, 2014 at 3:18 PM, Marcelo Vanzin  wrote:

> Your executor is exiting or crashing unexpectedly:
>
> On Tue, Sep 9, 2014 at 3:13 PM, Penny Espinoza
>  wrote:
> > org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit
> > code from container container_1410224367331_0006_01_03 is : 1
> > 2014-09-09 21:47:26,345 WARN
> > org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor:
> > Exception from container-launch with container ID:
> > container_1410224367331_0006_01_03 and exit code: 1
>
> You can check the app logs (yarn logs --applicationId [id]) and see
> why the container is exiting. There's probably an exception happening
> somewhere.
>
>
> --
> Marcelo
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Tim Smith
Actually, I am not doing any explicit shuffle/updateByKey or other
transform functions. In my program flow, I take in data from Kafka,
match each message against a list of regex and then if a msg matches a
regex then extract groups, stuff them in json and push out back to
kafka (different topic). So there is really no dependency between two
messages in terms of processing. Here's my container histogram:
http://pastebin.com/s3nAT3cY

Essentially, my app is a cluster grep on steroids.



On Wed, Sep 10, 2014 at 11:34 AM, Yana Kadiyska  wrote:
> Tim, I asked a similar question twice:
> here
> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html
> and here
> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html
>
> and have not yet received any responses. I noticed that the heapdump only
> contains a very large byte array consuming about 66%(the second link
> contains a picture of my heap -- I ran with a small heap to be able to get
> the failure quickly)
>
> I don't have solutions but wanted to affirm that I've observed a similar
> situation...
>
> On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith  wrote:
>>
>> I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
>> the receivers die within an hour because Yarn kills the containers for high
>> memory usage. I set ttl.cleaner to 30 seconds but that didn't help. So I
>> don't think stale RDDs are an issue here. I did a "jmap -histo" on a couple
>> of running receiver processes and in a heap of 30G, roughly ~16G is taken by
>> "[B" which is byte arrays.
>>
>> Still investigating more and would appreciate pointers for
>> troubleshooting. I have dumped the heap of a receiver and will try to go
>> over it.
>>
>>
>>
>>
>> On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez
>>  wrote:
>>>
>>> I somehow missed that parameter when I was reviewing the documentation,
>>> that should do the trick! Thank you!
>>>
>>> 2014-09-10 2:10 GMT+01:00 Shao, Saisai :
>>>
>>>> Hi Luis,
>>>>
>>>>
>>>>
>>>> The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
>>>> used to remove useless timeout streaming data, the difference is that
>>>> “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming
>>>> input data, but also Spark’s useless metadata; while
>>>> “spark.streaming.unpersist” is reference-based cleaning mechanism, 
>>>> streaming
>>>> data will be removed when out of slide duration.
>>>>
>>>>
>>>>
>>>> Both these two parameter can alleviate the memory occupation of Spark
>>>> Streaming. But if the data is flooded into Spark Streaming when start up
>>>> like your situation using Kafka, these two parameters cannot well mitigate
>>>> the problem. Actually you need to control the input data rate to not inject
>>>> so fast, you can try “spark.straming.receiver.maxRate” to control the 
>>>> inject
>>>> rate.
>>>>
>>>>
>>>>
>>>> Thanks
>>>>
>>>> Jerry
>>>>
>>>>
>>>>
>>>> From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
>>>> Sent: Wednesday, September 10, 2014 5:21 AM
>>>> To: user@spark.apache.org
>>>> Subject: spark.cleaner.ttl and spark.streaming.unpersist
>>>>
>>>>
>>>>
>>>> The executors of my spark streaming application are being killed due to
>>>> memory issues. The memory consumption is quite high on startup because is
>>>> the first run and there are quite a few events on the kafka queues that are
>>>> consumed at a rate of 100K events per sec.
>>>>
>>>> I wonder if it's recommended to use spark.cleaner.ttl and
>>>> spark.streaming.unpersist together to mitigate that problem. And I also
>>>> wonder if new RDD are being batched while a RDD is being processed.
>>>>
>>>> Regards,
>>>>
>>>> Luis
>>>
>>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Tim Smith
I switched from Yarn to StandAlone mode and haven't had OOM issue yet.
However, now I have Akka issues killing the executor:

2014-09-11 02:43:34,543 INFO akka.actor.LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying]
from Actor[akka://sparkWorker/deadLetters] to
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.2.16.8%3A44405-6#1549270895]
was not delivered. [2] dead letters encountered. This logging can be
turned off or adjusted with configuration settings
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Before I switched from Yarn to Standalone, I tried looking at heaps of
running executors. What I found odd was that while both - jmap
histo:live and jmap histo showed heap usage in few hundreds of MBytes,
Yarn kept showing that memory utilization is in several Gigabytes -
eventually leading to the container being killed.

I would appreciate if someone can duplicate what I am seeing. Basically:
1. Tail your yarn container logs and see what it is reporting as
memory used by the JVM
2. In parallel, run "jmap -histo:live " or "jmap histo " on
the executor process.

They should be about the same, right?

Also, in the heap dump, 99% of the heap seems to be occupied with
"unreachable objects" (and most of it is byte arrays).




On Wed, Sep 10, 2014 at 12:06 PM, Tim Smith  wrote:
> Actually, I am not doing any explicit shuffle/updateByKey or other
> transform functions. In my program flow, I take in data from Kafka,
> match each message against a list of regex and then if a msg matches a
> regex then extract groups, stuff them in json and push out back to
> kafka (different topic). So there is really no dependency between two
> messages in terms of processing. Here's my container histogram:
> http://pastebin.com/s3nAT3cY
>
> Essentially, my app is a cluster grep on steroids.
>
>
>
> On Wed, Sep 10, 2014 at 11:34 AM, Yana Kadiyska  
> wrote:
>> Tim, I asked a similar question twice:
>> here
>> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html
>> and here
>> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html
>>
>> and have not yet received any responses. I noticed that the heapdump only
>> contains a very large byte array consuming about 66%(the second link
>> contains a picture of my heap -- I ran with a small heap to be able to get
>> the failure quickly)
>>
>> I don't have solutions but wanted to affirm that I've observed a similar
>> situation...
>>
>> On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith  wrote:
>>>
>>> I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
>>> the receivers die within an hour because Yarn kills the containers for high
>>> memory usage. I set ttl.cleaner to 30 seconds but that didn't help. So I
>>> don't think stale RDDs are an issue here. I did a "jmap -histo" on a couple
>>> of running receiver processes and in a heap of 30G, roughly ~16G is taken by
>>> "[B" which is byte arrays.
>>>
>>> Still investigating more and would appreciate pointers for
>>> troubleshooting. I have dumped the heap of a receiver and will try to go
>>> over it.
>>>
>>>
>>>
>>>
>>> On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez
>>>  wrote:
>>>>
>>>> I somehow missed that parameter when I was reviewing the documentation,
>>>> that should do the trick! Thank you!
>>>>
>>>> 2014-09-10 2:10 GMT+01:00 Shao, Saisai :
>>>>
>>>>> Hi Luis,
>>>>>
>>>>>
>>>>>
>>>>> The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
>>>>> used to remove useless timeout streaming data, the difference is that
>>>>> “spark.cleaner.ttl” is time-based cleaner, it does not only clean 
>>>>> streaming
>>>>> input data, but also Spark’s useless metadata; while
>>>>> “spark.streaming.unpersist” is reference-based cleaning mechanism, 
>>>>> streaming
>>>>> data will be removed when out of slide duration.
>>>>>
>>>>>
>>>>>
>>>>> Both these two parameter can alleviate the memory occupation of Spark
>>>>> Streaming. But if the data is flooded into Spark Streaming when start up
>>>>> like your situation using Kafka, these two parameters cannot well mitigate
>>>

Re: Out of memory with Spark Streaming

2014-09-11 Thread Tim Smith
I noticed that, by default, in CDH-5.1 (Spark 1.0.0), in both,
StandAlone and Yarn mode - no GC options are set when an executor is
launched. The only options passed in StandAlone mode are
"-XX:MaxPermSize=128m -Xms16384M -Xmx16384M" (when I give each
executor 16G).

In Yarn mode, even fewer JVM options are set - "-server
-XX:OnOutOfMemoryError=kill %p -Xms16384m -Xmx16384m"

Monitoring OS and heap usage side-by-side (using top and jmap), I see
that my physical memory usage is anywhere between 2x-5x of the heap
usage (all heap, not just live objects).

So I set this, SPARK_JAVA_OPTS="-XX:MaxPermSize=128m -XX:NewSize=1024m
-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
-XX:MaxHeapFreeRatio=70"

I am still monitoring but I think my app is more stable now, in
standalone mode, whereas earlier, under Yarn, the container would get
killed for too much memory usage.

How do I get Yarn to enforce SPARK_JAVA_OPTS? Setting
"spark.executor.extrajavaoptions" doesn't seem to work.



On Thu, Sep 11, 2014 at 1:50 PM, Tathagata Das
 wrote:
> Which version of spark are you running?
>
> If you are running the latest one, then could try running not a window but a
> simple event count on every 2 second batch, and see if you are still running
> out of memory?
>
> TD
>
>
> On Thu, Sep 11, 2014 at 10:34 AM, Aniket Bhatnagar
>  wrote:
>>
>> I did change it to be 1 gb. It still ran out of memory but a little later.
>>
>> The streaming job isnt handling a lot of data. In every 2 seconds, it
>> doesn't get more than 50 records. Each record size is not more than 500
>> bytes.
>>
>> On Sep 11, 2014 10:54 PM, "Bharat Venkat"  wrote:
>>>
>>> You could set "spark.executor.memory" to something bigger than the
>>> default (512mb)
>>>
>>>
>>> On Thu, Sep 11, 2014 at 8:31 AM, Aniket Bhatnagar
>>>  wrote:

 I am running a simple Spark Streaming program that pulls in data from
 Kinesis at a batch interval of 10 seconds, windows it for 10 seconds, maps
 data and persists to a store.

 The program is running in local mode right now and runs out of memory
 after a while. I am yet to investigate heap dumps but I think Spark isn't
 releasing memory after processing is complete. I have even tried changing
 storage level to disk only.

 Help!

 Thanks,
 Aniket
>>>
>>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Announcing Spark 1.1.0!

2014-09-11 Thread Tim Smith
Thanks for all the good work. Very excited about seeing more features and
better stability in the framework.


On Thu, Sep 11, 2014 at 5:12 PM, Patrick Wendell  wrote:

> I am happy to announce the availability of Spark 1.1.0! Spark 1.1.0 is
> the second release on the API-compatible 1.X line. It is Spark's
> largest release ever, with contributions from 171 developers!
>
> This release brings operational and performance improvements in Spark
> core including a new implementation of the Spark shuffle designed for
> very large scale workloads. Spark 1.1 adds significant extensions to
> the newest Spark modules, MLlib and Spark SQL. Spark SQL introduces a
> JDBC server, byte code generation for fast expression evaluation, a
> public types API, JSON support, and other features and optimizations.
> MLlib introduces a new statistics library along with several new
> algorithms and optimizations. Spark 1.1 also builds out Spark's Python
> support and adds new components to the Spark Streaming module.
>
> Visit the release notes [1] to read about the new features, or
> download [2] the release today.
>
> [1] http://spark.eu.apache.org/releases/spark-release-1-1-0.html
> [2] http://spark.eu.apache.org/downloads.html
>
> NOTE: SOME ASF DOWNLOAD MIRRORS WILL NOT CONTAIN THE RELEASE FOR SEVERAL
> HOURS.
>
> Please e-mail me directly for any type-o's in the release notes or name
> listing.
>
> Thanks, and congratulations!
> - Patrick
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Stable spark streaming app

2014-09-12 Thread Tim Smith
Hi,

Anyone have a stable streaming app running in "production"? Can you
share some overview of the app and setup like number of nodes, events
per second, broad stream processing workflow, config highlights etc?

Thanks,

Tim

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Some Serious Issue with Spark Streaming ? Blocks Getting Removed and Jobs have Failed..

2014-09-12 Thread Tim Smith
Similar issue (Spark 1.0.0). Streaming app runs for a few seconds
before these errors start to pop all over the driver logs:

14/09/12 17:30:23 WARN TaskSetManager: Loss was due to java.lang.Exception
java.lang.Exception: Could not compute split, block
input-4-1410542878200 not found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.rdd.UnionPartition.iterator(UnionRDD.scala:33)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:74)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

I am using "MEMORY_AND_DISK_SER" for all my RDDs so I should not be
losing any blocks unless I run out of disk space, right?



On Fri, Sep 12, 2014 at 5:24 AM, Dibyendu Bhattacharya
 wrote:
> I agree,
>
> Even the Low Level Kafka Consumer which I have written has tunable IO
> throttling which help me solve this issue ... But question remains , even if
> there are large backlog, why Spark drop the unprocessed memory blocks ?
>
> Dib
>
> On Fri, Sep 12, 2014 at 5:47 PM, Jeoffrey Lim  wrote:
>>
>> Our issue could be related to this problem as described in:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-in-1-hour-batch-duration-RDD-files-gets-lost-td14027.html
>> which the DStream is processed for every 1 hour batch duration.
>>
>> I have implemented IO throttling in the Receiver as well in our Kafka
>> consumer, and our backlog is not that large.
>>
>> NFO : org.apache.spark.storage.MemoryStore - 1 blocks selected for
>> dropping
>> INFO : org.apache.spark.storage.BlockManager - Dropping block
>> input-0-1410443074600 from memory
>> INFO : org.apache.spark.storage.MemoryStore - Block input-0-1410443074600
>> of size 12651900 dropped from memory (free 21220667)
>> INFO : org.apache.spark.storage.BlockManagerInfo - Removed
>> input-0-1410443074600 on ip-10-252-5-113.asskickery.us:53752 in memory
>> (size: 12.1 MB, free: 100.6 MB)
>>
>> The question that I have now is: how to prevent the
>> MemoryStore/BlockManager of dropping the block inputs? And should they be
>> logged in the level WARN/ERROR?
>>
>>
>> Thanks.
>>
>>
>> On Fri, Sep 12, 2014 at 4:45 PM, Dibyendu Bhattacharya [via Apache Spark
>> User List] <[hidden email]> wrote:
>>>
>>> Dear all,
>>>
>>> I am sorry. This was a false alarm
>>>
>>> There was some issue in the RDD processing logic which leads to large
>>> backlog. Once I fixed the issues in my processing logic, I can see all
>>> messages being pulled nicely without any Block Removed error. I need to tune
>>> certain configurations in my Kafka Consumer to modify the data rate and also
>>> the batch size.
>>>
>>> Sorry again.
>>>
>>>
>>> Regards,
>>> Dibyendu
>>>
>>> On Thu, Sep 11, 2014 at 8:13 PM, Nan Zhu <[hidden email]> wrote:

 This is my case about broadcast variable:

 14/07/21 19:49:13 INFO Executor: Running task ID 4
 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2)
 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on
 localhost (progress: 3/106)
 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for
 hdfstest_customers
 14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596
 14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver
 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
 14/07/21 19:49:13 INFO Executor: Finished task ID 3
 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on
 executor localhost: localhost (PROCESS_LOCAL)
 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885
 bytes in 0 ms
 14/07/21 19:49:13 INFO Executor: Running task ID 5
 14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0
 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3)
 14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0
 14/07/21 19:49:13 INFO TaskSetMan

Where do logs go in StandAlone mode

2014-09-12 Thread Tim Smith
Spark 1.0.0

I write logs out from my app using this object:

object LogService extends Logging {

/** Set reasonable logging levels for streaming if the user has not
configured log4j. */
 def setStreamingLogLevels() {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
  // We first log something to initialize Spark's default logging,
then we override the
  // logging level.
  logInfo("Setting log level to [WARN] for streaming example." +
" To override add a custom log4j.properties to the classpath.")
  Logger.getRootLogger.setLevel(Level.WARN)
}
  }
}

Later, I set "LogService.setStreamingLogLevels()" and then use "logInfo" etc.

This works well when I run the app under Yarn, all the logs show up
under the container logs but when I run the app in Standalone mode, I
can't find these logs in neither the master, worker or driver logs. So
where do they go?

Thanks,

Tim

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Executor garbage collection

2014-09-12 Thread Tim Smith
Hi,

Anyone setting any explicit GC options for the executor jvm? If yes,
what and how did you arrive at them?

Thanks,

- Tim

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-09-15 Thread Tim Smith
Hi Dibyendu,

I am a little confused about the need for rate limiting input from
kafka. If the stream coming in from kafka has higher message/second
rate than what a Spark job can process then it should simply build a
backlog in Spark if the RDDs are cached on disk using persist().
Right?

Thanks,

Tim


On Mon, Sep 15, 2014 at 4:33 AM, Dibyendu Bhattacharya
 wrote:
> Hi Alon,
>
> No this will not be guarantee that same set of messages will come in same
> RDD. This fix just re-play the messages from last processed offset in same
> order. Again this is just a interim fix we needed to solve our use case . If
> you do not need this message re-play feature, just do not perform the ack (
> Acknowledgement) call in the Driver code. Then the processed messages will
> not be written to ZK and hence replay will not happen.
>
> Regards,
> Dibyendu
>
> On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er 
> wrote:
>>
>> Hi Dibyendu,
>>
>> Thanks for your great work!
>>
>> I'm new to Spark Streaming, so I just want to make sure I understand
>> Driver
>> failure issue correctly.
>>
>> In my use case, I want to make sure that messages coming in from Kafka are
>> always broken into the same set of RDDs, meaning that if a set of messages
>> are assigned to one RDD, and the Driver dies before this RDD is processed,
>> then once the Driver recovers, the same set of messages are assigned to a
>> single RDD, instead of arbitrarily repartitioning the messages across
>> different RDDs.
>>
>> Does your Receiver guarantee this behavior, until the problem is fixed in
>> Spark 1.2?
>>
>> Regards,
>> Alon
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Stable spark streaming app

2014-09-17 Thread Tim Smith
n

These seem related to: https://issues.apache.org/jira/browse/SPARK-2316

Best I understand and have been told, this does not affect data
integrity but may cause un-necessary recomputes.

Hope this helps,

Tim


On Wed, Sep 17, 2014 at 8:30 AM, Soumitra Kumar
 wrote:
> Hmm, no response to this thread!
>
> Adding to it, please share experiences of building an enterprise grade 
> product based on Spark Streaming.
>
> I am exploring Spark Streaming for enterprise software and am cautiously 
> optimistic about it. I see huge potential to improve debuggability of Spark.
>
> - Original Message -
> From: "Tim Smith" 
> To: "spark users" 
> Sent: Friday, September 12, 2014 10:09:53 AM
> Subject: Stable spark streaming app
>
> Hi,
>
> Anyone have a stable streaming app running in "production"? Can you
> share some overview of the app and setup like number of nodes, events
> per second, broad stream processing workflow, config highlights etc?
>
> Thanks,
>
> Tim
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Stable spark streaming app

2014-09-17 Thread Tim Smith
Thanks :)

On Wed, Sep 17, 2014 at 2:10 PM, Paul Wais  wrote:
> Thanks Tim, this is super helpful!
>
> Question about jars and spark-submit:  why do you provide
> myawesomeapp.jar as the program jar but then include other jars via
> the --jars argument?  Have you tried building one uber jar with all
> dependencies and just sending that to Spark as your app jar?

I guess that is mostly because I am Scala/sbt noob :) How do I create
the uber jar? My .sbt file says:
name := "My Awesome App"
version := "1.025"
scalaVersion := "2.10.4"
resolvers += "Apache repo" at
"https://repository.apache.org/content/repositories/releases";
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1"

Then I run "sbt package" to generate myawesomeapp.jar.

>
> Also, have you ever seen any issues with Spark caching your app jar
> between runs even if it changes?

Not that I can tell but then maybe because I use Yarn, I might be
shielded from some jar distribution bugs in Spark?


>
> On Wed, Sep 17, 2014 at 1:11 PM, Tim Smith  wrote:
>> I don't have anything in production yet but I now at least have a
>> stable (running for more than 24 hours) streaming app. Earlier, the
>> app would crash for all sorts of reasons. Caveats/setup:
>> - Spark 1.0.0 (I have no input flow control unlike Spark 1.1)
>> - Yarn for RM
>> - Input and Output to Kafka
>> - CDH 5.1
>> - 11 node cluster with 32-cores and 48G max container size for each
>> node (Yarn managed)
>> - 5 partition Kafka topic - both in and out
>> - Roughly, an average of 25k messages per second
>> - App written in Scala (warning: I am a Scala noob)
>>
>> Few things I had to add/tweak to get the app to be stable:
>> - The executor JVMs did not have any GC options set, by default. This
>> might be more of a CDH issue. I noticed that while the Yarn container
>> and other Spark ancillary tasks had GC options set at launch but none
>> for the executors. So I played with different GC options and this
>> worked best:
>> SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m
>> -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
>> -XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc
>> -XX:+PrintGCDetails"
>>
>> I tried G1GC but for some reason it just didn't work. I am not a Java
>> programmer or expert so my conclusion is purely trial and error based.
>> The GC logs, with these flags, go to the "stdout" file in the Yarn
>> container logs on each node/worker. You can set SPARK_JAVA_OPTS in
>> spark-env.sh on the driver node and Yarn will respect these. On CDH/CM
>> specifically, even though you don't run Spark as a service (since you
>> are using Yarn for RM), you can goto "Spark Client Advanced
>> Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh" and
>> set SPARK_JAVA_OPTS there.
>>
>> - Set these two params - "spark.yarn.executor.memoryOverhead"
>> "spark.yarn.driver.memoryOverhead". Earlier, my app would get killed
>> because the executors running the kafka receivers would get killed by
>> Yarn for over utilization of memory. Now, these are my memory settings
>> (I will paste the entire app launch params later in the email):
>> --driver-memory 2G \
>> --executor-memory 16G \
>> --spark.yarn.executor.memoryOverhead 4096 \
>> --spark.yarn.driver.memoryOverhead 1024 \
>>
>> Your total executor JVM will consume "executor-memory" minus
>> "spark.yarn.executor.memoryOverhead" so you should see each executor
>> JVM consuming no more than 12G, in this case.
>>
>> Here is how I launch my app:
>> run=`date +"%m-%d-%YT%T"`; \
>> nohup spark-submit --class myAwesomeApp \
>> --master yarn myawesomeapp.jar \
>> --jars 
>> spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
>> \
>> --driver-memory 2G \
>> --executor-memory 16G \
>> --executor-cores 16 \
>> --num-executors 10 \
>> --spark.serializer org.apache.spark.serializer.KryoSerializer \
>> --spark.rdd.compress true \
>> --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
>> --spark.akka.threads 64 \
>> --

Re: LZO support in Spark 1.0.0 - nothing seems to work

2014-09-17 Thread Tim Smith
I believe this is a known bug:
https://issues.apache.org/jira/browse/SPARK-1719

On Wed, Sep 17, 2014 at 5:40 PM, rogthefrog  wrote:
> I have a HDFS cluster managed with CDH Manager. Version is CDH 5.1 with
> matching GPLEXTRAS parcel. LZO works with Hive and Pig, but I can't make it
> work with Spark 1.0.0. I've tried:
>
> * Setting this:
>
> HADOOP_OPTS="-Djava.net.preferIPv4Stack=true $HADOOP_CLIENT_OPTS
> -Djava.library.path=/opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/native/"
>
> * Setting this in spark-env.sh. I tried with and without "export". I tried
> in CDH Manager and manually on the host.
>
> export
> SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/hadoop-lzo.jar
> export
> SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:/opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/native/
>
> * Setting this in /etc/spark/conf/spark-defaults.conf:
>
> spark.executor.extraLibraryPath
> /opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/native
> spark.spark.executor.extraClassPath
> /opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/hadoop-lzo.jar
>
> * Adding this in CDH manager:
>
> export LD_LIBRARY_PATH=/opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/native
>
> * Hardcoding
> -Djava.library.path=/opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/native in
> the Spark command
>
> * Symlinking the gpl compression binaries into
> /opt/cloudera/parcels/CDH/lib/hadoop/lib/native
>
> * Symlinking the gpl compression binaries into /usr/lib
>
> And nothing worked. When I run pyspark I get this:
>
> 14/09/17 20:38:54 WARN util.NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
>
> and when I try to run a simple job on a LZO file in HDFS I get this:
>
> distFile.count()
> 14/09/17 13:51:54 ERROR GPLNativeCodeLoader: Could not load native gpl
> library
> java.lang.UnsatisfiedLinkError: no gplcompression in java.library.path
>at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1886)
>at java.lang.Runtime.loadLibrary0(Runtime.java:849)
>at java.lang.System.loadLibrary(System.java:1088)
>at
> com.hadoop.compression.lzo.GPLNativeCodeLoader.(GPLNativeCodeLoader.java:32)
>at com.hadoop.compression.lzo.LzoCodec.(LzoCodec.java:71)
>
> Can anybody help please? Many thanks.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/LZO-support-in-Spark-1-0-0-nothing-seems-to-work-tp14494.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Stable spark streaming app

2014-09-18 Thread Tim Smith
Dibyendu - I am using the Kafka consumer built into Spark streaming.
Pulled the jar from here:
http://search.maven.org/remotecontent?filepath=org/apache/spark/spark-streaming-kafka_2.10/1.0.0/spark-streaming-kafka_2.10-1.0.0.jar

Thanks for the sbt-assembly link, Soumitra.

On Wed, Sep 17, 2014 at 5:50 PM, Dibyendu Bhattacharya
 wrote:
> Hi Tim
>
> Just curious to know ; Which Kafka Consumer you have used ?
>
> Dib
>
> On Sep 18, 2014 4:40 AM, "Tim Smith"  wrote:
>>
>> Thanks :)
>>
>> On Wed, Sep 17, 2014 at 2:10 PM, Paul Wais  wrote:
>> > Thanks Tim, this is super helpful!
>> >
>> > Question about jars and spark-submit:  why do you provide
>> > myawesomeapp.jar as the program jar but then include other jars via
>> > the --jars argument?  Have you tried building one uber jar with all
>> > dependencies and just sending that to Spark as your app jar?
>>
>> I guess that is mostly because I am Scala/sbt noob :) How do I create
>> the uber jar? My .sbt file says:
>> name := "My Awesome App"
>> version := "1.025"
>> scalaVersion := "2.10.4"
>> resolvers += "Apache repo" at
>> "https://repository.apache.org/content/repositories/releases";
>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
>> "1.0.0"
>> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1"
>>
>> Then I run "sbt package" to generate myawesomeapp.jar.
>>
>> >
>> > Also, have you ever seen any issues with Spark caching your app jar
>> > between runs even if it changes?
>>
>> Not that I can tell but then maybe because I use Yarn, I might be
>> shielded from some jar distribution bugs in Spark?
>>
>>
>> >
>> > On Wed, Sep 17, 2014 at 1:11 PM, Tim Smith  wrote:
>> >> I don't have anything in production yet but I now at least have a
>> >> stable (running for more than 24 hours) streaming app. Earlier, the
>> >> app would crash for all sorts of reasons. Caveats/setup:
>> >> - Spark 1.0.0 (I have no input flow control unlike Spark 1.1)
>> >> - Yarn for RM
>> >> - Input and Output to Kafka
>> >> - CDH 5.1
>> >> - 11 node cluster with 32-cores and 48G max container size for each
>> >> node (Yarn managed)
>> >> - 5 partition Kafka topic - both in and out
>> >> - Roughly, an average of 25k messages per second
>> >> - App written in Scala (warning: I am a Scala noob)
>> >>
>> >> Few things I had to add/tweak to get the app to be stable:
>> >> - The executor JVMs did not have any GC options set, by default. This
>> >> might be more of a CDH issue. I noticed that while the Yarn container
>> >> and other Spark ancillary tasks had GC options set at launch but none
>> >> for the executors. So I played with different GC options and this
>> >> worked best:
>> >> SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m
>> >> -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
>> >> -XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc
>> >> -XX:+PrintGCDetails"
>> >>
>> >> I tried G1GC but for some reason it just didn't work. I am not a Java
>> >> programmer or expert so my conclusion is purely trial and error based.
>> >> The GC logs, with these flags, go to the "stdout" file in the Yarn
>> >> container logs on each node/worker. You can set SPARK_JAVA_OPTS in
>> >> spark-env.sh on the driver node and Yarn will respect these. On CDH/CM
>> >> specifically, even though you don't run Spark as a service (since you
>> >> are using Yarn for RM), you can goto "Spark Client Advanced
>> >> Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh" and
>> >> set SPARK_JAVA_OPTS there.
>> >>
>> >> - Set these two params - "spark.yarn.executor.memoryOverhead"
>> >> "spark.yarn.driver.memoryOverhead". Earlier, my app would get killed
>> >> because the executors running the kafka receivers would get killed by
>> >> Yarn for over utilization of memory. Now, these are my memory settings
>> >&g

Re: Sending multiple DStream outputs

2014-09-18 Thread Tim Smith
Curious, if you have 1:1 mapping between Stream1:topic1 and
Stream2:topic2 then why not run different instances of the app for
each and pass as arguments to each instance the input source and
output topic?

On Thu, Sep 18, 2014 at 8:07 AM, Padmanabhan, Mahesh (contractor)
 wrote:
> Hi all,
>
> I am using Spark 1.0 streaming to ingest a a high volume stream of data
> (approx. 1mm lines every few seconds) transform it into two outputs and send
> those outputs to two separate Apache Kafka topics. I have two blocks of
> output code like this:
>
> Stream1 =
> ….
>
> Stream2 =
>
> …
>
> Stream1.foreachRDD {
>// emit to Kafka topic 1
> }
> Stream2.foreachRDD {
>  // emit to Kafka topic 2
> }
>
> …
>
> My problem is that I never see debug statements in the Stream2.foreachRDD
> code nor is the topic 2 consumer seeing anything. I am wondering if this is
> because these statements are serialized and Spark never gets out of Stream1
> output code before it is ready to process the next Stream1 batch thereby
> never entering the Stream2 output code.
>
> Do I have to parallelize these two output code blocks or am I missing
> something more fundamental?
>
> Thanks,
> Mahesh
>
> 
> This E-mail and any of its attachments may contain Time Warner Cable
> proprietary information, which is privileged, confidential, or subject to
> copyright belonging to Time Warner Cable. This E-mail is intended solely for
> the use of the individual or entity to which it is addressed. If you are not
> the intended recipient of this E-mail, you are hereby notified that any
> dissemination, distribution, copying, or action taken in relation to the
> contents of and attachments to this E-mail is strictly prohibited and may be
> unlawful. If you have received this E-mail in error, please notify the
> sender immediately and permanently delete the original and any copy of this
> E-mail and any printout.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka Spark Streaming on Spark 1.1

2014-09-18 Thread Tim Smith
What kafka receiver are you using? Did you build a new jar for your
app with the latest streaming-kafka code for 1.1?


On Thu, Sep 18, 2014 at 11:47 AM, JiajiaJing  wrote:
> Hi Spark Users,
>
> We just upgrade our spark version from 1.0 to 1.1. And we are trying to
> re-run all the written and tested projects we implemented on Spark 1.0.
> However, when we try to execute the spark streaming project that stream data
> from Kafka topics, it yields the following error message. I have no idea
> about why this occurs because the same project runs successfully with Spark
> 1.0.
> May I get some help on this please?
>
> Thank you very much!
>
>
> 2014-09-18 11:06:08,841 ERROR [sparkDriver-akka.actor.default-dispatcher-4]
> scheduler.ReceiverTracker (Logging.scala:logError(75)) - Deregistered
> receiver for stream 0: Error starting receiver 0 -
> java.lang.AbstractMethodError
> at org.apache.spark.Logging$class.log(Logging.scala:52)
> at
> org.apache.spark.streaming.kafka.KafkaReceiver.log(KafkaInputDStream.scala:66)
> at org.apache.spark.Logging$class.logInfo(Logging.scala:59)
> at
> org.apache.spark.streaming.kafka.KafkaReceiver.logInfo(KafkaInputDStream.scala:66)
> at
> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:86)
> at
> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
> at
> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
> at
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
> at
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
> at org.apache.spark.scheduler.Task.run(Task.scala:54)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
>
>
>
>
> Best Regards,
>
> Jiajia
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Spark-Streaming-on-Spark-1-1-tp14597.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Multiple Kafka Receivers and Union

2014-09-23 Thread Tim Smith
Posting your code would be really helpful in figuring out gotchas.

On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell  wrote:
> Hey,
>
> Spark 1.1.0
> Kafka 0.8.1.1
> Hadoop (YARN/HDFS) 2.5.1
>
> I have a five partition Kafka topic.  I can create a single Kafka receiver
> via KafkaUtils.createStream with five threads in the topic map and consume
> messages fine.  Sifting through the user list and Google, I see that its
> possible to split the Kafka receiver among the Spark workers such that I can
> have a receiver per topic, and have this distributed to workers rather than
> localized to the driver.  I’m following something like this:
> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
> But for Kafka obviously.  From the Streaming Programming Guide “ Receiving
> multiple data streams can therefore be achieved by creating multiple input
> DStreams and configuring them to receive different partitions of the data
> stream from the source(s)."
>
> However, I’m not able to consume any messages from Kafka after I perform the
> union operation.  Again, if I create a single, multi-threaded, receiver I
> can consume messages fine.  If I create 5 receivers in a loop, and call
> jssc.union(…) i get:
>
> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>
>
> Do I need to do anything to the unioned DStream?  Am I going about this
> incorrectly?
>
> Thanks in advance.
>
> Matt

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Multiple Kafka Receivers and Union

2014-09-23 Thread Tim Smith
Sorry, I am almost Java illiterate but here's my Scala code to do the
equivalent (that I have tested to work):

val kInStreams = (1 to 10).map{_ =>
KafkaUtils.createStream(ssc,zkhost.acme.net:2182,"myGrp",Map("myTopic"
-> 1), StorageLevel.MEMORY_AND_DISK_SER) } //Create 10 receivers
across the cluster, one for each partition, potentially but active
receivers are only as many kafka partitions you have

val kInMsg = 
ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)




On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell  wrote:
> So, this is scrubbed some for confidentiality, but the meat of it is as 
> follows.  Note, that if I substitute the commented section for the loop, I 
> receive messages from the topic.
>
> SparkConf sparkConf = new SparkConf();
> sparkConf.set("spark.streaming.unpersist", "true");
> sparkConf.set("spark.logConf", "true");
>
> Map kafkaProps = new HashMap<>();
> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
> kafkaProps.put("group.id", groupId);
>
> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, 
> Seconds.apply(1));
> jsc.checkpoint("hdfs://");
>
> List> streamList = new ArrayList<>(5);
>
> for (int i = 0; i < 5; i++) {
> streamList.add(KafkaUtils.createStream(jsc,
>String.class, ProtobufModel.class,
>StringDecoder.class, 
> ProtobufModelDecoder.class,
>kafkaProps,
>Collections.singletonMap(topic, 1),
>StorageLevel.MEMORY_ONLY_SER()));
> }
>
> final JavaPairDStream stream = 
> jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
>
> //  final JavaPairReceiverInputDStream stream =
> //  KafkaUtils.createStream(jsc,
> //  String.class, ProtobufModel.class,
> //  StringDecoder.class, 
> ProtobufModelDecoder.class,
> //  kafkaProps,
> //  Collections.singletonMap(topic, 
> 5),
> //  StorageLevel.MEMORY_ONLY_SER());
>
> final JavaPairDStream tuples = stream.mapToPair(
> new PairFunction, String, Integer>() {
> @Override
> public Tuple2 call(Tuple2 
> tuple) throws Exception {
> return new Tuple2<>(tuple._2().getDeviceId(), 1);
> }
> });
>
> … and futher Spark functions ...
>
> On Sep 23, 2014, at 2:55 PM, Tim Smith  wrote:
>
>> Posting your code would be really helpful in figuring out gotchas.
>>
>> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell  wrote:
>>> Hey,
>>>
>>> Spark 1.1.0
>>> Kafka 0.8.1.1
>>> Hadoop (YARN/HDFS) 2.5.1
>>>
>>> I have a five partition Kafka topic.  I can create a single Kafka receiver
>>> via KafkaUtils.createStream with five threads in the topic map and consume
>>> messages fine.  Sifting through the user list and Google, I see that its
>>> possible to split the Kafka receiver among the Spark workers such that I can
>>> have a receiver per topic, and have this distributed to workers rather than
>>> localized to the driver.  I’m following something like this:
>>> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
>>> But for Kafka obviously.  From the Streaming Programming Guide “ Receiving
>>> multiple data streams can therefore be achieved by creating multiple input
>>> DStreams and configuring them to receive different partitions of the data
>>> stream from the source(s)."
>>>
>>> However, I’m not able to consume any messages from Kafka after I perform the
>>> union operation.  Again, if I create a single, multi-threaded, receiver I
>>> can consume messages fine.  If I create 5 receivers in a loop, and call
>>> jssc.union(…) i get:
>>>
>>> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
>>> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
>>> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
>>> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
>>> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>>>
>>>
>>> Do I need to do anything to the unioned DStream?  Am I going about this
>>> incorrectly?
>>>
>>> Thanks in advance.
>>>
>>> Matt
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Multiple Kafka Receivers and Union

2014-09-23 Thread Tim Smith
Maybe post the before-code as in what was the code before you did the
loop (that worked)? I had similar situations where reviewing code
before (worked) and after (does not work) helped. Also, what helped is
the Scala REPL because I can see what are the object types being
returned by each statement.

Other than code, in the driver logs, you should see events that say
"Registered receiver for stream 0 from
akka.tcp://sp...@node5.acme.net:53135"

Now, if you goto "node5" and look at Spark or YarnContainer logs
(depending on who's doing RM), you should be able to see if the
receiver has any errors when trying to talk to kafka.



On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell  wrote:
> To my eyes, these are functionally equivalent.  I’ll try a Scala approach, 
> but this may cause waves for me upstream (e.g., non-Java)
>
> Thanks for looking at this.  If anyone else can see a glaring issue in the 
> Java approach that would be appreciated.
>
> Thanks,
> Matt
>
> On Sep 23, 2014, at 4:13 PM, Tim Smith  wrote:
>
>> Sorry, I am almost Java illiterate but here's my Scala code to do the
>> equivalent (that I have tested to work):
>>
>> val kInStreams = (1 to 10).map{_ =>
>> KafkaUtils.createStream(ssc,zkhost.acme.net:2182,"myGrp",Map("myTopic"
>> -> 1), StorageLevel.MEMORY_AND_DISK_SER) } //Create 10 receivers
>> across the cluster, one for each partition, potentially but active
>> receivers are only as many kafka partitions you have
>>
>> val kInMsg = 
>> ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>
>>
>>
>>
>> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell  wrote:
>>> So, this is scrubbed some for confidentiality, but the meat of it is as 
>>> follows.  Note, that if I substitute the commented section for the loop, I 
>>> receive messages from the topic.
>>>
>>> SparkConf sparkConf = new SparkConf();
>>> sparkConf.set("spark.streaming.unpersist", "true");
>>> sparkConf.set("spark.logConf", "true");
>>>
>>> Map kafkaProps = new HashMap<>();
>>> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
>>> kafkaProps.put("group.id", groupId);
>>>
>>> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, 
>>> Seconds.apply(1));
>>> jsc.checkpoint("hdfs://");
>>>
>>> List> streamList = new 
>>> ArrayList<>(5);
>>>
>>> for (int i = 0; i < 5; i++) {
>>>streamList.add(KafkaUtils.createStream(jsc,
>>>   String.class, ProtobufModel.class,
>>>   StringDecoder.class, 
>>> ProtobufModelDecoder.class,
>>>   kafkaProps,
>>>   Collections.singletonMap(topic, 
>>> 1),
>>>   StorageLevel.MEMORY_ONLY_SER()));
>>> }
>>>
>>> final JavaPairDStream stream = 
>>> jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
>>>
>>> //  final JavaPairReceiverInputDStream stream =
>>> //  KafkaUtils.createStream(jsc,
>>> //  String.class, 
>>> ProtobufModel.class,
>>> //  StringDecoder.class, 
>>> ProtobufModelDecoder.class,
>>> //  kafkaProps,
>>> //  Collections.singletonMap(topic, 
>>> 5),
>>> //  StorageLevel.MEMORY_ONLY_SER());
>>>
>>> final JavaPairDStream tuples = stream.mapToPair(
>>>new PairFunction, String, Integer>() {
>>>@Override
>>>public Tuple2 call(Tuple2>> ProtobufModel> tuple) throws Exception {
>>>return new Tuple2<>(tuple._2().getDeviceId(), 1);
>>>}
>>>});
>>>
>>> … and futher Spark functions ...
>>>
>>> On Sep 23, 2014, at 2:55 PM, Tim Smith  wrote:
>>>
>>>> Posting your code would be really helpful in figuring out gotchas.
>>>>
>>>> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell  
>>>> wrote:
>>>>> Hey,
>>>>>
>>>>> Spark 1.1.0
>>>>> Kafka 0.8.1.1
>>>>

Re: Multiple Kafka Receivers and Union

2014-09-24 Thread Tim Smith
Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?

On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell  wrote:
> The part that works is the commented out, single receiver stream below the 
> loop.  It seems that when I call KafkaUtils.createStream more than once, I 
> don’t receive any messages.
>
> I’ll dig through the logs, but at first glance yesterday I didn’t see 
> anything suspect.  I’ll have to look closer.
>
> mn
>
> On Sep 23, 2014, at 6:14 PM, Tim Smith  wrote:
>
>> Maybe post the before-code as in what was the code before you did the
>> loop (that worked)? I had similar situations where reviewing code
>> before (worked) and after (does not work) helped. Also, what helped is
>> the Scala REPL because I can see what are the object types being
>> returned by each statement.
>>
>> Other than code, in the driver logs, you should see events that say
>> "Registered receiver for stream 0 from
>> akka.tcp://sp...@node5.acme.net:53135"
>>
>> Now, if you goto "node5" and look at Spark or YarnContainer logs
>> (depending on who's doing RM), you should be able to see if the
>> receiver has any errors when trying to talk to kafka.
>>
>>
>>
>> On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell  wrote:
>>> To my eyes, these are functionally equivalent.  I’ll try a Scala approach, 
>>> but this may cause waves for me upstream (e.g., non-Java)
>>>
>>> Thanks for looking at this.  If anyone else can see a glaring issue in the 
>>> Java approach that would be appreciated.
>>>
>>> Thanks,
>>> Matt
>>>
>>> On Sep 23, 2014, at 4:13 PM, Tim Smith  wrote:
>>>
>>>> Sorry, I am almost Java illiterate but here's my Scala code to do the
>>>> equivalent (that I have tested to work):
>>>>
>>>> val kInStreams = (1 to 10).map{_ =>
>>>> KafkaUtils.createStream(ssc,zkhost.acme.net:2182,"myGrp",Map("myTopic"
>>>> -> 1), StorageLevel.MEMORY_AND_DISK_SER) } //Create 10 receivers
>>>> across the cluster, one for each partition, potentially but active
>>>> receivers are only as many kafka partitions you have
>>>>
>>>> val kInMsg = 
>>>> ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell  
>>>> wrote:
>>>>> So, this is scrubbed some for confidentiality, but the meat of it is as 
>>>>> follows.  Note, that if I substitute the commented section for the loop, 
>>>>> I receive messages from the topic.
>>>>>
>>>>> SparkConf sparkConf = new SparkConf();
>>>>> sparkConf.set("spark.streaming.unpersist", "true");
>>>>> sparkConf.set("spark.logConf", "true");
>>>>>
>>>>> Map kafkaProps = new HashMap<>();
>>>>> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
>>>>> kafkaProps.put("group.id", groupId);
>>>>>
>>>>> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, 
>>>>> Seconds.apply(1));
>>>>> jsc.checkpoint("hdfs://");
>>>>>
>>>>> List> streamList = new 
>>>>> ArrayList<>(5);
>>>>>
>>>>> for (int i = 0; i < 5; i++) {
>>>>>   streamList.add(KafkaUtils.createStream(jsc,
>>>>>  String.class, 
>>>>> ProtobufModel.class,
>>>>>  StringDecoder.class, 
>>>>> ProtobufModelDecoder.class,
>>>>>  kafkaProps,
>>>>>  Collections.singletonMap(topic, 
>>>>> 1),
>>>>>  StorageLevel.MEMORY_ONLY_SER()));
>>>>> }
>>>>>
>>>>> final JavaPairDStream stream = 
>>>>> jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
>>>>>
>>>>> //  final JavaPairReceiverInputDStream stream =
>>>>> //  KafkaUtils.createStream(jsc,
>>>>> //  String.class, 
>>>>> ProtobufModel.class,
>>>>> //   

Re: Multiple Kafka Receivers and Union

2014-09-25 Thread Tim Smith
Good to know it worked out and thanks for the update. I didn't realize
you need to provision for receiver workers + processing workers. One
would think a worker would process multiple stages of an app/job and
receive is just a stage of the job.



On Thu, Sep 25, 2014 at 12:05 PM, Matt Narrell  wrote:
> Additionally,
>
> If I dial up/down the number of executor cores, this does what I want.
> Thanks for the extra eyes!
>
> mn
>
> On Sep 25, 2014, at 12:34 PM, Matt Narrell  wrote:
>
> Tim,
>
> I think I understand this now.  I had a five node Spark cluster and a five
> partition topic, and I created five receivers.  I found this:
> http://stackoverflow.com/questions/25785581/custom-receiver-stalls-worker-in-spark-streaming
> Indicating that if I use all my workers as receivers, there are none left to
> do the processing.  If I drop the number of partitions/receivers down while
> still having multiple unioned receivers, I see messages.
>
> mn
>
> On Sep 25, 2014, at 10:18 AM, Matt Narrell  wrote:
>
> I suppose I have other problems as I can’t get the Scala example to work
> either.  Puzzling, as I have literally coded like the examples (that are
> purported to work), but no luck.
>
> mn
>
> On Sep 24, 2014, at 11:27 AM, Tim Smith  wrote:
>
> Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?
>
> On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell 
> wrote:
>
> The part that works is the commented out, single receiver stream below the
> loop.  It seems that when I call KafkaUtils.createStream more than once, I
> don’t receive any messages.
>
> I’ll dig through the logs, but at first glance yesterday I didn’t see
> anything suspect.  I’ll have to look closer.
>
> mn
>
> On Sep 23, 2014, at 6:14 PM, Tim Smith  wrote:
>
> Maybe post the before-code as in what was the code before you did the
> loop (that worked)? I had similar situations where reviewing code
> before (worked) and after (does not work) helped. Also, what helped is
> the Scala REPL because I can see what are the object types being
> returned by each statement.
>
> Other than code, in the driver logs, you should see events that say
> "Registered receiver for stream 0 from
> akka.tcp://sp...@node5.acme.net:53135"
>
> Now, if you goto "node5" and look at Spark or YarnContainer logs
> (depending on who's doing RM), you should be able to see if the
> receiver has any errors when trying to talk to kafka.
>
>
>
> On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell 
> wrote:
>
> To my eyes, these are functionally equivalent.  I’ll try a Scala approach,
> but this may cause waves for me upstream (e.g., non-Java)
>
> Thanks for looking at this.  If anyone else can see a glaring issue in the
> Java approach that would be appreciated.
>
> Thanks,
> Matt
>
> On Sep 23, 2014, at 4:13 PM, Tim Smith  wrote:
>
> Sorry, I am almost Java illiterate but here's my Scala code to do the
> equivalent (that I have tested to work):
>
> val kInStreams = (1 to 10).map{_ =>
> KafkaUtils.createStream(ssc,zkhost.acme.net:2182,"myGrp",Map("myTopic"
> -> 1), StorageLevel.MEMORY_AND_DISK_SER) } //Create 10 receivers
> across the cluster, one for each partition, potentially but active
> receivers are only as many kafka partitions you have
>
> val kInMsg =
> ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>
>
>
>
> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell 
> wrote:
>
> So, this is scrubbed some for confidentiality, but the meat of it is as
> follows.  Note, that if I substitute the commented section for the loop, I
> receive messages from the topic.
>
> SparkConf sparkConf = new SparkConf();
> sparkConf.set("spark.streaming.unpersist", "true");
> sparkConf.set("spark.logConf", "true");
>
> Map kafkaProps = new HashMap<>();
> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
> kafkaProps.put("group.id", groupId);
>
> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf,
> Seconds.apply(1));
> jsc.checkpoint("hdfs://");
>
> List> streamList = new
> ArrayList<>(5);
>
> for (int i = 0; i < 5; i++) {
> streamList.add(KafkaUtils.createStream(jsc,
>String.class, ProtobufModel.class,
>StringDecoder.class,
> ProtobufModelDecoder.class,
>kafkaProps,
>Collections.singletonMap(topic, 1),
>