Copilot commented on code in PR #17851:
URL: https://github.com/apache/pinot/pull/17851#discussion_r2916818452


##########
pinot-core/src/test/java/org/apache/pinot/core/util/trace/ContinuousJfrStarterTest.java:
##########
@@ -157,66 +148,177 @@ public void noOpWhenNewConfigIsEqual() {
     Assertions.assertThat(_continuousJfrStarter.isRunning())
         .describedAs("Recording should still be disabled")
         .isFalse();
-    Mockito.verifyNoInteractions(_recording);
+    
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).isEmpty();
+  }
+
+  @Test
+  public void restartsWhenConfigurationChanges() {
+    Map<String, String> config = Map.of(
+        "pinot.jfr.enabled", "true",
+        "pinot.jfr.maxAge", "PT2H");
+    _continuousJfrStarter.onChange(Set.of(), config);
+
+    Set<String> changed = Set.of("pinot.jfr.maxAge");
+    Map<String, String> updatedConfig = Map.of(
+        "pinot.jfr.enabled", "true",
+        "pinot.jfr.maxAge", "PT1H");
+    _continuousJfrStarter.onChange(changed, updatedConfig);
+
+    
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).containsExactly(
+        "jfrStart name=pinot-continuous settings=default dumponexit=true 
disk=true maxsize=209715200 maxage=7200000ms",
+        "jfrStop name=pinot-continuous",
+        "jfrStart name=pinot-continuous settings=default dumponexit=true 
disk=true maxsize=209715200 maxage=3600000ms");
+  }
+
+  @Test
+  public void configuresRepositoryAndDumpPathsViaMBean() {
+    Map<String, String> config = Map.of(
+        "pinot.jfr.enabled", "true",
+        "pinot.jfr.directory", "/var/log/pinot/jfr-repository",
+        "pinot.jfr.dumpPath", "/var/log/pinot/jfr-dumps");
+    _continuousJfrStarter.onChange(Set.of(), config);
+
+    
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).containsExactly(
+        "jfrConfigure repositorypath=/var/log/pinot/jfr-repository 
dumppath=/var/log/pinot/jfr-dumps",
+        "jfrStart name=pinot-continuous settings=default dumponexit=true 
disk=true maxsize=209715200 maxage=86400000ms");
+  }
+
+  @Test
+  public void doesNotFailWhenMBeanIsUnavailable() {
+    _continuousJfrStarter.setMBeanAvailable(false);
+    Map<String, String> config = Map.of("pinot.jfr.enabled", "true");
+    _continuousJfrStarter.onChange(Set.of(), config);
+
+    Assertions.assertThat(_continuousJfrStarter.isRunning())
+        .describedAs("Recording should remain disabled when MBean is 
unavailable")
+        .isFalse();
+    
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).isEmpty();
+  }
+
+  @Test
+  public void keepsRunningWhenStopCommandFails() {
+    _continuousJfrStarter.onChange(Set.of(), Map.of("pinot.jfr.enabled", 
"true"));
+    _continuousJfrStarter.failCommand("jfrStop");
+
+    _continuousJfrStarter.onChange(Set.of("pinot.jfr.enabled"), 
Map.of("pinot.jfr.enabled", "false"));
+
+    Assertions.assertThat(_continuousJfrStarter.isRunning())
+        .describedAs("Starter should keep running state when stop fails")
+        .isTrue();
+    
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).containsExactly(
+        "jfrStart name=pinot-continuous settings=default dumponexit=true 
disk=true maxsize=209715200 maxage=86400000ms",
+        "jfrStop name=pinot-continuous");
+  }
+
+  @Test
+  public void keepsRunningWhenMBeanBecomesUnavailableOnStop() {
+    _continuousJfrStarter.onChange(Set.of(), Map.of("pinot.jfr.enabled", 
"true"));
+    _continuousJfrStarter.setMBeanAvailable(false);
+
+    _continuousJfrStarter.onChange(Set.of("pinot.jfr.enabled"), 
Map.of("pinot.jfr.enabled", "false"));
+
+    Assertions.assertThat(_continuousJfrStarter.isRunning())
+        .describedAs("Starter should keep running state when MBean is 
unavailable for stop")
+        .isTrue();
+    
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).containsExactly(
+        "jfrStart name=pinot-continuous settings=default dumponexit=true 
disk=true maxsize=209715200 maxage=86400000ms");
   }
 
   @Test
-  public void cleanUpThreadDeletesFiles()
-      throws IOException {
-    Path tempDirectory = Files.createTempDirectory("jfr-test-");
-    int maxDumps = 3;
-    long now = ZonedDateTime.of(2025, 10, 13, 12, 0, 0, 0, 
ZoneOffset.UTC).toInstant().toEpochMilli();
+  public void integrationTestWithRealDiagnosticCommandMBean() {
+    ContinuousJfrStarter starter = new ContinuousJfrStarter();
+    if (!starter.isDiagnosticCommandAvailable()) {
+      throw new SkipException("JFR DiagnosticCommand MBean is not available in 
this runtime");
+    }
+
+    String recordingName = "pinot-continuous-it-" + System.currentTimeMillis();
+    Map<String, String> enabledConfig = Map.of(
+        "pinot.jfr.enabled", "true",
+        "pinot.jfr.name", recordingName,
+        "pinot.jfr.toDisk", "false");
+    Map<String, String> disabledConfig = Map.of(
+        "pinot.jfr.enabled", "false",
+        "pinot.jfr.name", recordingName,
+        "pinot.jfr.toDisk", "false");

Review Comment:
   The real-MBean integration test only exercises `toDisk=false`, so it won't 
catch incompatibilities in the generated `maxage`/`maxsize` arguments, which are 
only sent when `toDisk=true`. Consider adding a second integration scenario that 
starts a recording with `toDisk=true` and a non-default `maxAge` (e.g. `PT1H`, 
which the unit tests expect to become `maxage=3600000ms`). This would validate 
that the value produced by `toJfrTimeArgument(...)` is actually accepted by the 
DiagnosticCommand MBean.



##########
pinot-core/src/main/java/org/apache/pinot/core/util/trace/ContinuousJfrStarter.java:
##########
@@ -155,157 +159,137 @@ public void onChange(Set<String> changedConfigs, 
Map<String, String> clusterConf
         return;
       }
 
-      stopRecording();
-      _currentConfig = newSubsetMap;
-      startRecording(subset);
+      if (!stopRecording()) {
+        LOGGER.warn("Failed to stop existing continuous JFR recording. 
Skipping config update");
+        return;
+      }
+      if (startRecording(subset)) {
+        _currentConfig = newSubsetMap;
+      } else {
+        LOGGER.warn("Failed to apply continuous JFR config update");
+      }
     }
   }
 
   public boolean isRunning() {
     return _running;
   }
 
-  private void stopRecording() {
+  private boolean stopRecording() {
     if (!_running) {
-      return;
+      return true;
     }
-    assert _recording != null;
-    LOGGER.debug("Stopping recording {}", _recording.getName());
-    _recording.stop();
-    _recording.close();
-
-    if (_cleanupThread != null) {
-      _cleanupThread.interrupt();
-      try {
-        _cleanupThread.join(5_000);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        LOGGER.warn("Interrupted while waiting for cleanup thread to stop");
-      }
-      _cleanupThread = null;
+    assert _recordingName != null;
+    if (!isDiagnosticCommandAvailable()) {
+      LOGGER.warn("JFR DiagnosticCommand MBean is unavailable. Cannot stop 
continuous JFR recording '{}'",
+          _recordingName);
+      return false;
     }
-
-    LOGGER.info("Stopped continuous JFR recording {}", _recording.getName());
-    _recording = null;
+    if (!executeDiagnosticCommand(JFR_STOP_COMMAND, "name=" + _recordingName)) 
{
+      LOGGER.warn("Failed to stop continuous JFR recording '{}'", 
_recordingName);
+      return false;
+    }
+    LOGGER.info("Stopped continuous JFR recording {}", _recordingName);
+    _recordingName = null;
     _running = false;
+    return true;
   }
 
-  private void startRecording(PinotConfiguration subset) {
+  private boolean startRecording(PinotConfiguration subset) {
     if (!subset.getProperty(ENABLED, DEFAULT_ENABLED)) {
       LOGGER.info("Continuous JFR recording is disabled");
-      return;
+      return true;
     }
     if (_running) {
-      return;
+      return true;
+    }
+    if (!isDiagnosticCommandAvailable()) {
+      LOGGER.warn("JFR DiagnosticCommand MBean is unavailable. Cannot start 
continuous JFR recording");
+      return false;
     }
 
-    _recording = createRecording(subset);
-    _recording.setName(subset.getProperty(NAME, DEFAULT_NAME));
-
-    _recording.setDumpOnExit(subset.getProperty(DUMP_ON_EXIT, 
DEFAULT_DUMP_ON_EXIT));
-
-    prepareFileDumps(subset);
+    String recordingName = subset.getProperty(NAME, DEFAULT_NAME);
+    if (!applyRuntimeOptions(subset)) {
+      LOGGER.warn("Failed to apply JFR runtime options for recording '{}'", 
recordingName);
+      return false;
+    }
 
+    String maxAge = subset.getProperty(MAX_AGE, DEFAULT_MAX_AGE);
     try {
+      List<String> startArguments = new ArrayList<>();
+      startArguments.add("name=" + recordingName);
+      startArguments.add("settings=" + subset.getProperty(CONFIGURATION, 
DEFAULT_CONFIGURATION));
+      startArguments.add("dumponexit=" + subset.getProperty(DUMP_ON_EXIT, 
DEFAULT_DUMP_ON_EXIT));
       boolean toDisk = subset.getProperty(TO_DISK, DEFAULT_TO_DISK);
+      startArguments.add("disk=" + toDisk);
       if (toDisk) {
-        _recording.setToDisk(true);
-        _recording.setMaxSize(subset.getProperty(MAX_SIZE, DEFAULT_MAX_SIZE));
-        _recording.setMaxAge(Duration.parse(subset.getProperty(MAX_AGE, 
DEFAULT_MAX_AGE).toUpperCase(Locale.ENGLISH)));
+        startArguments.add("maxsize=" + subset.getProperty(MAX_SIZE, 
DEFAULT_MAX_SIZE));
+        startArguments.add("maxage=" + toJfrTimeArgument(maxAge));
+      }
+      if (!executeDiagnosticCommand(JFR_START_COMMAND, 
startArguments.toArray(new String[0]))) {
+        LOGGER.warn("Failed to start continuous JFR recording '{}'", 
recordingName);
+        return false;
       }
     } catch (DateTimeParseException e) {
-      throw new RuntimeException("Failed to parse duration", e);
+      throw new RuntimeException("Failed to parse duration '" + maxAge + "'", 
e);
     }
-    _recording.start();
-    LOGGER.info("Started continuous JFR recording {} with configuration: {}", 
_recording.getName(), subset);
+
+    _recordingName = recordingName;
+    LOGGER.info("Started continuous JFR recording {} with configuration: {}", 
recordingName, subset);
     _running = true;
+    return true;
   }
 
   @VisibleForTesting
-  protected static Path getRecordingPath(Path parentDir, String name, Instant 
timestamp) {
-    String filename = "recording-" + name + "-" + timestamp + ".jfr";
-    return parentDir.resolve(filename);
-  }
-
-  private void prepareFileDumps(PinotConfiguration subset) {
+  protected boolean executeDiagnosticCommand(String operationName, String... 
arguments) {
     try {
-      Path directory = Path.of(subset.getProperty(DIRECTORY, 
Paths.get(".").toString()));
-      if (!directory.toFile().canWrite()) {
-        throw new RuntimeException("Cannot write: " + directory);
-      }
+      _mBeanServer.invoke(_diagnosticCommandObjectName, operationName, new 
Object[]{arguments},
+          new String[]{String[].class.getName()});
+      return true;
+    } catch (Exception e) {
+      LOGGER.warn("Failed to execute JFR command '{}' with arguments {}", 
operationName, Arrays.toString(arguments), e);
+      return false;
+    }

Review Comment:
   `executeDiagnosticCommand()` can be invoked with a null 
`_diagnosticCommandObjectName` (e.g., if `createDiagnosticCommandObjectName()` 
returned null). In that case the failure only surfaces through the broad 
`catch (Exception e)`, which logs a full stack trace. Adding an explicit 
null/availability guard here, with a clear one-line warning, would avoid noisy 
stack traces and make the failure mode clearer for callers and subclasses.



##########
pinot-core/src/test/java/org/apache/pinot/core/util/trace/ContinuousJfrStarterTest.java:
##########
@@ -157,66 +148,177 @@ public void noOpWhenNewConfigIsEqual() {
     Assertions.assertThat(_continuousJfrStarter.isRunning())
         .describedAs("Recording should still be disabled")
         .isFalse();
-    Mockito.verifyNoInteractions(_recording);
+    
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).isEmpty();
+  }
+
+  @Test
+  public void restartsWhenConfigurationChanges() {
+    Map<String, String> config = Map.of(
+        "pinot.jfr.enabled", "true",
+        "pinot.jfr.maxAge", "PT2H");
+    _continuousJfrStarter.onChange(Set.of(), config);
+
+    Set<String> changed = Set.of("pinot.jfr.maxAge");
+    Map<String, String> updatedConfig = Map.of(
+        "pinot.jfr.enabled", "true",
+        "pinot.jfr.maxAge", "PT1H");
+    _continuousJfrStarter.onChange(changed, updatedConfig);
+
+    
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).containsExactly(
+        "jfrStart name=pinot-continuous settings=default dumponexit=true 
disk=true maxsize=209715200 maxage=7200000ms",
+        "jfrStop name=pinot-continuous",
+        "jfrStart name=pinot-continuous settings=default dumponexit=true 
disk=true maxsize=209715200 maxage=3600000ms");
+  }
+
+  @Test
+  public void configuresRepositoryAndDumpPathsViaMBean() {
+    Map<String, String> config = Map.of(
+        "pinot.jfr.enabled", "true",
+        "pinot.jfr.directory", "/var/log/pinot/jfr-repository",
+        "pinot.jfr.dumpPath", "/var/log/pinot/jfr-dumps");
+    _continuousJfrStarter.onChange(Set.of(), config);
+
+    
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).containsExactly(
+        "jfrConfigure repositorypath=/var/log/pinot/jfr-repository 
dumppath=/var/log/pinot/jfr-dumps",
+        "jfrStart name=pinot-continuous settings=default dumponexit=true 
disk=true maxsize=209715200 maxage=86400000ms");
+  }
+
+  @Test
+  public void doesNotFailWhenMBeanIsUnavailable() {
+    _continuousJfrStarter.setMBeanAvailable(false);
+    Map<String, String> config = Map.of("pinot.jfr.enabled", "true");
+    _continuousJfrStarter.onChange(Set.of(), config);
+
+    Assertions.assertThat(_continuousJfrStarter.isRunning())
+        .describedAs("Recording should remain disabled when MBean is 
unavailable")
+        .isFalse();
+    
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).isEmpty();
+  }
+
+  @Test
+  public void keepsRunningWhenStopCommandFails() {
+    _continuousJfrStarter.onChange(Set.of(), Map.of("pinot.jfr.enabled", 
"true"));
+    _continuousJfrStarter.failCommand("jfrStop");
+
+    _continuousJfrStarter.onChange(Set.of("pinot.jfr.enabled"), 
Map.of("pinot.jfr.enabled", "false"));
+
+    Assertions.assertThat(_continuousJfrStarter.isRunning())
+        .describedAs("Starter should keep running state when stop fails")
+        .isTrue();
+    
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).containsExactly(
+        "jfrStart name=pinot-continuous settings=default dumponexit=true 
disk=true maxsize=209715200 maxage=86400000ms",
+        "jfrStop name=pinot-continuous");
+  }
+
+  @Test
+  public void keepsRunningWhenMBeanBecomesUnavailableOnStop() {
+    _continuousJfrStarter.onChange(Set.of(), Map.of("pinot.jfr.enabled", 
"true"));
+    _continuousJfrStarter.setMBeanAvailable(false);
+
+    _continuousJfrStarter.onChange(Set.of("pinot.jfr.enabled"), 
Map.of("pinot.jfr.enabled", "false"));
+
+    Assertions.assertThat(_continuousJfrStarter.isRunning())
+        .describedAs("Starter should keep running state when MBean is 
unavailable for stop")
+        .isTrue();
+    
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).containsExactly(
+        "jfrStart name=pinot-continuous settings=default dumponexit=true 
disk=true maxsize=209715200 maxage=86400000ms");
   }
 
   @Test
-  public void cleanUpThreadDeletesFiles()
-      throws IOException {
-    Path tempDirectory = Files.createTempDirectory("jfr-test-");
-    int maxDumps = 3;
-    long now = ZonedDateTime.of(2025, 10, 13, 12, 0, 0, 0, 
ZoneOffset.UTC).toInstant().toEpochMilli();
+  public void integrationTestWithRealDiagnosticCommandMBean() {
+    ContinuousJfrStarter starter = new ContinuousJfrStarter();
+    if (!starter.isDiagnosticCommandAvailable()) {
+      throw new SkipException("JFR DiagnosticCommand MBean is not available in 
this runtime");
+    }
+
+    String recordingName = "pinot-continuous-it-" + System.currentTimeMillis();
+    Map<String, String> enabledConfig = Map.of(
+        "pinot.jfr.enabled", "true",
+        "pinot.jfr.name", recordingName,
+        "pinot.jfr.toDisk", "false");
+    Map<String, String> disabledConfig = Map.of(
+        "pinot.jfr.enabled", "false",
+        "pinot.jfr.name", recordingName,
+        "pinot.jfr.toDisk", "false");
+
     try {
-      long[] dates = IntStream.range(0, maxDumps * 2)
-          .mapToLong(i -> now - i * 3600_000L)
-          .sorted()
-          .toArray();
-      for (long creationDate : dates) {
-        Path path = ContinuousJfrStarter.getRecordingPath(tempDirectory, 
"test", Instant.ofEpochMilli(creationDate));
-        File file = path.toFile();
-        Assertions.assertThat(file.createNewFile())
-            .describedAs("Should be able to create a file in the temp 
directory")
-            .isTrue();
-        Assertions.assertThat(file.setLastModified(creationDate))
-            .describedAs("Should be able to set the last modified time")
-            .isTrue();
-      }
+      starter.onChange(Set.of(), enabledConfig);
+      Assertions.assertThat(starter.isRunning()).isTrue();
+      Assertions.assertThat(waitForRecordingPresence(recordingName, true, 10, 
100))
+          .describedAs("Recording should be visible via JFR.check")
+          .isTrue();
 
-      // Verify that we have 2 * maxDumps files
-      try (var files = Files.list(tempDirectory)) {
-        Assertions.assertThat(files.count())
-            .describedAs("Should have 2 * maxDumps files in the temp 
directory")
-            .isEqualTo(maxDumps * 2);
+      starter.onChange(Set.of("pinot.jfr.enabled"), disabledConfig);
+      Assertions.assertThat(starter.isRunning()).isFalse();
+      Assertions.assertThat(waitForRecordingPresence(recordingName, false, 10, 
100))
+          .describedAs("Recording should be absent via JFR.check after 
disable")
+          .isTrue();
+    } finally {
+      if (starter.isRunning()) {
+        starter.onChange(Set.of("pinot.jfr.enabled"), disabledConfig);
       }
+    }
+  }
 
-      // Run the cleanup
-      ContinuousJfrStarter.cleanUpDumps(tempDirectory, maxDumps, "test");
-
-      // Verify that we have maxDumps files and only the newest ones are kept
-      try (var files = Files.list(tempDirectory)) {
-        var remainingFiles = files.collect(Collectors.toSet());
-        Assertions.assertThat(remainingFiles)
-            .describedAs("Should have maxDumps files in the temp directory")
-            .hasSize(maxDumps);
-        for (int i = 0; i < maxDumps; i++) {
-          long creationDate = dates[dates.length - 1 - i];
-          Instant timestamp = Instant.ofEpochMilli(creationDate);
-          Path expectedPath = 
ContinuousJfrStarter.getRecordingPath(tempDirectory, "test", timestamp);
-          Assertions.assertThat(remainingFiles)
-              .describedAs("Should contain the expected file: %s", 
expectedPath)
-              .contains(expectedPath);
-        }
+  private static boolean waitForRecordingPresence(String recordingName, 
boolean expectedPresent, int maxAttempts,
+      long delayMs) {
+    for (int i = 0; i < maxAttempts; i++) {
+      boolean present = isRecordingPresent(recordingName);
+      if (present == expectedPresent) {
+        return true;
       }
+      try {
+        Thread.sleep(delayMs);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return false;
+      }
+    }
+    return false;

Review Comment:
   This integration test uses a custom `Thread.sleep` polling loop that waits at 
most about 1 s in total (10 polls, 100 ms apart). That is prone to CI flakiness on 
slow or loaded machines. Prefer the existing `TestUtils.waitForCondition(...)` 
helper (used elsewhere in pinot-core tests) with a more generous timeout and a 
descriptive failure message.



##########
pinot-core/src/main/java/org/apache/pinot/core/util/trace/ContinuousJfrStarter.java:
##########
@@ -155,157 +159,137 @@ public void onChange(Set<String> changedConfigs, 
Map<String, String> clusterConf
         return;
       }
 
-      stopRecording();
-      _currentConfig = newSubsetMap;
-      startRecording(subset);
+      if (!stopRecording()) {
+        LOGGER.warn("Failed to stop existing continuous JFR recording. 
Skipping config update");
+        return;
+      }
+      if (startRecording(subset)) {
+        _currentConfig = newSubsetMap;
+      } else {
+        LOGGER.warn("Failed to apply continuous JFR config update");
+      }
     }
   }
 
   public boolean isRunning() {
     return _running;

Review Comment:
   `_running` is annotated `@GuardedBy("this")`, but `isRunning()` reads it 
without synchronization or `volatile`. This breaks the stated thread-safety 
contract and can return stale values to other threads. Consider one of three 
options: synchronize `isRunning()`, make `_running` `volatile`, or remove the 
`@GuardedBy` annotation if the field isn't meant to be guarded.
   ```suggestion
       synchronized (this) {
         return _running;
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to