This is an automated email from the ASF dual-hosted git repository.
vvivekiyer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 28aec2e01c Add some additional metrics for Minion tasks (#12710)
28aec2e01c is described below
commit 28aec2e01cedfcfaf47e83f41f58f810ac153b31
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Sun Mar 31 18:54:27 2024 -0700
Add some additional metrics for Minion tasks (#12710)
* Add some additional metrics for Minion tasks
* Address review comments
---
.../apache/pinot/common/metrics/MinionMeter.java | 8 +++++-
.../apache/pinot/core/minion/SegmentPurger.java | 8 ++++--
.../BaseMultipleSegmentsConversionExecutor.java | 13 ++++++++-
.../tasks/BaseSingleSegmentConversionExecutor.java | 22 +++++++++++++--
.../plugin/minion/tasks/BaseTaskExecutor.java | 33 ++++++++++++++++++++++
5 files changed, 76 insertions(+), 8 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java
index 376f86e55e..c85aad39ed 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java
@@ -31,7 +31,13 @@ public enum MinionMeter implements AbstractMetrics.Meter {
NUMBER_TASKS_FAILED("tasks", false),
NUMBER_TASKS_FATAL_FAILED("tasks", false),
SEGMENT_UPLOAD_FAIL_COUNT("segments", false),
- SEGMENT_DOWNLOAD_FAIL_COUNT("segments", false);
+ SEGMENT_DOWNLOAD_FAIL_COUNT("segments", false),
+ SEGMENT_DOWNLOAD_COUNT("segments", false),
+ SEGMENT_UPLOAD_COUNT("segments", false),
+ SEGMENT_BYTES_DOWNLOADED("bytes", false),
+ SEGMENT_BYTES_UPLOADED("bytes", false),
+ RECORDS_PROCESSED_COUNT("rows", false),
+ RECORDS_PURGED_COUNT("rows", false);
private final String _meterName;
private final String _unit;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
index 4faf695522..2ab65bbe9c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
@@ -70,7 +70,8 @@ public class SegmentPurger {
throws Exception {
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
String segmentName = segmentMetadata.getName();
- LOGGER.info("Start purging table: {}, segment: {}",
_tableConfig.getTableName(), segmentName);
+ String tableNameWithType = _tableConfig.getTableName();
+ LOGGER.info("Start purging table: {}, segment: {}", tableNameWithType,
segmentName);
try (PurgeRecordReader purgeRecordReader = new PurgeRecordReader()) {
// Make a first pass through the data to see if records need to be
purged or modified
@@ -107,8 +108,9 @@ public class SegmentPurger {
driver.build();
}
- LOGGER.info("Finish purging table: {}, segment: {}, purged {} records,
modified {} records",
- _tableConfig.getTableName(), segmentName, _numRecordsPurged,
_numRecordsModified);
+ LOGGER.info("Finish purging table: {}, segment: {}, purged {} records,
modified {} records", tableNameWithType,
+ segmentName, _numRecordsPurged, _numRecordsModified);
+
return new File(_workingDir, segmentName);
}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index 6b439add13..e7ef8a4eea 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -51,6 +51,7 @@ import org.apache.pinot.minion.event.MinionEventObserver;
import org.apache.pinot.minion.event.MinionEventObservers;
import org.apache.pinot.minion.exception.TaskCancelledException;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.filesystem.PinotFS;
@@ -192,6 +193,8 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
String crypterName =
getTableConfig(tableNameWithType).getValidationConfig().getCrypterClassName();
try {
List<File> inputSegmentDirs = new ArrayList<>();
+ int numRecords = 0;
+
for (int i = 0; i < downloadURLs.length; i++) {
// Download the segment file
_eventObserver.notifyProgress(_pinotTaskConfig, String
@@ -209,6 +212,10 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
if (!FileUtils.deleteQuietly(tarredSegmentFile)) {
LOGGER.warn("Failed to delete tarred input segment: {}",
tarredSegmentFile.getAbsolutePath());
}
+
+ reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
+ SegmentMetadataImpl segmentMetadata = new
SegmentMetadataImpl(indexDir);
+ numRecords += segmentMetadata.getTotalDocs();
}
// Convert the segments
@@ -216,6 +223,8 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
Preconditions.checkState(workingDir.mkdir());
List<SegmentConversionResult> segmentConversionResults =
convert(pinotTaskConfig, inputSegmentDirs, workingDir);
+ reportTaskProcessingMetrics(tableNameWithType, taskType, numRecords);
+
// Create a directory for converted tarred segment files
File convertedTarredSegmentDir = new File(tempDataDir,
"convertedTarredSegmentDir");
Preconditions.checkState(convertedTarredSegmentDir.mkdir());
@@ -224,11 +233,13 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
List<File> tarredSegmentFiles = new ArrayList<>(numOutputSegments);
int count = 1;
for (SegmentConversionResult segmentConversionResult :
segmentConversionResults) {
+ File convertedSegmentDir = segmentConversionResult.getFile();
+ reportSegmentUploadMetrics(convertedSegmentDir, tableNameWithType,
taskType);
+
// Tar the converted segment
_eventObserver.notifyProgress(_pinotTaskConfig, String
.format("Compressing segment: %s (%d out of %d)",
segmentConversionResult.getSegmentName(), count++,
numOutputSegments));
- File convertedSegmentDir = segmentConversionResult.getFile();
File convertedSegmentTarFile = new File(convertedTarredSegmentDir,
segmentConversionResult.getSegmentName() +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
TarGzCompressionUtils.createTarGzFile(convertedSegmentDir,
convertedSegmentTarFile);
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index 22337ada6b..a920817ae9 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -36,7 +36,6 @@ import org.apache.pinot.common.Utils;
import org.apache.pinot.common.auth.AuthProviderUtils;
import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.metrics.MinionMeter;
-import org.apache.pinot.common.metrics.MinionMetrics;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
@@ -45,6 +44,8 @@ import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.minion.event.MinionEventObserver;
import org.apache.pinot.minion.event.MinionEventObservers;
import org.apache.pinot.minion.exception.TaskCancelledException;
+import org.apache.pinot.plugin.minion.tasks.purge.PurgeTaskExecutor;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
@@ -60,8 +61,6 @@ import org.slf4j.LoggerFactory;
public abstract class BaseSingleSegmentConversionExecutor extends
BaseTaskExecutor {
private static final Logger LOGGER =
LoggerFactory.getLogger(BaseSingleSegmentConversionExecutor.class);
- protected final MinionMetrics _minionMetrics = MinionMetrics.get();
-
// Tracking finer grained progress status.
protected PinotTaskConfig _pinotTaskConfig;
protected MinionEventObserver _eventObserver;
@@ -123,6 +122,9 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
LOGGER.warn("Failed to delete tarred input segment: {}",
tarredSegmentFile.getAbsolutePath());
}
+ // Publish metrics related to segment download
+ reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
+
// Convert the segment
File workingDir = new File(tempDataDir, "workingDir");
Preconditions.checkState(workingDir.mkdir());
@@ -135,6 +137,20 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
if (convertedSegmentDir == null) {
return segmentConversionResult;
}
+
+ // Publish metrics related to segment upload
+ reportSegmentUploadMetrics(workingDir, tableNameWithType, taskType);
+
+ // Collect the task processing metrics from various single segment
executors and publish them here.
+ SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+ Object numRecordsPurged =
segmentConversionResult.getCustomProperty(PurgeTaskExecutor.NUM_RECORDS_PURGED_KEY);
+ if (numRecordsPurged != null) {
+ reportTaskProcessingMetrics(tableNameWithType, taskType,
segmentMetadata.getTotalDocs(),
+ (int) numRecordsPurged);
+ } else {
+ reportTaskProcessingMetrics(tableNameWithType, taskType,
segmentMetadata.getTotalDocs());
+ }
+
// Tar the converted segment
_eventObserver.notifyProgress(_pinotTaskConfig, "Compressing segment: "
+ segmentName);
File convertedTarredSegmentFile =
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
index d85bf44737..2b57bbb8b4 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
@@ -19,9 +19,13 @@
package org.apache.pinot.plugin.minion.tasks;
import com.google.common.base.Preconditions;
+import java.io.File;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
+import org.apache.pinot.common.metrics.MinionMeter;
+import org.apache.pinot.common.metrics.MinionMetrics;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.minion.MinionContext;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
@@ -33,6 +37,7 @@ public abstract class BaseTaskExecutor implements PinotTaskExecutor {
protected static final MinionContext MINION_CONTEXT =
MinionContext.getInstance();
protected boolean _cancelled = false;
+ protected final MinionMetrics _minionMetrics = MinionMetrics.get();
@Override
public void cancel() {
@@ -68,4 +73,32 @@ public abstract class BaseTaskExecutor implements PinotTaskExecutor {
*/
return segmentZKMetadata == null ? -1 : segmentZKMetadata.getCrc();
}
+
+ protected void reportSegmentDownloadMetrics(File indexDir, String
tableNameWithType, String taskType) {
+ long downloadSegmentSize = FileUtils.sizeOfDirectory(indexDir);
+ addTaskMeterMetrics(MinionMeter.SEGMENT_BYTES_DOWNLOADED,
downloadSegmentSize, tableNameWithType, taskType);
+ addTaskMeterMetrics(MinionMeter.SEGMENT_DOWNLOAD_COUNT, 1L,
tableNameWithType, taskType);
+ }
+
+ protected void reportSegmentUploadMetrics(File indexDir, String
tableNameWithType, String taskType) {
+ long uploadSegmentSize = FileUtils.sizeOfDirectory(indexDir);
+ addTaskMeterMetrics(MinionMeter.SEGMENT_BYTES_UPLOADED, uploadSegmentSize,
tableNameWithType, taskType);
+ addTaskMeterMetrics(MinionMeter.SEGMENT_UPLOAD_COUNT, 1L,
tableNameWithType, taskType);
+ }
+
+ protected void reportTaskProcessingMetrics(String tableNameWithType, String
taskType, int numRecordsProcessed,
+ int numRecordsPurged) {
+ reportTaskProcessingMetrics(tableNameWithType, taskType,
numRecordsProcessed);
+ addTaskMeterMetrics(MinionMeter.RECORDS_PURGED_COUNT, numRecordsPurged,
tableNameWithType, taskType);
+ }
+
+ protected void reportTaskProcessingMetrics(String tableNameWithType, String
taskType, int numRecordsProcessed) {
+ addTaskMeterMetrics(MinionMeter.RECORDS_PROCESSED_COUNT,
numRecordsProcessed, tableNameWithType, taskType);
+ }
+
+ private void addTaskMeterMetrics(MinionMeter meter, long unitCount, String
tableName, String taskType) {
+ _minionMetrics.addMeteredGlobalValue(meter, unitCount);
+ _minionMetrics.addMeteredTableValue(tableName, meter, unitCount);
+ _minionMetrics.addMeteredTableValue(tableName, taskType, meter, unitCount);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]