Since the refactor, use this command to run/test:
  cargo r --example eth_poll

Signed-off-by: Harry van Haaren <harry.van.haa...@intel.com>
---
 rust_api_example/examples/eth_poll.rs | 45 ++++++++++++++++++++---
 rust_api_example/src/lib.rs           | 52 ++++++++++++++++++++++++---
 2 files changed, 88 insertions(+), 9 deletions(-)

diff --git a/rust_api_example/examples/eth_poll.rs 
b/rust_api_example/examples/eth_poll.rs
index cde28df68d..0ef0a28ab9 100644
--- a/rust_api_example/examples/eth_poll.rs
+++ b/rust_api_example/examples/eth_poll.rs
@@ -10,7 +10,7 @@ fn main() {
     let mut ports = dpdk.take_eth_ports().expect("take eth ports ok");
     let mut p = ports.pop().unwrap();
 
-    p.rxqs(2, rx_mempool).expect("rxqs setup ok");
+    p.rxqs(2, rx_mempool.clone()).expect("rxqs setup ok");
     println!("{:?}", p);
 
     let (mut rxqs, _txqs) = p.start();
@@ -21,15 +21,50 @@ fn main() {
 
     std::thread::spawn(move || {
         let mut rxq = rxq1.enable_polling();
-        loop {
+        for _ in 0..3 {
             let _nb_mbufs = rxq.rx_burst(&mut [0; 32]);
             std::thread::sleep(std::time::Duration::from_millis(1000));
         }
     });
 
-    let mut rxq = rxq2.enable_polling();
-    loop {
-        let _nb_mbufs = rxq.rx_burst(&mut [0; 32]);
+    // "shadowing" variables is a common pattern in Rust, and is used here to
+    // allow us to use the same variable name but for Rxq instead of RxqHandle.
+    let mut rxq2 = rxq2.enable_polling();
+    for _ in 0..2 {
+        let _nb_mbufs = rxq2.rx_burst(&mut [0; 32]);
         std::thread::sleep(std::time::Duration::from_millis(1000));
     }
+
+    // Important! As Port::stop() relies on RxqHandle's being dropped to
+    // reduce the refcount, if the rxq is NOT dropped, it will NOT allow
+    // the port to be stopped. This is actually a win for Safety (no polling 
stopped NIC ports)
+    // but also a potential bug/hiccup at application code level.
+    // Uncomment this line to see the loop below stall forever (waiting for 
Arc ref count to drop from 2 to 1)
+    drop(rxq2);
+
+    loop {
+        let r = p.stop();
+        match r {
+            Ok(_v) => {
+                println!("stopping port");
+                break;
+            }
+            Err(e) => {
+                println!("stop() returns error: {}", e);
+            }
+        };
+        std::thread::sleep(std::time::Duration::from_millis(300));
+    }
+
+    // Reconfigure after stop()
+    p.rxqs(4, rx_mempool.clone()).expect("rxqs setup ok");
+    println!("{:?}", p);
+
+    // queues is a tuple of (rxqs, txqs) here
+    let queues = p.start();
+    println!("queues: {:?}", queues);
+    drop(queues);
+
+    p.stop().expect("stop() ok");
+    println!("stopped port");
 }
\ No newline at end of file
diff --git a/rust_api_example/src/lib.rs b/rust_api_example/src/lib.rs
index 0d13b06d85..6b795fc227 100644
--- a/rust_api_example/src/lib.rs
+++ b/rust_api_example/src/lib.rs
@@ -5,20 +5,47 @@
 pub mod dpdk {
     pub mod eth {
         use super::Mempool;
-
+        use std::sync::Arc;
+
+        // PortHandle here is used as a refcount of "Outstanding Rx/Tx queues".
+        // This is useful, but the "runstate" of the port is also useful. They 
are
+        // similar, but not identical. A more elegant solution is likely 
possible.
+        #[derive(Debug, Clone)]
+        #[allow(unused)]
+        pub(crate) struct PortHandle(Arc<()>);
+
+        impl PortHandle {
+            fn new() -> Self {
+                PortHandle(Arc::new(()))
+            }
+            fn stop(&mut self) -> Result<(), usize> {
+                // if the count is 1, only the Port itself has a handle left.
+                // In that case, the count cannot go up, so we can stop.
+                // The strange "Arc::<()>::function()" syntax here is "Fully 
qualified syntax":
+                //  - 
https://doc.rust-lang.org/std/sync/struct.Arc.html#deref-behavior
+                let sc = Arc::<()>::strong_count(&self.0);
+                if  sc == 1 {
+                    Ok(())
+                } else {
+                    Err(sc)
+                }
+            }
+        }
+        
         #[derive(Debug)]
         pub struct TxqHandle {/* todo: but same as Rxq */}
 
         // Handle allows moving between threads, its not polling!
         #[derive(Debug)]
         pub struct RxqHandle {
+            _handle: PortHandle,
             port: u16,
             queue: u16,
         }
 
         impl RxqHandle {
-            pub(crate) fn new(port: u16, queue: u16) -> Self {
-                RxqHandle { port, queue }
+            pub(crate) fn new(handle: PortHandle, port: u16, queue: u16) -> 
Self {
+                RxqHandle { _handle: handle, port, queue }
             }
 
             // This function is the key to the API design: it ensures the 
rx_burst()
@@ -68,6 +95,7 @@ pub mod dpdk {
 
         #[derive(Debug)]
         pub struct Port {
+            handle: PortHandle,
             id: u16,
             rxqs: Vec<RxqHandle>,
             txqs: Vec<TxqHandle>,
@@ -77,6 +105,7 @@ pub mod dpdk {
             // pub(crate) here ensures outside this crate users cannot call 
this function
             pub(crate) fn from_u16(id: u16) -> Self {
                 Port {
+                    handle: PortHandle::new(),
                     id,
                     rxqs: Vec::new(),
                     txqs: Vec::new(),
@@ -84,10 +113,14 @@ pub mod dpdk {
             }
 
             pub fn rxqs(&mut self, rxq_count: u16, _mempool: Mempool) -> 
Result<(), String> {
+                // ensure no old ports remain
+                self.rxqs.clear();
+
                 for q in 0..rxq_count {
                     // call rte_eth_rx_queue_setup() here
-                    self.rxqs.push(RxqHandle::new(self.id, q));
+                    self.rxqs.push(RxqHandle::new(self.handle.clone(), 
self.id, q));
                 }
+                println!("{:?}", self.handle);
                 Ok(())
             }
 
@@ -98,6 +131,17 @@ pub mod dpdk {
                     std::mem::take(&mut self.txqs),
                 )
             }
+
+            pub fn stop(&mut self) -> Result<(), String> {
+                match self.handle.stop() {
+                    Ok(_v) => {
+                        // call rte_eth_dev_stop() here
+                        println!("stopping port {}", self.id);
+                        Ok(())
+                    }
+                    Err(e) => Err(format!("Port has {} Rxq/Txq handles 
outstanding", e)),
+                }
+            }
         }
     }
 
-- 
2.34.1

Reply via email to