Allow the watcher binary to run even when there is no DPDK process
running. In that case, wait for a suitable process to start and begin
monitoring then. In case of disconnection, keep trying to reconnect and
resume once reconnection succeeds.

Signed-off-by: Bruce Richardson <[email protected]>
Acked-by: Stephen Hemminger <[email protected]>
---
 doc/guides/rel_notes/release_26_07.rst |   7 ++
 doc/guides/tools/telemetrywatcher.rst  |   6 +
 usertools/dpdk-telemetry-watcher.py    | 166 +++++++++++++++++++------
 3 files changed, 141 insertions(+), 38 deletions(-)

diff --git a/doc/guides/rel_notes/release_26_07.rst 
b/doc/guides/rel_notes/release_26_07.rst
index f012d47a4b..a049bacfe0 100644
--- a/doc/guides/rel_notes/release_26_07.rst
+++ b/doc/guides/rel_notes/release_26_07.rst
@@ -63,6 +63,13 @@ New Features
     ``rte_eal_init`` and the application is responsible for probing each 
device,
   * ``--auto-probing`` enables the initial bus probing, which is the current 
default behavior.

+* **Added Script for Real-time Telemetry Monitoring.**
+
+  Introduced the `dpdk-telemetry-watcher.py` script, enabling users to monitor
+  real-time telemetry statistics from running DPDK applications.
+  The tool supports customizable display options, including delta values,
+  total statistics, and single-line output for compact monitoring.
+

 Removed Items
 -------------
diff --git a/doc/guides/tools/telemetrywatcher.rst 
b/doc/guides/tools/telemetrywatcher.rst
index 251d99a085..f3be49982f 100644
--- a/doc/guides/tools/telemetrywatcher.rst
+++ b/doc/guides/tools/telemetrywatcher.rst
@@ -11,6 +11,12 @@ It wraps the ``dpdk-telemetry.py`` script to provide 
real-time statistics displa
 Running the Tool
 ----------------

+The watcher tool can be run at any time, whether or not a DPDK application is 
currently running.
+When a DPDK application with telemetry enabled starts
+(assuming correct file-prefix and instance are specified),
+the watcher will automatically connect and begin displaying the requested 
statistics.
+If the DPDK application stops, the watcher will attempt to reconnect when the 
application restarts.
+
 The tool has a number of command line options:

 .. code-block:: console
diff --git a/usertools/dpdk-telemetry-watcher.py 
b/usertools/dpdk-telemetry-watcher.py
index eda57e5ba5..51b5736381 100755
--- a/usertools/dpdk-telemetry-watcher.py
+++ b/usertools/dpdk-telemetry-watcher.py
@@ -60,6 +60,29 @@ def find_telemetry_script():
     return telemetry_script


+def cleanup_telemetry_process(process):
+    """Close pipes and terminate/wait for a telemetry subprocess.
+
+    Args:
+        process (subprocess.Popen): Telemetry subprocess to clean up.
+    """
+    if process is None:
+        return
+
+    for stream in (process.stdin, process.stdout, process.stderr):
+        stream.close()
+
+    if process.poll() is None:
+        process.terminate()
+        try:
+            process.wait(timeout=1)
+        except subprocess.TimeoutExpired:
+            process.kill()
+            process.wait()
+    else:
+        process.wait()
+
+
 def create_telemetry_process(telemetry_script, args_list):
     """Create a subprocess for dpdk-telemetry.py with pipes.

@@ -85,6 +108,8 @@ def create_telemetry_process(telemetry_script, args_list):
             text=True,
             bufsize=1,  # Line buffered
         )
+        process.script = telemetry_script  # Store script path for reference
+        process.args = args_list  # Store args for reference
         return process
     except FileNotFoundError:
         print("Error: Python interpreter or script not found", file=sys.stderr)
@@ -102,15 +127,42 @@ def query_telemetry(process, command):
         command: The telemetry command to send (e.g., "/info" or 
"/ethdev/stats,0")

     Returns:
-        dict: The parsed JSON response with the command wrapper stripped,
+        (process, dict): The process handle, in case of reconnection, and the
+              parsed JSON response with the command wrapper stripped,
               or None if there was an error
     """
-    # Send the command
-    process.stdin.write(f"{command}\n")
-    process.stdin.flush()
+    # Handle case where process is None
+    if process is None:
+        return (None, None)
+
+    # Send/read may fail with broken pipes if the app dies; reconnect on 
failure.
+    try:
+        process.stdin.write(f"{command}\n")
+        process.stdin.flush()
+        response = process.stdout.readline()
+    except (BrokenPipeError, OSError):
+        response = None
+
+    # Reconnect and retry until a non-empty response is received.
+    while not response:
+        script = process.script
+        args_list = process.args
+        cleanup_telemetry_process(process)
+        process = None
+        print("Application disconnected, retrying...", file=sys.stderr)
+        while not process:
+            time.sleep(1)
+            candidate = create_telemetry_process(script, args_list)
+            process = print_connected_app(candidate)
+            if not process:
+                cleanup_telemetry_process(candidate)
+        try:
+            process.stdin.write(f"{command}\n")
+            process.stdin.flush()
+            response = process.stdout.readline()
+        except (BrokenPipeError, OSError):
+            response = None

-    # Read the JSON response
-    response = process.stdout.readline()
     try:
         data = json.loads(response)
         # When run non-interactively, the response is wrapped with the command
@@ -119,24 +171,47 @@ def query_telemetry(process, command):
         # The response should have exactly one key which is the command
         if len(data) == 1:
             # Extract the value, ignoring the key
-            return next(iter(data.values()))
+            return (process, next(iter(data.values())))
         else:
-            return data
+            return (process, data)
     except (json.JSONDecodeError, KeyError):
-        return None
+        return (process, None)


 def print_connected_app(process):
     """Query and print the name of the connected DPDK application.

+    This helper sends /info directly instead of using query_telemetry()
+    to avoid a recursive reconnect call chain when it is used during
+    process creation/reconnection.
+
     Args:
         process: The subprocess.Popen handle to the telemetry process
     """
-    info = query_telemetry(process, "/info")
+    try:
+        process.stdin.write("/info\n")
+        process.stdin.flush()
+        response = process.stdout.readline()
+    except (BrokenPipeError, OSError):
+        return None
+
+    if not response:
+        return None
+
+    try:
+        data = json.loads(response)
+        if len(data) == 1:
+            info = next(iter(data.values()))
+        else:
+            info = data
+    except (json.JSONDecodeError, KeyError):
+        return None
+
     if info and "pid" in info:
         app_name = get_app_name(info["pid"])
         if app_name:
             print(f'Connected to application: "{app_name}"')
+    return process


 def expand_shortcuts(process, stat_specs):
@@ -147,7 +222,10 @@ def expand_shortcuts(process, stat_specs):
         stat_specs: List of stat specifications, possibly including shortcuts

     Returns:
-        List of expanded stat specifications
+        Tuple of (process, expanded_specs) where:
+            process: Updated process handle, in case of reconnection
+            expanded_specs: List of expanded stat specifications
+        Returns (process, None) on error
     """
     expanded = []
     for spec in stat_specs:
@@ -159,7 +237,7 @@ def expand_shortcuts(process, stat_specs):
         field = spec[4:]  # Remove "eth." prefix
         if not field:
             print(f"Error: Invalid shortcut '{spec}' - missing field name", 
file=sys.stderr)
-            return None
+            return process, None

         # Map common shortcuts to actual field names
         field_map = {
@@ -169,16 +247,16 @@ def expand_shortcuts(process, stat_specs):
         field = field_map.get(field, field)

         # Get list of ethernet devices
-        port_list = query_telemetry(process, "/ethdev/list")
+        process, port_list = query_telemetry(process, "/ethdev/list")
         if not isinstance(port_list, list):
             print("Error: Failed to get ethernet device list", file=sys.stderr)
-            return None
+            return process, None

         # Create stat specs for each port
         for port in port_list:
             expanded.append(f"/ethdev/stats,{port}.{field}")

-    return expanded
+    return process, expanded


 def validate_stats(process, stat_specs):
@@ -189,10 +267,11 @@ def validate_stats(process, stat_specs):
         stat_specs: List of stat specifications in format "command.field"

     Returns:
-        Tuple of (parsed_specs, initial_values) where:
+        Tuple of (process, parsed_specs, initial_values) where:
+            process: Updated process handle, in case of reconnection
             parsed_specs: List of tuples (spec, command, field) for valid specs
             initial_values: List of initial values for each stat
-        Returns (None, None) on error
+        Returns (process, None, None) on error
     """
     parsed_specs = []
     initial_values = []
@@ -204,7 +283,7 @@ def validate_stats(process, stat_specs):
                 "Expected format: 'command.field' (e.g., 
/ethdev/stats,0.ipackets)",
                 file=sys.stderr,
             )
-            return None, None
+            return process, None, None

         command, field = spec.rsplit(".", 1)
         if not command or not field:
@@ -213,28 +292,28 @@ def validate_stats(process, stat_specs):
                 "Expected format: 'command.field' (e.g., 
/ethdev/stats,0.ipackets)",
                 file=sys.stderr,
             )
-            return None, None
+            return process, None, None

         # Query the stat once to validate it exists and is numeric
-        data = query_telemetry(process, command)
+        process, data = query_telemetry(process, command)
         if not isinstance(data, dict):
             print(f"Error: Command '{command}' did not return a dictionary", 
file=sys.stderr)
-            return None, None
+            return process, None, None
         if field not in data:
             print(f"Error: Field '{field}' not found in '{command}' response", 
file=sys.stderr)
-            return None, None
+            return process, None, None
         value = data[field]
         if not isinstance(value, (int, float)):
             print(
                 f"Error: Field '{field}' in '{command}' is not numeric (got 
{type(value).__name__})",
                 file=sys.stderr,
             )
-            return None, None
+            return process, None, None

         parsed_specs.append((spec, command, field))
         initial_values.append(value)

-    return parsed_specs, initial_values
+    return process, parsed_specs, initial_values


 def monitor_stats(process, args):
@@ -243,16 +322,19 @@ def monitor_stats(process, args):
     Args:
         process: The subprocess.Popen handle to the telemetry process
         args: Parsed command line arguments
+
+    Returns:
+        subprocess.Popen: The latest telemetry process handle
     """
     # Expand any shortcuts like eth-rx, eth-tx
-    expanded_stats = expand_shortcuts(process, args.stats)
+    process, expanded_stats = expand_shortcuts(process, args.stats)
     if not expanded_stats:
-        return
+        return process

     # Validate all stat specifications and get initial values
-    parsed_specs, prev_values = validate_stats(process, expanded_stats)
+    process, parsed_specs, prev_values = validate_stats(process, 
expanded_stats)
     if not parsed_specs:
-        return
+        return process

     # Print header
     header = "Time".ljust(10)
@@ -276,7 +358,12 @@ def monitor_stats(process, args):
             current_values = []
             total = 0
             for i, (spec, command, field) in enumerate(parsed_specs):
-                data = query_telemetry(process, command)
+                process, data = query_telemetry(process, command)
+                if not data:
+                    fallback_value = prev_values[i] if i < len(prev_values) 
else 0
+                    current_values.append(fallback_value)
+                    row += "N/A".rjust(25)
+                    continue
                 current_value = data[field]
                 current_values.append(current_value)

@@ -298,6 +385,8 @@ def monitor_stats(process, args):
             print()  # Add newline before exit message
         print("\nMonitoring stopped")

+    return process
+

 def main():
     """Main function to parse arguments and run dpdk-telemetry.py with a 
pipe"""
@@ -386,19 +475,20 @@ def main():
         return 1

     # Run dpdk-telemetry.py with pipes for stdin and stdout
-    process = create_telemetry_process(telemetry_script, args_list)
-
-    # Get and display the connected application name
-    print_connected_app(process)
+    process = None
+    print("Waiting for connection to DPDK application...", file=sys.stderr)
+    while not process:
+        candidate = create_telemetry_process(telemetry_script, args_list)
+        process = print_connected_app(candidate)
+        if not process:
+            cleanup_telemetry_process(candidate)
+            time.sleep(1)

     # Monitor the requested statistics
-    monitor_stats(process, args)
+    process = monitor_stats(process, args)

     # Clean up
-    process.stdin.close()
-    process.stdout.close()
-    process.stderr.close()
-    process.wait()
+    cleanup_telemetry_process(process)

     return 0

--
2.51.0

Reply via email to