This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 265ddd468c Use Kafka Admin client to Fetch offsets instead of Kafka
Consumer (#15641)
265ddd468c is described below
commit 265ddd468c5964e1df3c0894a559ccba2b083211
Author: Kartik Khare <[email protected]>
AuthorDate: Wed Jun 4 13:18:53 2025 +0530
Use Kafka Admin client to Fetch offsets instead of Kafka Consumer (#15641)
* Use Kafka admin client to retrieve offsets instead of Kafka consumer
* Set partition metadata provider as null on close
* Fix tests
* Address review comments
* Throw a better exception when offsets are null
* Remove handling of illegal state exception
* Remove setting _partitionMetadataProvider to null
* Create admin client on request
* Create admin client on request
---------
Co-authored-by: KKCorps <[email protected]>
---
.../kafka20/KafkaStreamMetadataProvider.java | 73 ++++++++++++----------
.../kafka30/KafkaStreamMetadataProvider.java | 73 ++++++++++++----------
2 files changed, 78 insertions(+), 68 deletions(-)
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index a04cca66d2..a584a0ee59 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -29,12 +29,15 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.pinot.plugin.stream.kafka.KafkaConsumerPartitionLag;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
@@ -97,49 +100,51 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
@Override
public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria
offsetCriteria, long timeoutMillis) {
Preconditions.checkNotNull(offsetCriteria);
- long offset;
- try {
+ try (AdminClient adminClient = createAdminClient()) {
+ // Build the offset spec request for this partition
+ Map<TopicPartition, OffsetSpec> request = new HashMap<>();
if (offsetCriteria.isLargest()) {
- offset =
_consumer.endOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
+ request.put(_topicPartition, OffsetSpec.latest());
} else if (offsetCriteria.isSmallest()) {
- offset =
-
_consumer.beginningOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
+ request.put(_topicPartition, OffsetSpec.earliest());
} else if (offsetCriteria.isPeriod()) {
- OffsetAndTimestamp offsetAndTimestamp =
_consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
- Clock.systemUTC().millis() -
TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString())))
- .get(_topicPartition);
- if (offsetAndTimestamp == null) {
- offset =
_consumer.endOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
- LOGGER.warn(
- "initial offset type is period and its value evaluates to null
hence proceeding with offset {} for "
- + "topic {} partition {}", offset, _topicPartition.topic(),
_topicPartition.partition());
- } else {
- offset = offsetAndTimestamp.offset();
- }
+ long ts = Clock.systemUTC().millis() -
TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString());
+ request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
} else if (offsetCriteria.isTimestamp()) {
- OffsetAndTimestamp offsetAndTimestamp =
_consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
-
TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString()))).get(_topicPartition);
- if (offsetAndTimestamp == null) {
- offset =
_consumer.endOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
- LOGGER.warn(
- "initial offset type is timestamp and its value evaluates to
null hence proceeding with offset {} for "
- + "topic {} partition {}", offset, _topicPartition.topic(),
_topicPartition.partition());
- } else {
- offset = offsetAndTimestamp.offset();
- }
+ long ts =
TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString());
+ request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
} else {
- throw new IllegalArgumentException("Unknown initial offset value " +
offsetCriteria);
+ throw new IllegalArgumentException("Unknown offset criteria: " +
offsetCriteria);
}
- return new LongMsgOffset(offset);
- } catch (TimeoutException e) {
+ // Query via AdminClient (thread-safe)
+ ListOffsetsResult result = adminClient.listOffsets(request);
+ Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
+ result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
+ if (!isValidOffsetInfo(offsets) && (offsetCriteria.isTimestamp() ||
offsetCriteria.isPeriod())) {
+ // fetch endOffsets as fallback
+ request.put(_topicPartition, OffsetSpec.latest());
+ result = adminClient.listOffsets(request);
+ offsets = result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
+ LOGGER.warn(
+ "initial offset type is {} and its value evaluates to null hence
proceeding with offset {} " + "for "
+ + "topic {} partition {}", offsetCriteria,
offsets.get(_topicPartition).offset(),
+ _topicPartition.topic(), _topicPartition.partition());
+ }
+ ListOffsetsResult.ListOffsetsResultInfo info =
offsets.get(_topicPartition);
+ if (info == null) {
+ throw new TransientConsumerException(new RuntimeException(
+ String.format("Failed to fetch offset for topic: %s partition:
%d", _topic, _topicPartition.partition())));
+ }
+ return new LongMsgOffset(info.offset());
+ } catch (InterruptedException | ExecutionException |
java.util.concurrent.TimeoutException e) {
throw new TransientConsumerException(e);
}
}
+ private boolean isValidOffsetInfo(Map<TopicPartition,
ListOffsetsResult.ListOffsetsResultInfo> offsets) {
+ return offsets != null && offsets.containsKey(_topicPartition) &&
offsets.get(_topicPartition).offset() >= 0;
+ }
+
@Override
public Map<String, PartitionLagState> getCurrentPartitionLagState(
Map<String, ConsumerPartitionState> currentPartitionStateMap) {
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
index 96775641ca..65c803804b 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
@@ -29,12 +29,15 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.pinot.plugin.stream.kafka.KafkaConsumerPartitionLag;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
@@ -97,49 +100,51 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
@Override
public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria
offsetCriteria, long timeoutMillis) {
Preconditions.checkNotNull(offsetCriteria);
- long offset;
- try {
+ try (AdminClient adminClient = createAdminClient()) {
+ // Build the offset spec request for this partition
+ Map<TopicPartition, OffsetSpec> request = new HashMap<>();
if (offsetCriteria.isLargest()) {
- offset =
_consumer.endOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
+ request.put(_topicPartition, OffsetSpec.latest());
} else if (offsetCriteria.isSmallest()) {
- offset =
-
_consumer.beginningOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
+ request.put(_topicPartition, OffsetSpec.earliest());
} else if (offsetCriteria.isPeriod()) {
- OffsetAndTimestamp offsetAndTimestamp =
_consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
- Clock.systemUTC().millis() -
TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString())))
- .get(_topicPartition);
- if (offsetAndTimestamp == null) {
- offset =
_consumer.endOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
- LOGGER.warn(
- "initial offset type is period and its value evaluates to null
hence proceeding with offset {} for "
- + "topic {} partition {}", offset, _topicPartition.topic(),
_topicPartition.partition());
- } else {
- offset = offsetAndTimestamp.offset();
- }
+ long ts = Clock.systemUTC().millis() -
TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString());
+ request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
} else if (offsetCriteria.isTimestamp()) {
- OffsetAndTimestamp offsetAndTimestamp =
_consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
-
TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString()))).get(_topicPartition);
- if (offsetAndTimestamp == null) {
- offset =
_consumer.endOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
- LOGGER.warn(
- "initial offset type is timestamp and its value evaluates to
null hence proceeding with offset {} for "
- + "topic {} partition {}", offset, _topicPartition.topic(),
_topicPartition.partition());
- } else {
- offset = offsetAndTimestamp.offset();
- }
+ long ts =
TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString());
+ request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
} else {
- throw new IllegalArgumentException("Unknown initial offset value " +
offsetCriteria);
+ throw new IllegalArgumentException("Unknown offset criteria: " +
offsetCriteria);
}
- return new LongMsgOffset(offset);
- } catch (TimeoutException e) {
+ // Query via AdminClient (thread-safe)
+ ListOffsetsResult result = adminClient.listOffsets(request);
+ Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
+ result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
+ if (!isValidOffsetInfo(offsets) && (offsetCriteria.isTimestamp() ||
offsetCriteria.isPeriod())) {
+ // fetch endOffsets as fallback
+ request.put(_topicPartition, OffsetSpec.latest());
+ result = adminClient.listOffsets(request);
+ offsets = result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
+ LOGGER.warn(
+ "initial offset type is {} and its value evaluates to null hence
proceeding with offset {} " + "for "
+ + "topic {} partition {}", offsetCriteria,
offsets.get(_topicPartition).offset(),
+ _topicPartition.topic(), _topicPartition.partition());
+ }
+ ListOffsetsResult.ListOffsetsResultInfo info =
offsets.get(_topicPartition);
+ if (info == null) {
+ throw new TransientConsumerException(new RuntimeException(
+ String.format("Failed to fetch offset for topic: %s partition:
%d", _topic, _topicPartition.partition())));
+ }
+ return new LongMsgOffset(info.offset());
+ } catch (InterruptedException | ExecutionException |
java.util.concurrent.TimeoutException e) {
throw new TransientConsumerException(e);
}
}
+ private boolean isValidOffsetInfo(Map<TopicPartition,
ListOffsetsResult.ListOffsetsResultInfo> offsets) {
+ return offsets != null && offsets.containsKey(_topicPartition) &&
offsets.get(_topicPartition).offset() >= 0;
+ }
+
@Override
public Map<String, PartitionLagState> getCurrentPartitionLagState(
Map<String, ConsumerPartitionState> currentPartitionStateMap) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]