This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d4aa66cfc0 Introduce retries while creating stream message decoder for
more robustness (#13036)
d4aa66cfc0 is described below
commit d4aa66cfc071cb4c237a827d1239f1a6b3de9bd9
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Sat May 18 05:08:59 2024 +0530
Introduce retries while creating stream message decoder for more robustness
(#13036)
* Introduce retries while creating stream message decoder to make system
more robust
* address comments
* fix build
---
.../realtime/RealtimeSegmentDataManager.java | 27 ++++++++++++++++++----
.../manager/realtime/RealtimeTableDataManager.java | 8 +++++--
2 files changed, 28 insertions(+), 7 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index fdd0ef1ae5..a7ba53b1c9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.function.BooleanSupplier;
import javax.annotation.Nullable;
@@ -102,6 +103,10 @@ import
org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
import org.apache.pinot.spi.utils.CommonConstants.ConsumerState;
import
org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.CompletionMode;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
@@ -1392,7 +1397,8 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
RealtimeTableDataManager realtimeTableDataManager, String
resourceDataDir, IndexLoadingConfig indexLoadingConfig,
Schema schema, LLCSegmentName llcSegmentName, Semaphore
partitionGroupConsumerSemaphore,
ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager
partitionUpsertMetadataManager,
- @Nullable PartitionDedupMetadataManager partitionDedupMetadataManager,
BooleanSupplier isReadyToConsumeData) {
+ @Nullable PartitionDedupMetadataManager partitionDedupMetadataManager,
BooleanSupplier isReadyToConsumeData)
+ throws AttemptsExceededException, RetriableOperationException {
_segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
_segmentZKMetadata = segmentZKMetadata;
_tableConfig = tableConfig;
@@ -1524,14 +1530,25 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
// Create message decoder
Set<String> fieldsToRead =
IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(),
_schema);
+ RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetryPolicy(5,
1000L, 1.2f);
+ AtomicReference<StreamDataDecoder> localStreamDataDecoder = new
AtomicReference<>();
try {
- StreamMessageDecoder streamMessageDecoder =
createMessageDecoder(fieldsToRead);
- _streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
+ retryPolicy.attempt(() -> {
+ try {
+ StreamMessageDecoder streamMessageDecoder =
createMessageDecoder(fieldsToRead);
+ localStreamDataDecoder.set(new
StreamDataDecoderImpl(streamMessageDecoder));
+ return true;
+ } catch (Exception e) {
+ _segmentLogger.warn("Failed to initialize the StreamMessageDecoder:
", e);
+ return false;
+ }
+ });
} catch (Exception e) {
- _realtimeTableDataManager.addSegmentError(_segmentNameStr,
- new SegmentErrorInfo(now(), "Failed to initialize the
StreamMessageDecoder", e));
+ _realtimeTableDataManager.addSegmentError(_segmentNameStr, new
SegmentErrorInfo(now(),
+ "Failed to initialize the StreamMessageDecoder", e));
throw e;
}
+ _streamDataDecoder = localStreamDataDecoder.get();
try {
_recordEnricherPipeline =
RecordEnricherPipeline.fromTableConfig(tableConfig);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 4bd290f528..cbb870037c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -77,6 +77,8 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
@ThreadSafe
@@ -406,7 +408,8 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
}
@Override
- public void addConsumingSegment(String segmentName) {
+ public void addConsumingSegment(String segmentName)
+ throws AttemptsExceededException, RetriableOperationException {
Preconditions.checkState(!_shutDown,
"Table data manager is already shut down, cannot add CONSUMING
segment: %s to table: %s", segmentName,
_tableNameWithType);
@@ -424,7 +427,8 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
}
}
- private void doAddConsumingSegment(String segmentName) {
+ private void doAddConsumingSegment(String segmentName)
+ throws AttemptsExceededException, RetriableOperationException {
SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
if (zkMetadata.getStatus() != Status.IN_PROGRESS) {
// NOTE: We do not throw exception here because the segment might have
just been committed before the state
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]