Please find the code below:

import java.util.Properties

import javax.jms.{Message, TextMessage}
import javax.naming.Context

import com.solacesystems.jms.SupportedProperty
import com.typesafe.config.{Config, ConfigFactory}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.concurrent.duration._
// JmsStreamUtils, JndiMessageConsumerFactory and QueueJmsDestinationInfo come
// from the JMS receiver library we use for Solace (import omitted here).
// checkpointDirectory, blockInterval, batchInterval and data are defined
// elsewhere in this object (omitted here).

def main(args: Array[String]): Unit = {
  val config: Config = ConfigFactory.load()
  val streamC = StreamingContext.getOrCreate(
    checkpointDirectory,
    () => functionToCreateContext(config, checkpointDirectory)
  )
  streamC.start()
  streamC.awaitTermination()
}

def functionToCreateContext(config: Config, checkpointDirectory: String): StreamingContext = {
  val brokerUrl = config.getString("streaming.solace.brokerURL")
  val username = config.getString("streaming.solace.userName")
  val passwordSol = config.getString("streaming.solace.password")
  val vpn = config.getString("streaming.solace.vpn")
  val queue = config.getString("streaming.solace.queueName")
  val connectionFactory = config.getString("streaming.solace.connectionFactory")

  val spark = SparkSession
    .builder()
    .appName("rem-Streaming-Consumer")
    .config("spark.streaming.receiver.writeAheadLog.enable", "true") // was set twice; once is enough
    .config("spark.streaming.blockInterval", blockInterval)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .enableHiveSupport()
    .getOrCreate()

  val sc = spark.sparkContext
  val ssc = new StreamingContext(sc, Seconds(batchInterval))
  ssc.checkpoint(checkpointDirectory)

  val converter: Message => Option[String] = {
    case msg: TextMessage => Some(msg.getText)
    case _                => None
  }

  val props = new Properties()
  props.setProperty(
    Context.INITIAL_CONTEXT_FACTORY,
    "com.solacesystems.jndi.SolJNDIInitialContextFactory"
  )
  props.setProperty(Context.PROVIDER_URL, brokerUrl)
  props.setProperty(Context.SECURITY_PRINCIPAL, username)
  // was SECURITY_PRINCIPAL a second time, which overwrote the username;
  // the password belongs under SECURITY_CREDENTIALS
  props.setProperty(Context.SECURITY_CREDENTIALS, passwordSol)
  props.setProperty(SupportedProperty.SOLACE_JMS_VPN, vpn)

  val msgs = JmsStreamUtils.createSynchronousJmsQueueStream(
    ssc,
    JndiMessageConsumerFactory(
      props,
      QueueJmsDestinationInfo(queue),
      connectionFactoryName = connectionFactory,
      messageSelector = ""
    ),
    converter,
    1000,
    1.second,
    10.seconds,
    StorageLevel.MEMORY_AND_DISK_SER_2
  )

  msgs.foreachRDD { rdd =>
    if (rdd.take(1).length > 0) {
      val messages: RDD[String] = rdd.map { sr =>
        if (sr == null) {
          println("NO records found")
          "NO records found"
        } else {
          println("Input Records from Solace queue : " + sr.toString)
          sr.toString
        }
      }
      Thread.sleep(120000)
      try {
        val messagesJson = spark.read.json(messages) // ===> getting NPE here after restarting using WAL
        messagesJson.write.mode("append").parquet(data)
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
    }
  }
  ssc
}
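A note on the NPE line above: spark inside foreachRDD is captured from the
enclosing functionToCreateContext scope, and after a WAL/checkpoint restart
that closure is deserialized from the checkpoint rather than rebuilt by the
factory, so the captured reference can come back null. A possible fix,
following the "DataFrame and SQL Operations" pattern in the Spark Streaming
programming guide, is to re-obtain the session from the RDD's own
SparkContext inside foreachRDD. A sketch, assuming the captured session is
indeed the null reference (only `session` is a new name; everything else is
from the code above):

  msgs.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      // Re-obtain the session from the RDD's SparkContext: valid both on a
      // fresh start and after recovery from the checkpoint/WAL.
      val session = SparkSession.builder
        .config(rdd.sparkContext.getConf)
        .getOrCreate()
      val messages = rdd.map(sr => if (sr == null) "NO records found" else sr.toString)
      val messagesJson = session.read.json(messages)
      messagesJson.write.mode("append").parquet(data) // `data` as in the code above
    }
  }

The same caution applies to anything else the closure captures from
functionToCreateContext that is not itself part of the checkpointed DStream
graph.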
Thanks & Regards,
Nayan Sharma
*+91-8095382952*
<https://www.linkedin.com/in/nayan-sharma>
<http://stackoverflow.com/users/3687426/nayan-sharma?tab=profile>

On Mon, Feb 12, 2024 at 4:23 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi,
>
> It is challenging to make a recommendation without further details. I am
> guessing you are trying to build a fault-tolerant Spark application (Spark
> Structured Streaming) that consumes messages from Solace?
> To address the *NullPointerException* in the context of the provided
> information, review the part of the code where the exception is thrown;
> identifying which object or method call results in *null*, together with
> checking the logs, will help the debugging process.
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
> On Sat, 10 Feb 2024 at 05:29, nayan sharma <nayansharm...@gmail.com> wrote:
>
>> Hi Users,
>>
>> I am trying to build a fault-tolerant Spark Solace consumer.
>>
>> Issue: we have to restart the job due to multiple issues; load average
>> is one of them. At that point, whatever Spark is processing, and the
>> batches in the queue, are lost. We cannot replay them because we have
>> already sent the ack while calling store().
>>
>> Solution: I have tried implementing WAL and checkpointing in the
>> solution. The job is able to identify the lost batches, but the records
>> are not being written to the log file and it throws an NPE.
>>
>> We are creating the SparkContext using sc.getOrCreate().
>>
>> Thanks,
>> Nayan
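A note on the sc.getOrCreate() pattern mentioned above: on a restart from a
checkpoint the factory function is not called again; the whole DStream graph,
including every closure, is deserialized from the checkpoint directory
instead. Everything the streaming computation needs must therefore be created
inside the factory, or be re-obtainable inside foreachRDD as sketched
earlier. A minimal skeleton of that contract, with hypothetical names and
paths:

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  object RecoverableConsumer {

    def createContext(checkpointDir: String): StreamingContext = {
      // Runs only on a fresh start; on recovery the context and its DStream
      // graph are rebuilt from the checkpoint and this function is skipped.
      val conf = new SparkConf()
        .setAppName("recoverable-consumer") // hypothetical app name
        .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      val ssc = new StreamingContext(conf, Seconds(30))
      ssc.checkpoint(checkpointDir)
      // Define the entire DStream graph here; closures must not capture
      // objects created outside this function.
      ssc
    }

    def main(args: Array[String]): Unit = {
      val checkpointDir = "/path/to/checkpoint" // hypothetical path
      val ssc = StreamingContext.getOrCreate(
        checkpointDir,
        () => createContext(checkpointDir)
      )
      ssc.start()
      ssc.awaitTermination()
    }
  }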