This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 29c560f523 Move offset validation logic to consumer classes (#13015)
29c560f523 is described below
commit 29c560f523bab4529e6685c7df061105d8dc3df1
Author: Kartik Khare <[email protected]>
AuthorDate: Thu May 23 19:02:09 2024 +0530
Move offset validation logic to consumer classes (#13015)
* Enhance Kinesis consumer
* Simplify the handling
* Address comments
* Move offset validation logic to consumer classes
* Add missing message interface to message batch
* fix linting
* remove unused interface
* Cleanup and refactoring
* lint fixes
---------
Co-authored-by: Xiaotian (Jackie) Jiang <[email protected]>
Co-authored-by: Kartik Khare
<[email protected]>
Co-authored-by: Kartik Khare <[email protected]>
---
.../realtime/RealtimeSegmentDataManager.java | 21 ++++++++-------------
.../plugin/stream/kafka20/KafkaMessageBatch.java | 9 ++++++++-
.../stream/kafka20/KafkaPartitionLevelConsumer.java | 5 ++++-
.../plugin/stream/kinesis/KinesisConsumer.java | 1 -
.../org/apache/pinot/spi/stream/MessageBatch.java | 8 ++++++++
5 files changed, 28 insertions(+), 16 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 01fffced36..b441f086de 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -468,10 +468,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
throw t;
}
- StreamPartitionMsgOffset batchFirstOffset =
messageBatch.getFirstMessageOffset();
- if (batchFirstOffset != null) {
- validateStartOffset(_currentOffset, batchFirstOffset);
- }
+ reportDataLoss(messageBatch);
boolean endCriteriaReached = processStreamEvents(messageBatch,
idlePipeSleepTimeMillis);
@@ -922,18 +919,16 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
}
/**
- * Checks if the begin offset of the stream partition has been
fast-forwarded.
- * batchFirstOffset should be less than or equal to startOffset.
- * If batchFirstOffset is greater, then some messages were not received.
+ * Checks and reports if the consumer is going through data loss.
*
- * @param startOffset The offset of the first message desired, inclusive.
- * @param batchFirstOffset The offset of the first message in the batch.
+ * @param messageBatch Message batch to validate
*/
- private void validateStartOffset(StreamPartitionMsgOffset startOffset,
StreamPartitionMsgOffset batchFirstOffset) {
- if (batchFirstOffset.compareTo(startOffset) > 0) {
+ private void reportDataLoss(MessageBatch messageBatch) {
+ if (messageBatch.hasDataLoss()) {
_serverMetrics.addMeteredTableValue(_tableStreamName,
ServerMeter.STREAM_DATA_LOSS, 1L);
- String message =
- "startOffset(" + startOffset + ") is older than topic's beginning
offset(" + batchFirstOffset + ")";
+ String message = String.format("Message loss detected in stream
partition: %s for table: %s startOffset: %s "
+ + "batchFirstOffset: %s", _partitionGroupId, _tableNameWithType,
_startOffset,
+ messageBatch.getFirstMessageOffset());
_segmentLogger.error(message);
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new
SegmentErrorInfo(now(), message, null));
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
index 3f137b54af..1e3361ba00 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
@@ -33,6 +33,7 @@ public class KafkaMessageBatch implements
MessageBatch<byte[]> {
private final long _offsetOfNextBatch;
private final long _firstOffset;
private final StreamMessageMetadata _lastMessageMetadata;
+ private final boolean _hasDataLoss;
/**
* @param messages the messages, which may be smaller than {@see
unfilteredMessageCount}
@@ -43,12 +44,13 @@ public class KafkaMessageBatch implements
MessageBatch<byte[]> {
* delay when a batch has all messages filtered.
*/
public KafkaMessageBatch(List<BytesStreamMessage> messages, int
unfilteredMessageCount, long offsetOfNextBatch,
- long firstOffset, @Nullable StreamMessageMetadata lastMessageMetadata) {
+ long firstOffset, @Nullable StreamMessageMetadata lastMessageMetadata,
boolean hasDataLoss) {
_messages = messages;
_unfilteredMessageCount = unfilteredMessageCount;
_offsetOfNextBatch = offsetOfNextBatch;
_firstOffset = firstOffset;
_lastMessageMetadata = lastMessageMetadata;
+ _hasDataLoss = hasDataLoss;
}
@Override
@@ -82,4 +84,9 @@ public class KafkaMessageBatch implements
MessageBatch<byte[]> {
public StreamMessageMetadata getLastMessageMetadata() {
return _lastMessageMetadata;
}
+
+ @Override
+ public boolean hasDataLoss() {
+ return _hasDataLoss;
+ }
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index 36a74c1e65..03731df079 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -61,6 +61,7 @@ public class KafkaPartitionLevelConsumer extends
KafkaPartitionLevelConnectionHa
}
_consumer.seek(_topicPartition, startOffset);
}
+
ConsumerRecords<String, Bytes> consumerRecords =
_consumer.poll(Duration.ofMillis(timeoutMs));
List<ConsumerRecord<String, Bytes>> records =
consumerRecords.records(_topicPartition);
List<BytesStreamMessage> filteredRecords = new ArrayList<>(records.size());
@@ -84,7 +85,9 @@ public class KafkaPartitionLevelConsumer extends
KafkaPartitionLevelConnectionHa
lastMessageMetadata = messageMetadata;
}
}
- return new KafkaMessageBatch(filteredRecords, records.size(),
offsetOfNextBatch, firstOffset, lastMessageMetadata);
+
+ return new KafkaMessageBatch(filteredRecords, records.size(),
offsetOfNextBatch, firstOffset, lastMessageMetadata,
+ firstOffset > startOffset);
}
private StreamMessageMetadata extractMessageMetadata(ConsumerRecord<String,
Bytes> record) {
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index a8e40c87a8..e7bb76797a 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -66,7 +66,6 @@ public class KinesisConsumer extends KinesisConnectionHandler
implements Partiti
KinesisPartitionGroupOffset startOffset = (KinesisPartitionGroupOffset)
startMsgOffset;
String shardId = startOffset.getShardId();
String startSequenceNumber = startOffset.getSequenceNumber();
-
// Get the shard iterator
String shardIterator;
if (startSequenceNumber.equals(_nextStartSequenceNumber)) {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 2f00c82657..9a2eae1c30 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -96,6 +96,14 @@ public interface MessageBatch<T> {
return false;
}
+ /**
+ * Returns {@code true} if the current batch has data loss.
+ * This is useful to determine if there were gaps in the stream.
+ */
+ default boolean hasDataLoss() {
+ return false;
+ }
+
@Deprecated
default T getMessageAtIndex(int index) {
throw new UnsupportedOperationException();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]