Please find the code below:
import java.util.Properties

import javax.jms.{Message, TextMessage}
import javax.naming.Context

import com.solacesystems.jms.SupportedProperty
import com.typesafe.config.{Config, ConfigFactory}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.concurrent.duration._

// JmsStreamUtils, JndiMessageConsumerFactory and QueueJmsDestinationInfo come
// from the spark-jms-receiver library (import path depends on the version used)

def main(args: Array[String]): Unit = {
  val config: Config = ConfigFactory.load()
  // checkpoint directory read from config (key name assumed)
  val checkpointDirectory = config.getString("streaming.checkpointDirectory")
  val streamC = StreamingContext.getOrCreate(
    checkpointDirectory,
    () => functionToCreateContext(config, checkpointDirectory)
  )
  streamC.start()
  streamC.awaitTermination()
}
def functionToCreateContext(config: Config, checkpointDirectory: String): StreamingContext = {
  val brokerUrl = config.getString("streaming.solace.brokerURL")
  val username = config.getString("streaming.solace.userName")
  val passwordSol = config.getString("streaming.solace.password")
  val vpn = config.getString("streaming.solace.vpn")
  val queue = config.getString("streaming.solace.queueName")
  val connectionFactory = config.getString("streaming.solace.connectionFactory")
  // batch/block intervals read from config (key names assumed)
  val batchInterval = config.getLong("streaming.batchInterval")
  val blockInterval = config.getString("streaming.blockInterval")

  val spark = SparkSession
    .builder()
    .appName("rem-Streaming-Consumer")
    .config("spark.streaming.receiver.writeAheadLog.enable", "true")
    .config("spark.streaming.blockInterval", blockInterval)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .enableHiveSupport()
    .getOrCreate()

  val sc = spark.sparkContext
  val ssc = new StreamingContext(sc, Seconds(batchInterval))
  ssc.checkpoint(checkpointDirectory)
  // keep only text messages, dropping anything else
  val converter: Message => Option[String] = {
    case msg: TextMessage => Some(msg.getText)
    case _                => None
  }
  val props = new Properties()
  props.setProperty(
    Context.INITIAL_CONTEXT_FACTORY,
    "com.solacesystems.jndi.SolJNDIInitialContextFactory"
  )
  props.setProperty(Context.PROVIDER_URL, brokerUrl)
  props.setProperty(Context.SECURITY_PRINCIPAL, username)
  // the password belongs in SECURITY_CREDENTIALS (the original set
  // SECURITY_PRINCIPAL twice, which silently drops the username)
  props.setProperty(Context.SECURITY_CREDENTIALS, passwordSol)
  props.setProperty(SupportedProperty.SOLACE_JMS_VPN, vpn)
  val msgs = JmsStreamUtils.createSynchronousJmsQueueStream(
    ssc,
    JndiMessageConsumerFactory(
      props,
      QueueJmsDestinationInfo(queue),
      connectionFactoryName = connectionFactory,
      messageSelector = ""
    ),
    converter,
    1000,
    1.second,
    10.second,
    StorageLevel.MEMORY_AND_DISK_SER_2
  )
  msgs.foreachRDD { rdd =>
    if (rdd.take(1).nonEmpty) {
      val messages: RDD[String] = rdd.map { sr =>
        if (sr == null) {
          println("NO records found")
          "NO records found"
        } else {
          println("Input Records from Solace queue : " + sr.toString)
          sr.toString
        }
      }
      Thread.sleep(120000)
      try {
        // ===> getting NPE on the next line after restarting using WAL
        val messagesJson = spark.read.json(messages)
        messagesJson.write.mode("append").parquet(data) // data: output path, defined elsewhere
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
    }
  }
  ssc
}
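
Could the problem be that, on restart, the foreachRDD closure above is
restored from the checkpoint rather than re-executed, so the captured spark
reference comes back without a live SparkContext? The "DataFrame and SQL
Operations" section of the Spark Streaming programming guide recommends
obtaining the session inside foreachRDD; a minimal sketch of that pattern,
using the names from my snippet:

msgs.foreachRDD { rdd =>
  if (rdd.take(1).nonEmpty) {
    // re-obtain the session from the RDD's SparkContext so that it is valid
    // both on the first run and after recovery from the checkpoint
    val session = SparkSession.builder
      .config(rdd.sparkContext.getConf)
      .getOrCreate()
    val messages = rdd.map(sr => Option(sr).map(_.toString).getOrElse("NO records found"))
    val messagesJson = session.read.json(messages)
    messagesJson.write.mode("append").parquet(data)
  }
}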
Thanks & Regards,
Nayan Sharma
+91-8095382952
<https://www.linkedin.com/in/nayan-sharma>
<http://stackoverflow.com/users/3687426/nayan-sharma?tab=profile>
On Mon, Feb 12, 2024 at 4:23 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:
> Hi,
>
> It is challenging to make a recommendation without further details. I am
> guessing you are trying to build a fault-tolerant Spark application (Spark
> structured streaming) that consumes messages from Solace?
> To address the *NullPointerException* in the context of the provided
> information, review the part of the code where the exception is thrown;
> identifying which object or method call is resulting in *null*, together
> with checking the logs, should help the debugging process.
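>
> As a quick (and admittedly hypothetical) instrumentation step, logging the
> candidate references just before the failing call can show which one is
> actually null, e.g.:
>
>   // debugging sketch, using the names from your snippet, placed
>   // inside foreachRDD right before spark.read.json(messages)
>   println("spark is null: " + (spark == null))
>   println("spark.sparkContext is null: " + (spark != null && spark.sparkContext == null))
>   println("sample records: " + rdd.take(5).mkString(" | "))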
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 10 Feb 2024 at 05:29, nayan sharma <nayansharm...@gmail.com>
> wrote:
>
>> Hi Users,
>>
>> I am trying to build a fault-tolerant Spark Solace consumer.
>>
>> Issue :- We have to restart the job due to multiple issues; load
>> average is one of them. At that time, whatever Spark is processing, plus
>> any batches in the queue, is lost. We cannot replay them because we have
>> already sent the ack while calling store().
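>>
>> (As I understand it, the replay-safe ordering from the Spark custom
>> receiver guide is store-then-ack, roughly:
>>
>>   // sketch of a reliable receiver loop; consumer/session names hypothetical
>>   val msg = consumer.receive()                  // session in CLIENT_ACKNOWLEDGE mode
>>   store(ArrayBuffer(msg.asInstanceOf[TextMessage].getText)) // blocks until stored (and WAL-written)
>>   msg.acknowledge()                             // ack only after store() returns
>>
>> so unacknowledged messages are redelivered after a restart.)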
>>
>> Solution :- I have tried implementing WAL and checkpointing in the
>> solution. The job is able to identify the lost batches, but the records
>> are not being written to the log file; instead it throws an NPE.
>>
>> We are creating the StreamingContext using getOrCreate().
>>
>>
>> Thanks,
>> Nayan
>>
>