Thank you for your time and effort. Here is the code:
-------------------

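import java.io.IOException;
import java.net.ConnectException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.concurrent.Semaphore;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.receiver.Receiver;

public final class Multinode extends Receiver<Output> {

  String host = null;
  int portRx = -1;
  int portTx = -1;
  private final Semaphore sem = new Semaphore(1);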

  public static void main(String[] args) {

    SparkConf sparkConf = new SparkConf().setAppName("Multinode");
    sparkConf.set("spark.cores.max", args[1]);
    sparkConf.set("spark.task.cpus", args[2]);
    sparkConf.set("spark.default.parallelism", args[2]);
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

    // args[6] is the batch interval in milliseconds
    JavaStreamingContext ssc =
        new JavaStreamingContext(sparkConf, new Duration(Integer.parseInt(args[6])));

    // args[3] = host, args[4] = receive port, args[5] = transmit port
    JavaReceiverInputDStream<Output> data = ssc.receiverStream(
        new Multinode(args[3], Integer.parseInt(args[4]), Integer.parseInt(args[5])));

    JavaDStream<Integer> udp = data.map(new Function<Output, Integer>() {
      @Override
      public Integer call(Output x) {
        return Integer.valueOf(x.position());
      }
    });

    udp.print();
    ssc.start();
    ssc.awaitTermination();
  }


  public Multinode(String host_, int portRx_, int portTx_) {
    super(StorageLevel.MEMORY_AND_DISK_2());
    host = host_;
    portRx = portRx_;
    portTx = portTx_;
  }

  @Override
  public void onStart() {
    // Start the thread that receives data over a connection
    new Thread() {
      @Override
      public void run() {
        receive();
      }
    }.start();
  }

  @Override
  public void onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private void receive() {
    Process p;
    DatagramSocket socket = null;
    byte[] buf = new byte[1400];
    DatagramPacket packet = new DatagramPacket(buf, buf.length);
    Kryo kryo = new Kryo();
    kryo.register(DatagramPacket.class);
    Output output = new Output(2900); // 2*(1400+35)

    try {
      // connect to the server
      if (socket == null) socket = new DatagramSocket(portRx);
      InetAddress returnIPAddress = InetAddress.getByName(host);
      int returnPort = portTx;

      do {
        try {
          sem.acquire();
          p = Runtime.getRuntime().exec("Prog");
          sem.release();
        } catch (IOException ioe) {
          //ioe.printStackTrace();
          break;
        }
        socket.receive(packet);

        output.clear();
        kryo.writeObject(output, packet);
        store(output);
        packet.setAddress(returnIPAddress);
        packet.setPort(returnPort);
        socket.send(packet);
      } while (!isStopped());
      socket.close();
      socket = null;

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again");
    } catch (ConnectException ce) {
      // restart if could not connect to server
      restart("Could not connect", ce);
    } catch (Throwable t) {
      restart("Error receiving data", t);
    }
  }

}
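
In the listing above, the call to Runtime.getRuntime().exec("Prog") sits inside the receive loop, so a launch is attempted before every packet. exec also returns as soon as the child has been started, so the semaphore is released before Prog has had a chance to run its own pidof check. One possible way around this is to keep a handle to the process that was started and to launch again only when that handle reports the process has exited. The following is a minimal sketch, assuming Java 8 or later; SingleLaunchGuard, Handle and Launcher are hypothetical names and not part of Spark, and the guard only covers launches made through the same instance in one JVM.

```java
import java.io.IOException;

/** Hypothetical helper: starts a command only if the one it started earlier has exited. */
final class SingleLaunchGuard {

    /** Minimal view of a running process; Process::isAlive fits this shape. */
    interface Handle {
        boolean isAlive();
    }

    /** Starts the external program and returns a handle to it. */
    interface Launcher {
        Handle launch() throws IOException;
    }

    private final Launcher launcher;
    private Handle current; // guarded by "this"

    SingleLaunchGuard(Launcher launcher) {
        this.launcher = launcher;
    }

    /** Convenience factory that launches a real OS command via ProcessBuilder. */
    static SingleLaunchGuard forCommand(String... command) {
        return new SingleLaunchGuard(() -> {
            Process p = new ProcessBuilder(command).start();
            return p::isAlive;
        });
    }

    /**
     * Launches the program unless the previously launched one is still alive.
     * Returns true if a new launch happened, false otherwise.
     */
    synchronized boolean ensureRunning() throws IOException {
        if (current != null && current.isAlive()) {
            return false;
        }
        current = launcher.launch();
        return true;
    }
}
```

In the receiver, a guard created with SingleLaunchGuard.forCommand("Prog") could replace the exec call in the loop, with ensureRunning() invoked before socket.receive(packet). Since the guard holds a live process handle, it would have to be created in onStart() rather than stored as a serialized field.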

Nastooh Avessta
ENGINEER.SOFTWARE ENGINEERING
nave...@cisco.com
Phone: +1 604 647 1527

Cisco Systems Limited
595 Burrard Street, Suite 2123 Three Bentall Centre, PO Box 49121
VANCOUVER
BRITISH COLUMBIA
V7X 1J1
CA
Cisco.com<http://www.cisco.com/>






This email may contain confidential and privileged material for the sole use of 
the intended recipient. Any review, use, distribution or disclosure by others 
is strictly prohibited. If you are not the intended recipient (or authorized to 
receive for the recipient), please contact the sender by reply email and delete 
all copies of this message.
For corporate legal information go to:
http://www.cisco.com/web/about/doing_business/legal/cri/index.html

Cisco Systems Canada Co, 181 Bay St., Suite 3400, Toronto, ON, Canada, M5J 2T3. 
Phone: 416-306-7000; Fax: 416-306-7099. 
Preferences<http://www.cisco.com/offer/subscribe/?sid=000478326> - 
Unsubscribe<http://www.cisco.com/offer/unsubscribe/?sid=000478327> – 
Privacy<http://www.cisco.com/web/siteassets/legal/privacy.html>

From: Tathagata Das [mailto:t...@databricks.com]
Sent: Friday, February 27, 2015 12:39 PM
To: Nastooh Avessta (navesta)
Cc: user@spark.apache.org
Subject: Re: Race Condition in Streaming Thread

It wasn't clear from the snippet what's going on. Can you provide the whole 
Receiver code?

TD

On Fri, Feb 27, 2015 at 12:37 PM, Nastooh Avessta (navesta) 
<nave...@cisco.com> wrote:
I am, as I issue killall -9 Prog prior to testing.
Cheers,

From: Tathagata Das [mailto:t...@databricks.com]
Sent: Friday, February 27, 2015 12:29 PM
To: Nastooh Avessta (navesta)
Cc: user@spark.apache.org
Subject: Re: Race Condition in Streaming Thread

Are you sure the multiple invocations are not from previous runs of the program?

TD

On Fri, Feb 27, 2015 at 12:16 PM, Nastooh Avessta (navesta) 
<nave...@cisco.com> wrote:
Hi,
Under Spark 1.0.0, in standalone client mode, I am trying to invoke a 3rd-party 
UDP traffic generator from the streaming thread. The excerpt is as follows:
…
    do {
      try {
        p = Runtime.getRuntime().exec("Prog");

        socket.receive(packet);

        output.clear();
        kryo.writeObject(output, packet);
        store(output);
…
The program has a test to check for an existing instance, e.g. 
[ "$(pidof Prog)" ] && exit. This code runs fine, i.e., the 3rd-party 
application is invoked, data is received, analyzed on the driver, etc. The 
problem arises when I test redundancy and fault tolerance. Specifically, when I 
manually terminate Prog, multiple invocations are observed upon recovery. This 
could be due to multiple threads getting through [ "$(pidof Prog)" ] && exit 
before any of them has started Prog. I was hoping to avoid this problem by 
adding a semaphore, as follows:
…
    do {
      try {
        sem.acquire();
        p = Runtime.getRuntime().exec("Prog");
        sem.release();
      } catch (IOException ioe) {
        //ioe.printStackTrace();
        break;
      }
      socket.receive(packet);
      //InetAddress returnIPAddress = packet.getAddress(); returnPort = packet.getPort();
      output.clear();
      kryo.writeObject(output, packet);
      store(output);
…

However, I am still seeing multiple invocations of Prog upon recovery. I have 
also experimented with the following configuration parameters, to no avail:
  sparkConf.set("spark.cores.max", args[1]);
  sparkConf.set("spark.task.cpus", args[2]);
  sparkConf.set("spark.default.parallelism", args[2]);
with args={(1,1),(2,1), (1,2),…}
Any thoughts?
Cheers,
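
The check inside Prog, [ "$(pidof Prog)" ] && exit, and the actual start of Prog are two separate steps, so launches that overlap can all pass the check before any of them is visible to pidof. The semaphore only serializes the calls to exec, which returns once the child has been started, not once the child has run its check. One hedged alternative is to make the decision in the launcher itself, using a lock that other threads and other JVMs on the same host can see. The following is a minimal sketch, assuming all launchers run on one host and share a lock file; LaunchLock is a hypothetical name, and the sketch supports a single lock file per JVM.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: at most one holder at a time, across threads and JVMs on one host. */
final class LaunchLock implements AutoCloseable {

    // File locks are held on behalf of the whole JVM, so threads inside one JVM
    // need their own gate. This sketch supports one lock file per JVM.
    private static final AtomicBoolean HELD_IN_THIS_JVM = new AtomicBoolean(false);

    private final FileChannel channel;
    private final FileLock lock;

    private LaunchLock(FileChannel channel, FileLock lock) {
        this.channel = channel;
        this.lock = lock;
    }

    /** Returns a held lock, or null if another thread or process already holds it. */
    static LaunchLock tryAcquire(Path lockFile) throws IOException {
        if (!HELD_IN_THIS_JVM.compareAndSet(false, true)) {
            return null; // another thread in this JVM holds the lock
        }
        FileChannel ch = null;
        LaunchLock held = null;
        try {
            ch = FileChannel.open(lockFile,
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            FileLock fl = ch.tryLock(); // null when another process holds the lock
            if (fl != null) {
                held = new LaunchLock(ch, fl);
            }
            return held;
        } finally {
            if (held == null) {
                // Nothing was acquired: clean up and reopen the in-JVM gate.
                if (ch != null) {
                    ch.close();
                }
                HELD_IN_THIS_JVM.set(false);
            }
        }
    }

    /** Releases the file lock, closes the channel and reopens the in-JVM gate. */
    @Override
    public void close() throws IOException {
        try (FileChannel c = channel) {
            lock.release();
        } finally {
            HELD_IN_THIS_JVM.set(false);
        }
    }
}
```

A caller would start Prog only when tryAcquire returns a non-null lock, and would keep that lock open for as long as Prog is expected to run, so that a competing launcher sees the lock and skips its own launch.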
