This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d4aa66cfc0 Introduce retries while creating stream message decoder for more robustness (#13036)
d4aa66cfc0 is described below

commit d4aa66cfc071cb4c237a827d1239f1a6b3de9bd9
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Sat May 18 05:08:59 2024 +0530

    Introduce retries while creating stream message decoder for more robustness (#13036)
    
    * Introduce retries while creating stream message decoder to make the system more robust
    
    * address comments
    
    * fix build
---
 .../realtime/RealtimeSegmentDataManager.java       | 27 ++++++++++++++++++----
 .../manager/realtime/RealtimeTableDataManager.java |  8 +++++--
 2 files changed, 28 insertions(+), 7 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index fdd0ef1ae5..a7ba53b1c9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.function.BooleanSupplier;
 import javax.annotation.Nullable;
@@ -102,6 +103,10 @@ import 
org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
 import org.apache.pinot.spi.utils.CommonConstants.ConsumerState;
 import 
org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.CompletionMode;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
@@ -1392,7 +1397,8 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
       RealtimeTableDataManager realtimeTableDataManager, String 
resourceDataDir, IndexLoadingConfig indexLoadingConfig,
       Schema schema, LLCSegmentName llcSegmentName, Semaphore 
partitionGroupConsumerSemaphore,
       ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager 
partitionUpsertMetadataManager,
-      @Nullable PartitionDedupMetadataManager partitionDedupMetadataManager, 
BooleanSupplier isReadyToConsumeData) {
+      @Nullable PartitionDedupMetadataManager partitionDedupMetadataManager, 
BooleanSupplier isReadyToConsumeData)
+      throws AttemptsExceededException, RetriableOperationException {
     _segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
     _segmentZKMetadata = segmentZKMetadata;
     _tableConfig = tableConfig;
@@ -1524,14 +1530,25 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
 
     // Create message decoder
     Set<String> fieldsToRead = 
IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), 
_schema);
+    RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetryPolicy(5, 
1000L, 1.2f);
+    AtomicReference<StreamDataDecoder> localStreamDataDecoder = new 
AtomicReference<>();
     try {
-      StreamMessageDecoder streamMessageDecoder = 
createMessageDecoder(fieldsToRead);
-      _streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
+      retryPolicy.attempt(() -> {
+        try {
+          StreamMessageDecoder streamMessageDecoder = 
createMessageDecoder(fieldsToRead);
+          localStreamDataDecoder.set(new 
StreamDataDecoderImpl(streamMessageDecoder));
+          return true;
+        } catch (Exception e) {
+          _segmentLogger.warn("Failed to initialize the StreamMessageDecoder: 
", e);
+          return false;
+        }
+      });
     } catch (Exception e) {
-      _realtimeTableDataManager.addSegmentError(_segmentNameStr,
-          new SegmentErrorInfo(now(), "Failed to initialize the 
StreamMessageDecoder", e));
+      _realtimeTableDataManager.addSegmentError(_segmentNameStr, new 
SegmentErrorInfo(now(),
+          "Failed to initialize the StreamMessageDecoder", e));
       throw e;
     }
+    _streamDataDecoder = localStreamDataDecoder.get();
 
     try {
       _recordEnricherPipeline = 
RecordEnricherPipeline.fromTableConfig(tableConfig);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 4bd290f528..cbb870037c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -77,6 +77,8 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
 
 
 @ThreadSafe
@@ -406,7 +408,8 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   }
 
   @Override
-  public void addConsumingSegment(String segmentName) {
+  public void addConsumingSegment(String segmentName)
+      throws AttemptsExceededException, RetriableOperationException {
     Preconditions.checkState(!_shutDown,
         "Table data manager is already shut down, cannot add CONSUMING 
segment: %s to table: %s", segmentName,
         _tableNameWithType);
@@ -424,7 +427,8 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     }
   }
 
-  private void doAddConsumingSegment(String segmentName) {
+  private void doAddConsumingSegment(String segmentName)
+      throws AttemptsExceededException, RetriableOperationException {
     SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
     if (zkMetadata.getStatus() != Status.IN_PROGRESS) {
       // NOTE: We do not throw exception here because the segment might have 
just been committed before the state


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to