Thank you for your time and effort. Here is the code:
-------------------
import java.io.IOException;
import java.net.ConnectException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.concurrent.Semaphore;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.receiver.Receiver;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;

public final class Multinode extends Receiver<Output> {
    String host = null;
    int portRx = -1;
    int portTx = -1;
    private final Semaphore sem = new Semaphore(1);

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("Multinode");
        sparkConf.set("spark.cores.max", args[1]);
        sparkConf.set("spark.task.cpus", args[2]);
        sparkConf.set("spark.default.parallelism", args[2]);
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
                new Duration(Integer.parseInt(args[6])));
        JavaReceiverInputDStream<Output> data = ssc.receiverStream(
                new Multinode(args[3], Integer.parseInt(args[4]), Integer.parseInt(args[5])));
        JavaDStream<Integer> udp = data.map(new Function<Output, Integer>() {
            @Override
            public Integer call(Output x) {
                return Integer.valueOf(x.position());
            }
        });
        udp.print();
        ssc.start();
        ssc.awaitTermination();
    }

    public Multinode(String host_, int portRx_, int portTx_) {
        super(StorageLevel.MEMORY_AND_DISK_2());
        host = host_;
        portRx = portRx_;
        portTx = portTx_;
    }

    public void onStart() {
        // Start the thread that receives data over a connection
        new Thread() {
            @Override
            public void run() {
                receive();
            }
        }.start();
    }

    public void onStop() {
        // There is nothing much to do as the thread calling receive()
        // is designed to stop by itself once isStopped() returns true
    }

    /** Create a socket and receive data until the receiver is stopped */
    private void receive() {
        Process p;
        DatagramSocket socket = null;
        byte[] buf = new byte[1400];
        DatagramPacket packet = new DatagramPacket(buf, buf.length);
        Kryo kryo = new Kryo();
        kryo.register(DatagramPacket.class);
        Output output = new Output(2900); // 2*(1400+35)
        try {
            // Bind the UDP socket to the receive port
            if (socket == null)
                socket = new DatagramSocket(portRx);
            InetAddress returnIPAddress = InetAddress.getByName(host);
            int returnPort = portTx;
            do {
                try {
                    sem.acquire();
                    try {
                        p = Runtime.getRuntime().exec("Prog");
                    } finally {
                        // Release the permit even if exec() throws
                        sem.release();
                    }
                } catch (IOException ioe) {
                    //ioe.printStackTrace();
                    break;
                }
                socket.receive(packet);
                output.clear();
                kryo.writeObject(output, packet);
                store(output);
                packet.setAddress(returnIPAddress);
                packet.setPort(returnPort);
                socket.send(packet);
            } while (!isStopped());
            socket.close();
            socket = null;
            // Restart in an attempt to connect again when server is active again
            restart("Trying to connect again");
        } catch (ConnectException ce) {
            // restart if could not connect to server
            restart("Could not connect", ce);
        } catch (Throwable t) {
            restart("Error receiving data", t);
        }
    }
}

Nastooh Avessta
ENGINEER.SOFTWARE ENGINEERING
nave...@cisco.com
Phone: +1 604 647 1527
Cisco Systems Limited
595 Burrard Street, Suite 2123
Three Bentall Centre, PO Box 49121
VANCOUVER BRITISH COLUMBIA V7X 1J1 CA
Cisco.com<http://www.cisco.com/>

Think before you print.

This email may contain confidential and privileged material for the sole use of the intended recipient. Any review, use, distribution or disclosure by others is strictly prohibited. If you are not the intended recipient (or authorized to receive for the recipient), please contact the sender by reply email and delete all copies of this message.

For corporate legal information go to: http://www.cisco.com/web/about/doing_business/legal/cri/index.html

Cisco Systems Canada Co, 181 Bay St., Suite 3400, Toronto, ON, Canada, M5J 2T3. Phone: 416-306-7000; Fax: 416-306-7099. Preferences<http://www.cisco.com/offer/subscribe/?sid=000478326> - Unsubscribe<http://www.cisco.com/offer/unsubscribe/?sid=000478327> – Privacy<http://www.cisco.com/web/siteassets/legal/privacy.html>

From: Tathagata Das [mailto:t...@databricks.com]
Sent: Friday, February 27, 2015 12:39 PM
To: Nastooh Avessta (navesta)
Cc: user@spark.apache.org
Subject: Re: Race Condition in Streaming Thread

It wasn't clear from the snippet what's going on. Can you provide the whole Receiver code?
TD

On Fri, Feb 27, 2015 at 12:37 PM, Nastooh Avessta (navesta) <nave...@cisco.com> wrote:

I am sure, as I issue killall -9 Prog prior to testing.

Cheers,

From: Tathagata Das [mailto:t...@databricks.com]
Sent: Friday, February 27, 2015 12:29 PM
To: Nastooh Avessta (navesta)
Cc: user@spark.apache.org
Subject: Re: Race Condition in Streaming Thread

Are you sure the multiple invocations are not from previous runs of the program?

TD

On Fri, Feb 27, 2015 at 12:16 PM, Nastooh Avessta (navesta) <nave...@cisco.com> wrote:

Hi,

Under Spark 1.0.0 (standalone, client mode), I am trying to invoke a third-party UDP traffic generator from the streaming thread.
The excerpt is as follows:

…
do{
    try {
        p = Runtime.getRuntime().exec("Prog");
        socket.receive(packet);
        output.clear();
        kryo.writeObject(output, packet);
        store(output);
…

Prog has a check for an existing instance, e.g. [ "$(pidof Prog)" ] && exit. This code runs fine, i.e., the third-party application is invoked, data is received and analyzed on the driver, etc. The problem arises when I test redundancy and fault tolerance. Specifically, when I manually terminate Prog, multiple invocations are observed upon recovery. This could be due to multiple threads getting past the [ "$(pidof Prog)" ] && exit check before any of them has started Prog. However, I was hoping to avoid this problem by adding semaphores, as follows:

…
do{
    try {
        sem.acquire();
        p = Runtime.getRuntime().exec("Prog");
        sem.release();
    }catch(IOException ioe) {
        //ioe.printStackTrace();
        break;
    }
    socket.receive(packet);
    //InetAddress returnIPAddress = packet.getAddress(); returnPort = packet.getPort();
    output.clear();
    kryo.writeObject(output, packet);
    store(output);
…

However, I am still seeing multiple invocations of Prog upon recovery. I have also experimented with the following configuration parameters, to no avail:

sparkConf.set("spark.cores.max", args[1]);
sparkConf.set("spark.task.cpus", args[2]);
sparkConf.set("spark.default.parallelism", args[2]);

with args={(1,1),(2,1),(1,2),…}

Any thoughts?

Cheers,
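A general Java note on the semaphore attempt (not a confirmed diagnosis of this case): java.util.concurrent.Semaphore only orders threads that share the same Semaphore object inside one JVM. In the receiver code, sem is an instance field and receive() is started on one new thread per onStart() call, so the semaphore does not coordinate with other receiver instances, other executor JVMs, or the shell-level [ "$(pidof Prog)" ] check. Also, in the semaphore excerpt, sem.release() is not reached if exec() throws IOException, which leaves the permit held. The minimal sketch below shows both points; SemaphoreScopeDemo, FakeReceiver and guarded() are hypothetical names used only for illustration.

```java
import java.util.concurrent.Semaphore;

public class SemaphoreScopeDemo {
    // Hypothetical stand-in for the receiver: each instance owns its own Semaphore(1).
    static final class FakeReceiver {
        private final Semaphore sem = new Semaphore(1);

        // Non-blocking attempt on this instance's permit only.
        boolean tryEnter() {
            return sem.tryAcquire();
        }

        void exit() {
            sem.release();
        }
    }

    // Release-in-finally: the permit is returned even if the action throws.
    static void guarded(Semaphore sem, Runnable action) throws InterruptedException {
        sem.acquire();
        try {
            action.run();
        } finally {
            sem.release();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FakeReceiver a = new FakeReceiver();
        FakeReceiver b = new FakeReceiver();
        // Both print true: separate Semaphore objects never exclude each other.
        System.out.println("a entered: " + a.tryEnter());
        System.out.println("b entered: " + b.tryEnter());
        a.exit();
        b.exit();

        Semaphore shared = new Semaphore(1);
        try {
            guarded(shared, () -> {
                throw new RuntimeException("exec failed");
            });
        } catch (RuntimeException e) {
            System.out.println("caught: " + e.getMessage());
        }
        // Prints 1: the finally block returned the permit.
        System.out.println("permits after failure: " + shared.availablePermits());
    }
}
```

The try/finally form is the usual way to pair acquire() and release(); it fixes the leaked permit, but on its own it does not make the semaphore visible outside the JVM.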
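If the duplicate launches come from several JVMs on the same host (an assumption; the thread does not establish this), one possible alternative to the in-JVM semaphore is an OS-level file lock taken with java.nio.channels.FileChannel.tryLock(). The JDK documents such locks as held on behalf of the whole JVM and mapped to the operating system's native locking, so other processes see them. Because they are per JVM, the sketch also tracks ownership in a static field. The class name LaunchGuard and the lock-file path are hypothetical. Such locks are advisory on many platforms: they only exclude callers that also take the lock, so the shell's pidof check still runs independently.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

// Hypothetical helper: lets at most one JVM per host act as the Prog launcher.
public final class LaunchGuard {
    // FileLocks are held per JVM, so ownership is also tracked in-process.
    private static FileLock held;

    private LaunchGuard() {}

    // Returns true only if this call newly acquired the launcher lock.
    public static synchronized boolean acquire(Path lockPath) throws IOException {
        if (held != null) {
            return false; // already held by this JVM; do not launch again
        }
        FileChannel ch = FileChannel.open(lockPath,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        FileLock lock;
        try {
            lock = ch.tryLock(); // null if another process holds the lock
        } catch (IOException | RuntimeException e) {
            ch.close();
            throw e;
        }
        if (lock == null) {
            ch.close();
            return false;
        }
        held = lock;
        return true;
    }

    // Closing the lock's channel releases the lock.
    public static synchronized void release() throws IOException {
        if (held != null) {
            held.channel().close();
            held = null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical lock-file location.
        Path lockPath = Paths.get(System.getProperty("java.io.tmpdir"), "prog-launcher.lock");
        if (acquire(lockPath)) {
            System.out.println("this JVM may launch Prog");
            // Runtime.getRuntime().exec("Prog") would go here.
            release();
        } else {
            System.out.println("another launcher holds the lock");
        }
    }
}
```

In the receive loop, exec("Prog") would then be guarded by acquire(...) returning true, with release() called from onStop(). The operating system drops the lock when the holding process exits, so a killed executor does not leave it stuck.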