This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ab0c27e947 Add decoder initialization error to the server's error
cache (#10773)
ab0c27e947 is described below
commit ab0c27e947cbc280d3c55217a7a136259044de01
Author: Navina Ramesh <[email protected]>
AuthorDate: Wed May 17 15:26:51 2023 -0700
Add decoder initialization error to the server's error cache (#10773)
* Add decoder initialization error to the server's error cache.
* Adding constructor failure to error cache
---
.../data/manager/realtime/LLRealtimeSegmentDataManager.java | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 4f3428a5fe..7f96359ce0 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1419,8 +1419,15 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
// Create message decoder
Set<String> fieldsToRead =
IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(),
_schema);
- StreamMessageDecoder streamMessageDecoder =
StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
- _streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
+ try {
+ StreamMessageDecoder streamMessageDecoder =
+ StreamDecoderProvider.create(_partitionLevelStreamConfig,
fieldsToRead);
+ _streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
+ } catch (Exception e) {
+ _realtimeTableDataManager.addSegmentError(_segmentNameStr,
+ new SegmentErrorInfo(now(), "Failed to initialize the
StreamMessageDecoder", e));
+ throw e;
+ }
_transformPipeline = new TransformPipeline(tableConfig, schema);
// Acquire semaphore to create stream consumers
try {
@@ -1458,6 +1465,8 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
// ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the
semaphore is acquired, but not released.
// Hence releasing the semaphore here to unblock reset operation via
Helix Admin.
_partitionGroupConsumerSemaphore.release();
+ _realtimeTableDataManager.addSegmentError(_segmentNameStr, new
SegmentErrorInfo(now(),
+ "Failed to initialize segment data manager", e));
throw e;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]