Please find the code below.

  def main(args: Array[String]): Unit = {
    val config: Config = ConfigFactory.load()
    val streamC = StreamingContext.getOrCreate(
      checkpointDirectory,
      () => functionToCreateContext(config, checkpointDirectory)
    )

    streamC.start()
    streamC.awaitTermination()
  }

  def functionToCreateContext(config: Config, checkpointDirectory: String): StreamingContext = {

    val brokerUrl = config.getString("streaming.solace.brokerURL")
    val username = config.getString("streaming.solace.userName")
    val passwordSol = config.getString("streaming.solace.password")
    val vpn = config.getString("streaming.solace.vpn")
    val queue = config.getString("streaming.solace.queueName")
    val connectionFactory = config.getString("streaming.solace.connectionFactory")

    val spark = SparkSession
      .builder()
      .appName("rem-Streaming-Consumer")
      .config("spark.streaming.receiver.writeAheadLog.enable", "true")
      .config("spark.streaming.blockInterval", blockInterval)
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(batchInterval))
    ssc.checkpoint(checkpointDirectory)

    val converter: Message => Option[String] = {
      case msg: TextMessage =>
        Some(msg.getText)
      case _ =>
        None
    }

    val props = new Properties()
    props.setProperty(
      Context.INITIAL_CONTEXT_FACTORY,
      "com.solacesystems.jndi.SolJNDIInitialContextFactory"
    )
    props.setProperty(Context.PROVIDER_URL, brokerUrl)
    props.setProperty(Context.SECURITY_PRINCIPAL, username)
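    // The password belongs in SECURITY_CREDENTIALS; setting SECURITY_PRINCIPAL again would overwrite the username
    props.setProperty(Context.SECURITY_CREDENTIALS, passwordSol)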
    props.setProperty(SupportedProperty.SOLACE_JMS_VPN, vpn)

    val msgs = JmsStreamUtils.createSynchronousJmsQueueStream(
      ssc,
      JndiMessageConsumerFactory(
        props,
        QueueJmsDestinationInfo(queue),
        connectionFactoryName = connectionFactory,
        messageSelector = ""
      ),
      converter,
      1000,
      1.second,
      10.second,
      StorageLevel.MEMORY_AND_DISK_SER_2
    )

    msgs.foreachRDD(rdd =>
      if (rdd.take(1).length > 0) {
        val messages: RDD[String] = rdd.map { sr =>
          if (sr == null) {
            println("NO records found")
            "NO records found"
          } else {
            println("Input Records from Solace queue : " + sr.toString)
            sr.toString
          }
        }
        Thread.sleep(120000)
        try {
          // ===> getting NPE here after restarting using WAL
          val messagesJson = spark.read.json(messages)
          messagesJson.write.mode("append").parquet(data)
        } catch {
          case ex: Throwable => ex.printStackTrace()
        }
      })
    ssc
  }
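
A possible explanation for the NPE, offered as an assumption rather than a confirmed diagnosis: when the job restarts from the checkpoint, StreamingContext.getOrCreate rebuilds the context from the checkpoint data and does not call functionToCreateContext again, so the `spark` value captured by the foreachRDD closure may no longer be a usable session. The Spark Streaming programming guide suggests obtaining the SparkSession lazily from the RDD's own SparkContext inside foreachRDD. Below is a minimal sketch of that pattern. `RecoverableJsonSink`, `writeJson` and `outputPath` are hypothetical names (`outputPath` stands in for `data`), and the stream is assumed to be a DStream[String].

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper, not part of the original job.
object RecoverableJsonSink {

  // Assumption: msgs carries one JSON document per record.
  def writeJson(msgs: DStream[String], outputPath: String): Unit =
    msgs.foreachRDD { (rdd: RDD[String]) =>
      if (!rdd.isEmpty()) {
        // Look the session up on every batch instead of capturing it in the
        // closure, so it is also available after recovery from a checkpoint.
        val spark = SparkSession.builder
          .config(rdd.sparkContext.getConf)
          .getOrCreate()
        import spark.implicits._

        // Parse the JSON strings and append them as Parquet.
        val messagesJson = spark.read.json(rdd.toDS())
        messagesJson.write.mode("append").parquet(outputPath)
      }
    }
}
```

Under these assumptions it would be called as RecoverableJsonSink.writeJson(msgs, data) inside functionToCreateContext, in place of the foreachRDD block above.
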
Thanks & Regards,
Nayan Sharma


On Mon, Feb 12, 2024 at 4:23 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi,
>
> It is challenging to make a recommendation without further details. I am
> guessing you are trying to build a fault-tolerant Spark application (Spark
> Structured Streaming) that consumes messages from Solace?
> To address the *NullPointerException*, review the part of the code where
> the exception is thrown and identify which object or method call is
> returning *null*. Checking the logs will also help the debugging process.
>
> HTH
>
> Mich Talebzadeh,
>
> On Sat, 10 Feb 2024 at 05:29, nayan sharma <nayansharm...@gmail.com>
> wrote:
>
>> Hi Users,
>>
>> I am trying to build a fault-tolerant Spark Solace consumer.
>>
>> Issue: We have to restart the job for several reasons, high load average
>> being one of them. When that happens, whatever Spark is processing, along
>> with any batches waiting in the queue, is lost. We can't replay it because
>> we have already sent the ack while calling store().
>>
>> Solution: I have tried implementing WAL and checkpointing in the
>> solution. The job is able to identify the lost batches, but the records
>> are not written to the log file and an NPE is thrown instead.
>>
>> We are creating the SparkContext using sc.getorcreate()
>>
>>
>> Thanks,
>> Nayan
>>
>
