This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 55806ba484 Do not log exceptions when fetching offsets for lag metric (#13528)
55806ba484 is described below
commit 55806ba484f75d3629b11c043eebe25aed222630
Author: Kartik Khare <[email protected]>
AuthorDate: Wed Jul 3 12:42:28 2024 +0530
Do not log exceptions when fetching offsets for lag metric (#13528)
Co-authored-by: Kartik Khare <[email protected]>
---
.../realtime/RealtimeSegmentDataManager.java | 29 ++++++++++++++++------
1 file changed, 21 insertions(+), 8 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 862ec52615..c26b2c14f3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1682,25 +1682,38 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
return _idleTimer.getTimeSinceEventLastConsumedMs();
}
+ public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs,
boolean useDebugLog) {
+ return fetchStreamOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA,
maxWaitTimeMs, useDebugLog);
+ }
+
public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs) {
- return fetchStreamOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA,
maxWaitTimeMs);
+ return fetchLatestStreamOffset(maxWaitTimeMs, false);
+ }
+
+ public StreamPartitionMsgOffset fetchEarliestStreamOffset(long
maxWaitTimeMs, boolean useDebugLog) {
+ return fetchStreamOffset(OffsetCriteria.SMALLEST_OFFSET_CRITERIA,
maxWaitTimeMs, useDebugLog);
}
public StreamPartitionMsgOffset fetchEarliestStreamOffset(long
maxWaitTimeMs) {
- return fetchStreamOffset(OffsetCriteria.SMALLEST_OFFSET_CRITERIA,
maxWaitTimeMs);
+ return fetchEarliestStreamOffset(maxWaitTimeMs, false);
}
- private StreamPartitionMsgOffset fetchStreamOffset(OffsetCriteria
offsetCriteria, long maxWaitTimeMs) {
+ private StreamPartitionMsgOffset fetchStreamOffset(OffsetCriteria
offsetCriteria, long maxWaitTimeMs,
+ boolean useDebugLog) {
if (_partitionMetadataProvider == null) {
createPartitionMetadataProvider("Fetch latest stream offset");
}
try {
return
_partitionMetadataProvider.fetchStreamPartitionOffset(offsetCriteria,
maxWaitTimeMs);
} catch (Exception e) {
- _segmentLogger.warn(
- String.format(
- "Cannot fetch stream offset with criteria %s for clientId %s and
partitionGroupId %d with maxWaitTime %d",
- offsetCriteria, _clientId, _partitionGroupId, maxWaitTimeMs), e);
+ String logMessage = String.format(
+ "Cannot fetch stream offset with criteria %s for clientId %s and
partitionGroupId %d with maxWaitTime %d",
+ offsetCriteria, _clientId, _partitionGroupId, maxWaitTimeMs);
+ if (!useDebugLog) {
+ _segmentLogger.warn(logMessage, e);
+ } else {
+ _segmentLogger.debug(logMessage, e);
+ }
}
return null;
}
@@ -1810,7 +1823,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
private void updateIngestionMetrics(RowMetadata metadata) {
if (metadata != null) {
try {
- StreamPartitionMsgOffset latestOffset = fetchLatestStreamOffset(5000);
+ StreamPartitionMsgOffset latestOffset = fetchLatestStreamOffset(5000,
true);
_realtimeTableDataManager.updateIngestionMetrics(metadata.getRecordIngestionTimeMs(),
metadata.getFirstStreamRecordIngestionTimeMs(),
metadata.getOffset(), latestOffset, _partitionGroupId);
} catch (Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]