Copilot commented on code in PR #17851:
URL: https://github.com/apache/pinot/pull/17851#discussion_r2916818452
##########
pinot-core/src/test/java/org/apache/pinot/core/util/trace/ContinuousJfrStarterTest.java:
##########
@@ -157,66 +148,177 @@ public void noOpWhenNewConfigIsEqual() {
Assertions.assertThat(_continuousJfrStarter.isRunning())
.describedAs("Recording should still be disabled")
.isFalse();
- Mockito.verifyNoInteractions(_recording);
+
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).isEmpty();
+ }
+
+ @Test
+ public void restartsWhenConfigurationChanges() {
+ Map<String, String> config = Map.of(
+ "pinot.jfr.enabled", "true",
+ "pinot.jfr.maxAge", "PT2H");
+ _continuousJfrStarter.onChange(Set.of(), config);
+
+ Set<String> changed = Set.of("pinot.jfr.maxAge");
+ Map<String, String> updatedConfig = Map.of(
+ "pinot.jfr.enabled", "true",
+ "pinot.jfr.maxAge", "PT1H");
+ _continuousJfrStarter.onChange(changed, updatedConfig);
+
+
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).containsExactly(
+ "jfrStart name=pinot-continuous settings=default dumponexit=true
disk=true maxsize=209715200 maxage=7200000ms",
+ "jfrStop name=pinot-continuous",
+ "jfrStart name=pinot-continuous settings=default dumponexit=true
disk=true maxsize=209715200 maxage=3600000ms");
+ }
+
+ @Test
+ public void configuresRepositoryAndDumpPathsViaMBean() {
+ Map<String, String> config = Map.of(
+ "pinot.jfr.enabled", "true",
+ "pinot.jfr.directory", "/var/log/pinot/jfr-repository",
+ "pinot.jfr.dumpPath", "/var/log/pinot/jfr-dumps");
+ _continuousJfrStarter.onChange(Set.of(), config);
+
+
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).containsExactly(
+ "jfrConfigure repositorypath=/var/log/pinot/jfr-repository
dumppath=/var/log/pinot/jfr-dumps",
+ "jfrStart name=pinot-continuous settings=default dumponexit=true
disk=true maxsize=209715200 maxage=86400000ms");
+ }
+
+ @Test
+ public void doesNotFailWhenMBeanIsUnavailable() {
+ _continuousJfrStarter.setMBeanAvailable(false);
+ Map<String, String> config = Map.of("pinot.jfr.enabled", "true");
+ _continuousJfrStarter.onChange(Set.of(), config);
+
+ Assertions.assertThat(_continuousJfrStarter.isRunning())
+ .describedAs("Recording should remain disabled when MBean is
unavailable")
+ .isFalse();
+
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).isEmpty();
+ }
+
+ @Test
+ public void keepsRunningWhenStopCommandFails() {
+ _continuousJfrStarter.onChange(Set.of(), Map.of("pinot.jfr.enabled",
"true"));
+ _continuousJfrStarter.failCommand("jfrStop");
+
+ _continuousJfrStarter.onChange(Set.of("pinot.jfr.enabled"),
Map.of("pinot.jfr.enabled", "false"));
+
+ Assertions.assertThat(_continuousJfrStarter.isRunning())
+ .describedAs("Starter should keep running state when stop fails")
+ .isTrue();
+
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).containsExactly(
+ "jfrStart name=pinot-continuous settings=default dumponexit=true
disk=true maxsize=209715200 maxage=86400000ms",
+ "jfrStop name=pinot-continuous");
+ }
+
+ @Test
+ public void keepsRunningWhenMBeanBecomesUnavailableOnStop() {
+ _continuousJfrStarter.onChange(Set.of(), Map.of("pinot.jfr.enabled",
"true"));
+ _continuousJfrStarter.setMBeanAvailable(false);
+
+ _continuousJfrStarter.onChange(Set.of("pinot.jfr.enabled"),
Map.of("pinot.jfr.enabled", "false"));
+
+ Assertions.assertThat(_continuousJfrStarter.isRunning())
+ .describedAs("Starter should keep running state when MBean is
unavailable for stop")
+ .isTrue();
+
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).containsExactly(
+ "jfrStart name=pinot-continuous settings=default dumponexit=true
disk=true maxsize=209715200 maxage=86400000ms");
}
@Test
- public void cleanUpThreadDeletesFiles()
- throws IOException {
- Path tempDirectory = Files.createTempDirectory("jfr-test-");
- int maxDumps = 3;
- long now = ZonedDateTime.of(2025, 10, 13, 12, 0, 0, 0,
ZoneOffset.UTC).toInstant().toEpochMilli();
+ public void integrationTestWithRealDiagnosticCommandMBean() {
+ ContinuousJfrStarter starter = new ContinuousJfrStarter();
+ if (!starter.isDiagnosticCommandAvailable()) {
+ throw new SkipException("JFR DiagnosticCommand MBean is not available in
this runtime");
+ }
+
+ String recordingName = "pinot-continuous-it-" + System.currentTimeMillis();
+ Map<String, String> enabledConfig = Map.of(
+ "pinot.jfr.enabled", "true",
+ "pinot.jfr.name", recordingName,
+ "pinot.jfr.toDisk", "false");
+ Map<String, String> disabledConfig = Map.of(
+ "pinot.jfr.enabled", "false",
+ "pinot.jfr.name", recordingName,
+ "pinot.jfr.toDisk", "false");
Review Comment:
The real-MBean integration test only exercises `toDisk=false`, so it won't
catch incompatibilities in generated arguments like `maxage`/`maxsize` (which
are only sent when `toDisk=true`). Consider adding a second integration
assertion that starts a recording with `toDisk=true` and a non-default `maxAge`
to validate that `toJfrTimeArgument(...)` produces a value accepted by the
DiagnosticCommand MBean.
##########
pinot-core/src/main/java/org/apache/pinot/core/util/trace/ContinuousJfrStarter.java:
##########
@@ -155,157 +159,137 @@ public void onChange(Set<String> changedConfigs,
Map<String, String> clusterConf
return;
}
- stopRecording();
- _currentConfig = newSubsetMap;
- startRecording(subset);
+ if (!stopRecording()) {
+ LOGGER.warn("Failed to stop existing continuous JFR recording.
Skipping config update");
+ return;
+ }
+ if (startRecording(subset)) {
+ _currentConfig = newSubsetMap;
+ } else {
+ LOGGER.warn("Failed to apply continuous JFR config update");
+ }
}
}
public boolean isRunning() {
return _running;
}
- private void stopRecording() {
+ private boolean stopRecording() {
if (!_running) {
- return;
+ return true;
}
- assert _recording != null;
- LOGGER.debug("Stopping recording {}", _recording.getName());
- _recording.stop();
- _recording.close();
-
- if (_cleanupThread != null) {
- _cleanupThread.interrupt();
- try {
- _cleanupThread.join(5_000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.warn("Interrupted while waiting for cleanup thread to stop");
- }
- _cleanupThread = null;
+ assert _recordingName != null;
+ if (!isDiagnosticCommandAvailable()) {
+ LOGGER.warn("JFR DiagnosticCommand MBean is unavailable. Cannot stop
continuous JFR recording '{}'",
+ _recordingName);
+ return false;
}
-
- LOGGER.info("Stopped continuous JFR recording {}", _recording.getName());
- _recording = null;
+ if (!executeDiagnosticCommand(JFR_STOP_COMMAND, "name=" + _recordingName))
{
+ LOGGER.warn("Failed to stop continuous JFR recording '{}'",
_recordingName);
+ return false;
+ }
+ LOGGER.info("Stopped continuous JFR recording {}", _recordingName);
+ _recordingName = null;
_running = false;
+ return true;
}
- private void startRecording(PinotConfiguration subset) {
+ private boolean startRecording(PinotConfiguration subset) {
if (!subset.getProperty(ENABLED, DEFAULT_ENABLED)) {
LOGGER.info("Continuous JFR recording is disabled");
- return;
+ return true;
}
if (_running) {
- return;
+ return true;
+ }
+ if (!isDiagnosticCommandAvailable()) {
+ LOGGER.warn("JFR DiagnosticCommand MBean is unavailable. Cannot start
continuous JFR recording");
+ return false;
}
- _recording = createRecording(subset);
- _recording.setName(subset.getProperty(NAME, DEFAULT_NAME));
-
- _recording.setDumpOnExit(subset.getProperty(DUMP_ON_EXIT,
DEFAULT_DUMP_ON_EXIT));
-
- prepareFileDumps(subset);
+ String recordingName = subset.getProperty(NAME, DEFAULT_NAME);
+ if (!applyRuntimeOptions(subset)) {
+ LOGGER.warn("Failed to apply JFR runtime options for recording '{}'",
recordingName);
+ return false;
+ }
+ String maxAge = subset.getProperty(MAX_AGE, DEFAULT_MAX_AGE);
try {
+ List<String> startArguments = new ArrayList<>();
+ startArguments.add("name=" + recordingName);
+ startArguments.add("settings=" + subset.getProperty(CONFIGURATION,
DEFAULT_CONFIGURATION));
+ startArguments.add("dumponexit=" + subset.getProperty(DUMP_ON_EXIT,
DEFAULT_DUMP_ON_EXIT));
boolean toDisk = subset.getProperty(TO_DISK, DEFAULT_TO_DISK);
+ startArguments.add("disk=" + toDisk);
if (toDisk) {
- _recording.setToDisk(true);
- _recording.setMaxSize(subset.getProperty(MAX_SIZE, DEFAULT_MAX_SIZE));
- _recording.setMaxAge(Duration.parse(subset.getProperty(MAX_AGE,
DEFAULT_MAX_AGE).toUpperCase(Locale.ENGLISH)));
+ startArguments.add("maxsize=" + subset.getProperty(MAX_SIZE,
DEFAULT_MAX_SIZE));
+ startArguments.add("maxage=" + toJfrTimeArgument(maxAge));
+ }
+ if (!executeDiagnosticCommand(JFR_START_COMMAND,
startArguments.toArray(new String[0]))) {
+ LOGGER.warn("Failed to start continuous JFR recording '{}'",
recordingName);
+ return false;
}
} catch (DateTimeParseException e) {
- throw new RuntimeException("Failed to parse duration", e);
+ throw new RuntimeException("Failed to parse duration '" + maxAge + "'",
e);
}
- _recording.start();
- LOGGER.info("Started continuous JFR recording {} with configuration: {}",
_recording.getName(), subset);
+
+ _recordingName = recordingName;
+ LOGGER.info("Started continuous JFR recording {} with configuration: {}",
recordingName, subset);
_running = true;
+ return true;
}
@VisibleForTesting
- protected static Path getRecordingPath(Path parentDir, String name, Instant
timestamp) {
- String filename = "recording-" + name + "-" + timestamp + ".jfr";
- return parentDir.resolve(filename);
- }
-
- private void prepareFileDumps(PinotConfiguration subset) {
+ protected boolean executeDiagnosticCommand(String operationName, String...
arguments) {
try {
- Path directory = Path.of(subset.getProperty(DIRECTORY,
Paths.get(".").toString()));
- if (!directory.toFile().canWrite()) {
- throw new RuntimeException("Cannot write: " + directory);
- }
+ _mBeanServer.invoke(_diagnosticCommandObjectName, operationName, new
Object[]{arguments},
+ new String[]{String[].class.getName()});
+ return true;
+ } catch (Exception e) {
+ LOGGER.warn("Failed to execute JFR command '{}' with arguments {}",
operationName, Arrays.toString(arguments), e);
+ return false;
+ }
Review Comment:
`executeDiagnosticCommand()` can be invoked with a null
`_diagnosticCommandObjectName` (e.g., if `createDiagnosticCommandObjectName()`
returned null); it then relies on the catch-all `catch (Exception e)` to log a
warning with a stack trace and return false. Adding an explicit
null/availability guard here (with a clearer one-line log message) would avoid
noisy stack traces and make the failure mode clearer for callers/subclasses.
##########
pinot-core/src/test/java/org/apache/pinot/core/util/trace/ContinuousJfrStarterTest.java:
##########
@@ -157,66 +148,177 @@ public void noOpWhenNewConfigIsEqual() {
Assertions.assertThat(_continuousJfrStarter.isRunning())
.describedAs("Recording should still be disabled")
.isFalse();
- Mockito.verifyNoInteractions(_recording);
+
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).isEmpty();
+ }
+
+ @Test
+ public void restartsWhenConfigurationChanges() {
+ Map<String, String> config = Map.of(
+ "pinot.jfr.enabled", "true",
+ "pinot.jfr.maxAge", "PT2H");
+ _continuousJfrStarter.onChange(Set.of(), config);
+
+ Set<String> changed = Set.of("pinot.jfr.maxAge");
+ Map<String, String> updatedConfig = Map.of(
+ "pinot.jfr.enabled", "true",
+ "pinot.jfr.maxAge", "PT1H");
+ _continuousJfrStarter.onChange(changed, updatedConfig);
+
+
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).containsExactly(
+ "jfrStart name=pinot-continuous settings=default dumponexit=true
disk=true maxsize=209715200 maxage=7200000ms",
+ "jfrStop name=pinot-continuous",
+ "jfrStart name=pinot-continuous settings=default dumponexit=true
disk=true maxsize=209715200 maxage=3600000ms");
+ }
+
+ @Test
+ public void configuresRepositoryAndDumpPathsViaMBean() {
+ Map<String, String> config = Map.of(
+ "pinot.jfr.enabled", "true",
+ "pinot.jfr.directory", "/var/log/pinot/jfr-repository",
+ "pinot.jfr.dumpPath", "/var/log/pinot/jfr-dumps");
+ _continuousJfrStarter.onChange(Set.of(), config);
+
+
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).containsExactly(
+ "jfrConfigure repositorypath=/var/log/pinot/jfr-repository
dumppath=/var/log/pinot/jfr-dumps",
+ "jfrStart name=pinot-continuous settings=default dumponexit=true
disk=true maxsize=209715200 maxage=86400000ms");
+ }
+
+ @Test
+ public void doesNotFailWhenMBeanIsUnavailable() {
+ _continuousJfrStarter.setMBeanAvailable(false);
+ Map<String, String> config = Map.of("pinot.jfr.enabled", "true");
+ _continuousJfrStarter.onChange(Set.of(), config);
+
+ Assertions.assertThat(_continuousJfrStarter.isRunning())
+ .describedAs("Recording should remain disabled when MBean is
unavailable")
+ .isFalse();
+
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).isEmpty();
+ }
+
+ @Test
+ public void keepsRunningWhenStopCommandFails() {
+ _continuousJfrStarter.onChange(Set.of(), Map.of("pinot.jfr.enabled",
"true"));
+ _continuousJfrStarter.failCommand("jfrStop");
+
+ _continuousJfrStarter.onChange(Set.of("pinot.jfr.enabled"),
Map.of("pinot.jfr.enabled", "false"));
+
+ Assertions.assertThat(_continuousJfrStarter.isRunning())
+ .describedAs("Starter should keep running state when stop fails")
+ .isTrue();
+
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).containsExactly(
+ "jfrStart name=pinot-continuous settings=default dumponexit=true
disk=true maxsize=209715200 maxage=86400000ms",
+ "jfrStop name=pinot-continuous");
+ }
+
+ @Test
+ public void keepsRunningWhenMBeanBecomesUnavailableOnStop() {
+ _continuousJfrStarter.onChange(Set.of(), Map.of("pinot.jfr.enabled",
"true"));
+ _continuousJfrStarter.setMBeanAvailable(false);
+
+ _continuousJfrStarter.onChange(Set.of("pinot.jfr.enabled"),
Map.of("pinot.jfr.enabled", "false"));
+
+ Assertions.assertThat(_continuousJfrStarter.isRunning())
+ .describedAs("Starter should keep running state when MBean is
unavailable for stop")
+ .isTrue();
+
Assertions.assertThat(_continuousJfrStarter.getExecutedCommands()).containsExactly(
+ "jfrStart name=pinot-continuous settings=default dumponexit=true
disk=true maxsize=209715200 maxage=86400000ms");
}
@Test
- public void cleanUpThreadDeletesFiles()
- throws IOException {
- Path tempDirectory = Files.createTempDirectory("jfr-test-");
- int maxDumps = 3;
- long now = ZonedDateTime.of(2025, 10, 13, 12, 0, 0, 0,
ZoneOffset.UTC).toInstant().toEpochMilli();
+ public void integrationTestWithRealDiagnosticCommandMBean() {
+ ContinuousJfrStarter starter = new ContinuousJfrStarter();
+ if (!starter.isDiagnosticCommandAvailable()) {
+ throw new SkipException("JFR DiagnosticCommand MBean is not available in
this runtime");
+ }
+
+ String recordingName = "pinot-continuous-it-" + System.currentTimeMillis();
+ Map<String, String> enabledConfig = Map.of(
+ "pinot.jfr.enabled", "true",
+ "pinot.jfr.name", recordingName,
+ "pinot.jfr.toDisk", "false");
+ Map<String, String> disabledConfig = Map.of(
+ "pinot.jfr.enabled", "false",
+ "pinot.jfr.name", recordingName,
+ "pinot.jfr.toDisk", "false");
+
try {
- long[] dates = IntStream.range(0, maxDumps * 2)
- .mapToLong(i -> now - i * 3600_000L)
- .sorted()
- .toArray();
- for (long creationDate : dates) {
- Path path = ContinuousJfrStarter.getRecordingPath(tempDirectory,
"test", Instant.ofEpochMilli(creationDate));
- File file = path.toFile();
- Assertions.assertThat(file.createNewFile())
- .describedAs("Should be able to create a file in the temp
directory")
- .isTrue();
- Assertions.assertThat(file.setLastModified(creationDate))
- .describedAs("Should be able to set the last modified time")
- .isTrue();
- }
+ starter.onChange(Set.of(), enabledConfig);
+ Assertions.assertThat(starter.isRunning()).isTrue();
+ Assertions.assertThat(waitForRecordingPresence(recordingName, true, 10,
100))
+ .describedAs("Recording should be visible via JFR.check")
+ .isTrue();
- // Verify that we have 2 * maxDumps files
- try (var files = Files.list(tempDirectory)) {
- Assertions.assertThat(files.count())
- .describedAs("Should have 2 * maxDumps files in the temp
directory")
- .isEqualTo(maxDumps * 2);
+ starter.onChange(Set.of("pinot.jfr.enabled"), disabledConfig);
+ Assertions.assertThat(starter.isRunning()).isFalse();
+ Assertions.assertThat(waitForRecordingPresence(recordingName, false, 10,
100))
+ .describedAs("Recording should be absent via JFR.check after
disable")
+ .isTrue();
+ } finally {
+ if (starter.isRunning()) {
+ starter.onChange(Set.of("pinot.jfr.enabled"), disabledConfig);
}
+ }
+ }
- // Run the cleanup
- ContinuousJfrStarter.cleanUpDumps(tempDirectory, maxDumps, "test");
-
- // Verify that we have maxDumps files and only the newest ones are kept
- try (var files = Files.list(tempDirectory)) {
- var remainingFiles = files.collect(Collectors.toSet());
- Assertions.assertThat(remainingFiles)
- .describedAs("Should have maxDumps files in the temp directory")
- .hasSize(maxDumps);
- for (int i = 0; i < maxDumps; i++) {
- long creationDate = dates[dates.length - 1 - i];
- Instant timestamp = Instant.ofEpochMilli(creationDate);
- Path expectedPath =
ContinuousJfrStarter.getRecordingPath(tempDirectory, "test", timestamp);
- Assertions.assertThat(remainingFiles)
- .describedAs("Should contain the expected file: %s",
expectedPath)
- .contains(expectedPath);
- }
+ private static boolean waitForRecordingPresence(String recordingName,
boolean expectedPresent, int maxAttempts,
+ long delayMs) {
+ for (int i = 0; i < maxAttempts; i++) {
+ boolean present = isRecordingPresent(recordingName);
+ if (present == expectedPresent) {
+ return true;
}
+ try {
+ Thread.sleep(delayMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+ return false;
Review Comment:
This integration test uses a custom `Thread.sleep` polling loop with a 1s
total wait (10 * 100ms), which is prone to CI flakiness on slow or loaded
machines. Prefer using the existing `TestUtils.waitForCondition(...)` helper
(used elsewhere in pinot-core tests) with a more generous timeout and failure
message.
##########
pinot-core/src/main/java/org/apache/pinot/core/util/trace/ContinuousJfrStarter.java:
##########
@@ -155,157 +159,137 @@ public void onChange(Set<String> changedConfigs,
Map<String, String> clusterConf
return;
}
- stopRecording();
- _currentConfig = newSubsetMap;
- startRecording(subset);
+ if (!stopRecording()) {
+ LOGGER.warn("Failed to stop existing continuous JFR recording.
Skipping config update");
+ return;
+ }
+ if (startRecording(subset)) {
+ _currentConfig = newSubsetMap;
+ } else {
+ LOGGER.warn("Failed to apply continuous JFR config update");
+ }
}
}
public boolean isRunning() {
return _running;
Review Comment:
`_running` is annotated `@GuardedBy("this")` but `isRunning()` reads it
without synchronization/volatile, which breaks the stated thread-safety
contract and can return stale values. Consider either synchronizing
`isRunning()`, making `_running` `volatile`, or removing the `@GuardedBy`
annotation if it isn't intended to be guarded.
```suggestion
synchronized (this) {
return _running;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]