This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f54559987e Handle unsupported exception gracefully (#13524)
f54559987e is described below
commit f54559987e9b66eaed4cc68852653e14a133b9e0
Author: Kartik Khare <[email protected]>
AuthorDate: Tue Jul 2 18:34:36 2024 +0530
Handle unsupported exception gracefully (#13524)
Co-authored-by: Kartik Khare <[email protected]>
---
.../pinot/core/data/manager/realtime/IngestionDelayTracker.java | 4 ++++
.../pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java | 3 +--
2 files changed, 5 insertions(+), 2 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index eed1302708..6953ddaf33 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -195,6 +195,10 @@ public class IngestionDelayTracker {
StreamPartitionMsgOffset currentOffset = offset._offset;
StreamPartitionMsgOffset latestOffset = offset._latestOffset;
+ if (currentOffset == null || latestOffset == null) {
+ return 0;
+ }
+
// Compute aged delay for current partition
// TODO: Support other types of offsets
if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof LongMsgOffset)) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index dbfe885cc0..862ec52615 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1810,8 +1810,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
private void updateIngestionMetrics(RowMetadata metadata) {
if (metadata != null) {
try {
- StreamPartitionMsgOffset latestOffset =
- _partitionMetadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, 5000);
+ StreamPartitionMsgOffset latestOffset = fetchLatestStreamOffset(5000);
_realtimeTableDataManager.updateIngestionMetrics(metadata.getRecordIngestionTimeMs(),
metadata.getFirstStreamRecordIngestionTimeMs(),
metadata.getOffset(), latestOffset, _partitionGroupId);
} catch (Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]