Hi Mich,
Perhaps the issue is having multiple SparkContexts in the same JVM (
https://issues.apache.org/jira/browse/SPARK-2243).
While it is possible, I don't think it is encouraged.
As you know, the call you're currently invoking to create the
StreamingContext also creates a SparkContext:
/**
 * Create a StreamingContext by providing the configuration necessary
 * for a new SparkContext.
 * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
 * @param batchDuration the time interval at which streaming data will be
 *                      divided into batches
 */
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
Could you rearrange the code slightly to either create the SparkContext
first and pass that to the creation of the StreamingContext, like below:

val sc = new SparkContext(sparkConf)
val streamingContext = new StreamingContext(sc, Seconds(batchInterval))
*val HiveContext = new HiveContext(sc)*
Or remove the line in red from your code and just set
val sparkContext = streamingContext.sparkContext:

val streamingContext = new StreamingContext(sparkConf, Seconds(batchInterval))
*val sparkContext = new SparkContext(sparkConf)*
val HiveContext = new HiveContext(streamingContext.sparkContext)
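Putting the second option together, a minimal sketch of the whole driver
(assuming Spark 1.x with spark-streaming and spark-hive on the classpath;
the object name and app name here are just placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.sql.hive.HiveContext

object StreamingWithHive {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("StreamingWithHive")

    // The StreamingContext constructor creates the one and only SparkContext,
    // so spark.driver.allowMultipleContexts is no longer needed.
    val batchInterval = 2
    val streamingContext = new StreamingContext(sparkConf, Seconds(batchInterval))

    // Reuse the SparkContext the StreamingContext already owns
    // instead of calling new SparkContext(sparkConf) a second time.
    val sparkContext = streamingContext.sparkContext
    val hiveContext = new HiveContext(sparkContext)

    // ... define DStreams here and use hiveContext inside foreachRDD ...

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
```

The point is that exactly one SparkContext exists, shared by both the
streaming and Hive sides, so broadcasts are registered against the same
context the executors use.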
HTH.
-Todd
On Thu, Sep 8, 2016 at 9:11 AM, Mich Talebzadeh <[email protected]>
wrote:
> Ok I managed to sort that one out.
>
> This is what I am facing
>
> val sparkConf = new SparkConf().
>                 setAppName(sparkAppName).
>                 set("spark.driver.allowMultipleContexts", "true").
>                 set("spark.hadoop.validateOutputSpecs", "false")
> // change the values accordingly.
> sparkConf.set("sparkDefaultParllelism", sparkDefaultParallelismValue)
> sparkConf.set("sparkSerializer", sparkSerializerValue)
> sparkConf.set("sparkNetworkTimeOut", sparkNetworkTimeOutValue)
> // If you want to see more details of batches please increase the value
> // and that will be shown in the UI.
> sparkConf.set("sparkStreamingUiRetainedBatches", sparkStreamingUiRetainedBatchesValue)
> sparkConf.set("sparkWorkerUiRetainedDrivers", sparkWorkerUiRetainedDriversValue)
> sparkConf.set("sparkWorkerUiRetainedExecutors", sparkWorkerUiRetainedExecutorsValue)
> sparkConf.set("sparkWorkerUiRetainedStages", sparkWorkerUiRetainedStagesValue)
> sparkConf.set("sparkUiRetainedJobs", sparkUiRetainedJobsValue)
> sparkConf.set("enableHiveSupport", enableHiveSupportValue)
> sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
> sparkConf.set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
> sparkConf.set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
> var sqltext = ""
> val batchInterval = 2
> val streamingContext = new StreamingContext(sparkConf, Seconds(batchInterval))
>
> With the above settings, Spark Streaming works fine. *However, after adding
> the first line below (in red)*
>
> *val sparkContext = new SparkContext(sparkConf)*
> val HiveContext = new HiveContext(streamingContext.sparkContext)
>
> I get the following errors:
>
> 16/09/08 14:02:32 ERROR JobScheduler: Error running job streaming job 1473339752000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
> in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage
> 0.0 (TID 7, 50.140.197.217): java.io.IOException:
> *org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0*
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1260)
>         at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
>         at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
>         at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
>         at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
>         at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:67)
>         at org.apache.spark.scheduler.Task.run(Task.scala:85)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Failed to get
> broadcast_0_piece0 of broadcast_0
>
>
> Hm any ideas?
>
> Thanks
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 8 September 2016 at 12:28, Mich Talebzadeh <[email protected]>
> wrote:
>
>>
>> Hi,
>>
>> This may not be feasible in Spark streaming.
>>
>> I am trying to create a HiveContext in Spark streaming within the
>> streaming context
>>
>> // Create a local StreamingContext with two working threads and a batch
>> // interval of 2 seconds.
>>
>> val sparkConf = new SparkConf().
>> setAppName(sparkAppName).
>> set("spark.driver.allowMultipleContexts", "true").
>> set("spark.hadoop.validateOutputSpecs", "false")
>> .....
>>
>> Now I try to create a SparkContext (sc):
>>
>> val sc = new SparkContext(sparkConf)
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>
>> This is accepted but it creates two Spark jobs
>>
>>
>> [image: Inline images 1]
>>
>> And basically it goes to a waiting state
>>
>> Any ideas how one can create a HiveContext within Spark streaming?
>>
>> Thanks
>>
>>
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>
>