Hi, I am using spark 1.6. I wrote a custom receiver to read from WebSocket. But when I start my spark job, it connects to the WebSocket but doesn't get any message. Same code, if I write as separate scala class, it works and prints messages from WebSocket. Is anything missing in my Spark Code? There are no errors in spark console.
Here is my receiver - import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiver import org.jfarcand.wcs.{MessageListener, TextListener, WebSocket} /** * Custom receiver for WebSocket */ class WebSocketReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) with Runnable with Logging { private var webSocket: WebSocket = _ @transient private var thread: Thread = _ override def onStart(): Unit = { thread = new Thread(this) thread.start() } override def onStop(): Unit = { setWebSocket(null) thread.interrupt() } override def run(): Unit = { println("Received ----") receive() } private def receive(): Unit = { val connection = WebSocket().open("ws://localhost:3001") println("WebSocket Connected ..." ) println("Connected ------- " + connection) setWebSocket(connection) connection.listener(new TextListener { override def onMessage(message: String) { System.out.println("Message in Spark client is --> " + message) } }) } private def setWebSocket(newWebSocket: WebSocket) = synchronized { if (webSocket != null) { webSocket.shutDown } webSocket = newWebSocket } } ===== Here is code for Spark job object WebSocketTestApp { def main(args: Array[String]) { val conf = new SparkConf() .setAppName("Test Web Socket") .setMaster("local[20]") .set("test", "") val ssc = new StreamingContext(conf, Seconds(5)) val stream: ReceiverInputDStream[String] = ssc.receiverStream(new WebSocketReceiver()) stream.print() ssc.start() ssc.awaitTermination() } ============== } Thanks, LCassa