This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f25f3ed1f2 Cleanup HLC code (#11326)
f25f3ed1f2 is described below
commit f25f3ed1f242513b90207d082fb80dbfdc9b06f6
Author: Navina Ramesh <[email protected]>
AuthorDate: Sun Aug 13 18:34:34 2023 -0700
Cleanup HLC code (#11326)
---
.../broker/broker/FakeStreamConsumerFactory.java | 9 -
.../common/utils/config/TableConfigUtilsTest.java | 7 -
.../realtime/HLRealtimeSegmentDataManager.java | 489 ---------------------
.../manager/realtime/RealtimeTableDataManager.java | 61 +--
.../impl/fakestream/FakeStreamConsumerFactory.java | 8 -
.../stream/kafka20/KafkaConsumerFactory.java | 8 -
.../stream/kafka20/KafkaStreamLevelConsumer.java | 158 -------
.../kafka20/KafkaStreamLevelConsumerManager.java | 199 ---------
.../kafka20/KafkaStreamLevelStreamConfig.java | 133 ------
.../stream/kinesis/KinesisConsumerFactory.java | 9 -
.../stream/pulsar/PulsarConsumerFactory.java | 8 -
.../stream/pulsar/PulsarStreamLevelConsumer.java | 114 -----
.../pulsar/PulsarStreamLevelConsumerManager.java | 193 --------
.../pinot/spi/stream/StreamConsumerFactory.java | 8 +-
14 files changed, 29 insertions(+), 1375 deletions(-)
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
index be2f6d8a6a..3ada727a13 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
@@ -21,7 +21,6 @@ package org.apache.pinot.broker.broker;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
@@ -31,7 +30,6 @@ import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -41,13 +39,6 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
public PartitionLevelConsumer createPartitionLevelConsumer(String clientId,
int partition) {
return new FakePartitionLevelConsumer();
}
-
- @Override
- public StreamLevelConsumer createStreamLevelConsumer(String clientId, String
tableName, Set<String> fieldsToRead,
- String groupId) {
- return null;
- }
-
@Override
public StreamMetadataProvider createPartitionMetadataProvider(String
clientId, int partition) {
return new FakesStreamMetadataProvider();
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigUtilsTest.java
index 9d1f86dc23..edb3b58f98 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigUtilsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigUtilsTest.java
@@ -34,7 +34,6 @@ import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -218,12 +217,6 @@ public class TableConfigUtilsTest {
return null;
}
- @Override
- public StreamLevelConsumer createStreamLevelConsumer(String clientId,
String tableName, Set<String> fieldsToRead,
- String groupId) {
- return null;
- }
-
@Override
public StreamMetadataProvider createPartitionMetadataProvider(String
clientId, int partition) {
return null;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
deleted file mode 100644
index 246cf83e98..0000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ /dev/null
@@ -1,489 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.manager.realtime;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.metrics.ServerGauge;
-import org.apache.pinot.common.metrics.ServerMeter;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
-import
org.apache.pinot.segment.local.realtime.converter.ColumnIndicesForRealtimeTable;
-import
org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
-import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
-import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
-import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.segment.local.utils.IngestionUtils;
-import org.apache.pinot.segment.spi.MutableSegment;
-import org.apache.pinot.segment.spi.creator.SegmentVersion;
-import org.apache.pinot.spi.config.table.IndexingConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.metrics.PinotMeter;
-import org.apache.pinot.spi.stream.ConsumerPartitionState;
-import org.apache.pinot.spi.stream.PartitionLagState;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.StreamConsumerFactory;
-import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
-import org.apache.pinot.spi.utils.CommonConstants.ConsumerState;
-import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
-import org.apache.pinot.spi.utils.IngestionConfigUtils;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
- private static final Logger LOGGER =
LoggerFactory.getLogger(HLRealtimeSegmentDataManager.class);
- private final static long ONE_MINUTE_IN_MILLSEC = 1000 * 60;
-
- private final String _tableNameWithType;
- private final String _segmentName;
- private final String _timeColumnName;
- private final TimeUnit _timeType;
- private final RecordTransformer _recordTransformer;
-
- private final StreamLevelConsumer _streamLevelConsumer;
- private final File _resourceTmpDir;
- private final MutableSegmentImpl _realtimeSegment;
- private final String _tableStreamName;
- private final StreamConfig _streamConfig;
-
- private final long _start = System.currentTimeMillis();
- private long _segmentEndTimeThreshold;
- private AtomicLong _lastUpdatedRawDocuments = new AtomicLong(0);
-
- private volatile boolean _keepIndexing = true;
- private volatile boolean _isShuttingDown = false;
-
- private final TimerTask _segmentStatusTask;
- private final ServerMetrics _serverMetrics;
- private final RealtimeTableDataManager _notifier;
- private Thread _indexingThread;
-
- private final String _sortedColumn;
- private final List<String> _invertedIndexColumns;
- private final Logger _segmentLogger;
- private final SegmentVersion _segmentVersion;
-
- private PinotMeter _tableAndStreamRowsConsumed = null;
- private PinotMeter _tableRowsConsumed = null;
-
- // An instance of this class exists only for the duration of the realtime
segment that is currently being consumed.
- // Once the segment is committed, the segment is handled by
OfflineSegmentDataManager
- public HLRealtimeSegmentDataManager(final SegmentZKMetadata
segmentZKMetadata, final TableConfig tableConfig,
- InstanceZKMetadata instanceMetadata, final RealtimeTableDataManager
realtimeTableDataManager,
- final String resourceDataDir, final IndexLoadingConfig
indexLoadingConfig, final Schema schema,
- final ServerMetrics serverMetrics)
- throws Exception {
- super();
- _segmentVersion = indexLoadingConfig.getSegmentVersion();
- _recordTransformer =
CompositeTransformer.getDefaultTransformer(tableConfig, schema);
- _serverMetrics = serverMetrics;
- _segmentName = segmentZKMetadata.getSegmentName();
- _tableNameWithType = tableConfig.getTableName();
- _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
- Preconditions
- .checkNotNull(_timeColumnName, "Must provide valid timeColumnName in
tableConfig for realtime table {}",
- _tableNameWithType);
- DateTimeFieldSpec dateTimeFieldSpec =
schema.getSpecForTimeColumn(_timeColumnName);
- Preconditions.checkNotNull(dateTimeFieldSpec, "Must provide field spec for
time column {}", _timeColumnName);
- _timeType = dateTimeFieldSpec.getFormatSpec().getColumnUnit();
-
- List<String> sortedColumns = indexLoadingConfig.getSortedColumns();
- if (sortedColumns.isEmpty()) {
- LOGGER.info("RealtimeDataResourceZKMetadata contains no information
about sorted column for segment {}",
- _segmentName);
- _sortedColumn = null;
- } else {
- String firstSortedColumn = sortedColumns.get(0);
- if (schema.hasColumn(firstSortedColumn)) {
- LOGGER.info("Setting sorted column name: {} from
RealtimeDataResourceZKMetadata for segment {}",
- firstSortedColumn, _segmentName);
- _sortedColumn = firstSortedColumn;
- } else {
- LOGGER
- .warn("Sorted column name: {} from RealtimeDataResourceZKMetadata
is not existed in schema for segment {}.",
- firstSortedColumn, _segmentName);
- _sortedColumn = null;
- }
- }
-
- // Inverted index columns
- // We need to add sorted column into inverted index columns because when
we convert realtime in memory segment into
- // offline segment, we use sorted column's inverted index to maintain the
order of the records so that the records
- // are sorted on the sorted column.
- if (_sortedColumn != null) {
- indexLoadingConfig.addInvertedIndexColumns(_sortedColumn);
- }
- Set<String> invertedIndexColumns =
indexLoadingConfig.getInvertedIndexColumns();
- _invertedIndexColumns = new ArrayList<>(invertedIndexColumns);
- _streamConfig = new StreamConfig(_tableNameWithType,
IngestionConfigUtils.getStreamConfigMap(tableConfig));
-
- _segmentLogger = LoggerFactory.getLogger(
- HLRealtimeSegmentDataManager.class.getName() + "_" + _segmentName +
"_" + _streamConfig.getTopicName());
- _segmentLogger.info("Created segment data manager with Sorted column:{},
invertedIndexColumns:{}", _sortedColumn,
- invertedIndexColumns);
-
- _segmentEndTimeThreshold = _start +
_streamConfig.getFlushThresholdTimeMillis();
- _resourceTmpDir = new File(resourceDataDir, RESOURCE_TEMP_DIR_NAME);
- if (!_resourceTmpDir.exists()) {
- _resourceTmpDir.mkdirs();
- }
- // create and init stream level consumer
- StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(_streamConfig);
- String clientId = HLRealtimeSegmentDataManager.class.getSimpleName() + "-"
+ _streamConfig.getTopicName();
- Set<String> fieldsToRead =
IngestionUtils.getFieldsForRecordExtractor(tableConfig.getIngestionConfig(),
schema);
- _streamLevelConsumer =
streamConsumerFactory.createStreamLevelConsumer(clientId, _tableNameWithType,
fieldsToRead,
- instanceMetadata.getGroupId(_tableNameWithType));
- _streamLevelConsumer.start();
- _tableStreamName = _tableNameWithType + "_" + _streamConfig.getTopicName();
-
- IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
- if (indexingConfig != null && indexingConfig.isAggregateMetrics()) {
- LOGGER.warn("Updating of metrics only supported for LLC consumer,
ignoring.");
- }
-
- // lets create a new realtime segment
- _segmentLogger.info("Started {} stream provider", _streamConfig.getType());
- final int capacity = _streamConfig.getFlushThresholdRows();
- boolean nullHandlingEnabled = indexingConfig != null &&
indexingConfig.isNullHandlingEnabled();
- RealtimeSegmentConfig realtimeSegmentConfig =
- new
RealtimeSegmentConfig.Builder(indexLoadingConfig).setTableNameWithType(_tableNameWithType)
- .setSegmentName(_segmentName)
-
.setStreamName(_streamConfig.getTopicName()).setSchema(schema).setTimeColumnName(_timeColumnName)
-
.setCapacity(capacity).setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
- .setSegmentZKMetadata(segmentZKMetadata)
-
.setOffHeap(indexLoadingConfig.isRealtimeOffHeapAllocation()).setMemoryManager(
- getMemoryManager(realtimeTableDataManager.getConsumerDir(),
_segmentName,
- indexLoadingConfig.isRealtimeOffHeapAllocation(),
- indexLoadingConfig.isDirectRealtimeOffHeapAllocation(),
serverMetrics))
- .setStatsHistory(realtimeTableDataManager.getStatsHistory())
- .setNullHandlingEnabled(nullHandlingEnabled).build();
- _realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfig,
serverMetrics);
-
- _notifier = realtimeTableDataManager;
-
- LOGGER.info("Starting consumption on realtime consuming segment {}
maxRowCount {} maxEndTime {}", _segmentName,
- capacity, new DateTime(_segmentEndTimeThreshold,
DateTimeZone.UTC).toString());
- _segmentStatusTask = new TimerTask() {
- @Override
- public void run() {
- computeKeepIndexing();
- }
- };
-
- // start the indexing thread
- _indexingThread = new Thread(new Runnable() {
- @Override
- public void run() {
- // continue indexing until criteria is met
- boolean notFull = true;
- long exceptionSleepMillis = 50L;
- _segmentLogger.info("Starting to collect rows");
-
- int numRowsErrored = 0;
- GenericRow reuse = new GenericRow();
- do {
- reuse.clear();
- try {
- GenericRow consumedRow;
- try {
- consumedRow = _streamLevelConsumer.next(reuse);
- _tableAndStreamRowsConsumed = serverMetrics
- .addMeteredTableValue(_tableStreamName,
ServerMeter.REALTIME_ROWS_CONSUMED, 1L,
- _tableAndStreamRowsConsumed);
- _tableRowsConsumed =
-
serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L,
_tableRowsConsumed);
- } catch (Exception e) {
- _segmentLogger.warn("Caught exception while consuming row,
sleeping for {} ms", exceptionSleepMillis, e);
- numRowsErrored++;
- serverMetrics.addMeteredTableValue(_tableStreamName,
ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
-
serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS,
1L);
-
- // Sleep for a short time as to avoid filling the logs with
exceptions too quickly
- Uninterruptibles.sleepUninterruptibly(exceptionSleepMillis,
TimeUnit.MILLISECONDS);
- exceptionSleepMillis = Math.min(60000L, exceptionSleepMillis *
2);
- continue;
- }
- if (consumedRow != null) {
- try {
- GenericRow transformedRow =
_recordTransformer.transform(consumedRow);
- // FIXME: handle MULTIPLE_RECORDS_KEY for HLL
- if (transformedRow != null &&
IngestionUtils.shouldIngestRow(transformedRow)) {
- // we currently do not get ingestion data through
stream-consumer
- notFull = _realtimeSegment.index(transformedRow, null);
- exceptionSleepMillis = 50L;
- }
- } catch (Exception e) {
- _segmentLogger.warn("Caught exception while indexing row,
sleeping for {} ms, row contents {}",
- exceptionSleepMillis, consumedRow, e);
- numRowsErrored++;
-
- // Sleep for a short time as to avoid filling the logs with
exceptions too quickly
- Uninterruptibles.sleepUninterruptibly(exceptionSleepMillis,
TimeUnit.MILLISECONDS);
- exceptionSleepMillis = Math.min(60000L, exceptionSleepMillis *
2);
- }
- }
- } catch (Error e) {
- _segmentLogger.error("Caught error in indexing thread", e);
- throw e;
- }
- } while (notFull && _keepIndexing && (!_isShuttingDown));
-
- if (_isShuttingDown) {
- _segmentLogger.info("Shutting down indexing thread!");
- return;
- }
- try {
- if (numRowsErrored > 0) {
- serverMetrics.addMeteredTableValue(_tableStreamName,
ServerMeter.ROWS_WITH_ERRORS, numRowsErrored);
- }
- _segmentLogger.info("Indexing threshold reached, proceeding with
index conversion");
- // kill the timer first
- _segmentStatusTask.cancel();
- updateCurrentDocumentCountMetrics();
- _segmentLogger.info("Indexed {} raw events",
_realtimeSegment.getNumDocsIndexed());
- File tempSegmentFolder = new File(_resourceTmpDir, "tmp-" +
System.currentTimeMillis());
- ColumnIndicesForRealtimeTable columnIndicesForRealtimeTable =
- new ColumnIndicesForRealtimeTable(_sortedColumn,
_invertedIndexColumns, Collections.emptyList(),
- Collections.emptyList(), new
ArrayList<>(indexLoadingConfig.getNoDictionaryColumns()),
- new
ArrayList<>(indexLoadingConfig.getVarLengthDictionaryColumns()));
- // lets convert the segment now
- RealtimeSegmentConverter converter =
- new RealtimeSegmentConverter(_realtimeSegment, null,
tempSegmentFolder.getAbsolutePath(),
- schema, _tableNameWithType, tableConfig,
segmentZKMetadata.getSegmentName(),
- columnIndicesForRealtimeTable,
indexingConfig.isNullHandlingEnabled());
-
- _segmentLogger.info("Trying to build segment");
- final long buildStartTime = System.nanoTime();
- converter.build(_segmentVersion, serverMetrics);
- final long buildEndTime = System.nanoTime();
- _segmentLogger.info("Built segment in {} ms",
- TimeUnit.MILLISECONDS.convert((buildEndTime - buildStartTime),
TimeUnit.NANOSECONDS));
- File destDir = new File(resourceDataDir,
segmentZKMetadata.getSegmentName());
- FileUtils.deleteQuietly(destDir);
- FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], destDir);
-
- FileUtils.deleteQuietly(tempSegmentFolder);
- long segStartTime = _realtimeSegment.getMinTime();
- long segEndTime = _realtimeSegment.getMaxTime();
-
- _segmentLogger.info("Committing {} offsets",
_streamConfig.getType());
- boolean commitSuccessful = false;
- try {
- _streamLevelConsumer.commit();
- commitSuccessful = true;
- _streamLevelConsumer.shutdown();
- _segmentLogger
- .info("Successfully committed {} offsets, consumer release
requested.", _streamConfig.getType());
- serverMetrics.addMeteredTableValue(_tableStreamName,
ServerMeter.REALTIME_OFFSET_COMMITS, 1L);
-
serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_OFFSET_COMMITS, 1L);
- } catch (Throwable e) {
- // If we got here, it means that either the commit or the shutdown
failed. Considering that the
- // KafkaConsumerManager delays shutdown and only adds the consumer
to be released in a deferred way, this
- // likely means that writing the Kafka offsets failed.
- //
- // The old logic (mark segment as done, then commit offsets and
shutdown the consumer immediately) would die
- // in a terrible way, leaving the consumer open and causing us to
only get half the records from that point
- // on. In this case, because we keep the consumer open for a
little while, we should be okay if the
- // controller reassigns us a new segment before the consumer gets
released. Hopefully by the next time that
- // we get to committing the offsets, the transient ZK failure that
caused the write to fail will not
- // happen again and everything will be good.
- //
- // Several things can happen:
- // - The controller reassigns us a new segment before we release
the consumer (KafkaConsumerManager will
- // keep the consumer open for about a minute, which should be
enough time for the controller to reassign
- // us a new segment) and the next time we close the segment the
offsets commit successfully; we're good.
- // - The controller reassigns us a new segment, but after we
released the consumer (if the controller was
- // down or there was a ZK failure on writing the Kafka offsets
but not the Helix state). We lose whatever
- // data was in this segment. Not good.
- // - The server crashes after this comment and before we mark the
current segment as done; if the Kafka
- // offsets didn't get written, then when the server restarts
it'll start consuming the current segment
- // from the previously committed offsets; we're good.
- // - The server crashes after this comment, the Kafka offsets were
written but the segment wasn't marked as
- // done in Helix, but we got a failure (or not) on the commit;
we lose whatever data was in this segment
- // if we restart the server (not good). If we manually mark the
segment as done in Helix by editing the
- // state in ZK, everything is good, we'll consume a new segment
that starts from the correct offsets.
- //
- // This is still better than the previous logic, which would have
these failure modes:
- // - Consumer was left open and the controller reassigned us a new
segment; consume only half the events
- // (because there are two consumers and Kafka will try to
rebalance partitions between those two)
- // - We got a segment assigned to us before we got around to
committing the offsets, reconsume the data that
- // we got in this segment again, as we're starting consumption
from the previously committed offset (eg.
- // duplicate data).
- //
- // This is still not very satisfactory, which is why this part is
due for a redesign.
- //
- // Assuming you got here because the realtime offset commit metric
has fired, check the logs to determine
- // which of the above scenarios happened. If you're in one of the
good scenarios, then there's nothing to
- // do. If you're not, then based on how critical it is to get
those rows back, then your options are:
- // - Wipe the realtime table and reconsume everything (mark the
replica as disabled so that clients don't
- // see query results from partially consumed data, then
re-enable it when this replica has caught up)
- // - Accept that those rows are gone in this replica and move on
(they'll be replaced by good offline data
- // soon anyway)
- // - If there's a replica that has consumed properly, you could
shut it down, copy its segments onto this
- // replica, assign a new consumer group id to this replica,
rename the copied segments and edit their
- // metadata to reflect the new consumer group id, copy the Kafka
offsets from the shutdown replica onto
- // the new consumer group id and then restart both replicas.
This should get you the missing rows.
-
- _segmentLogger
- .error("FATAL: Exception committing or shutting down consumer
commitSuccessful={}", commitSuccessful,
- e);
- serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.REALTIME_OFFSET_COMMIT_EXCEPTIONS, 1L);
- if (!commitSuccessful) {
- _streamLevelConsumer.shutdown();
- }
- }
-
- try {
- _segmentLogger.info("Marking current segment as completed in
Helix");
- SegmentZKMetadata metadataToOverwrite = new
SegmentZKMetadata(segmentZKMetadata.getSegmentName());
- metadataToOverwrite.setStatus(Status.DONE);
- metadataToOverwrite.setStartTime(segStartTime);
- metadataToOverwrite.setEndTime(segEndTime);
- metadataToOverwrite.setTimeUnit(_timeType);
-
metadataToOverwrite.setTotalDocs(_realtimeSegment.getNumDocsIndexed());
- _notifier.replaceHLSegment(metadataToOverwrite,
indexLoadingConfig);
- _segmentLogger
- .info("Completed write of segment completion to Helix, waiting
for controller to assign a new segment");
- } catch (Exception e) {
- if (commitSuccessful) {
- _segmentLogger.error(
- "Offsets were committed to Kafka but we were unable to mark
this segment as completed in Helix. "
- + "Manually mark the segment as completed in Helix;
restarting this instance will result in "
- + "data loss.", e);
- } else {
- _segmentLogger.warn(
- "Caught exception while marking segment as completed in
Helix. Offsets were not written, restarting"
- + " the instance should be safe.", e);
- }
- }
- } catch (Exception e) {
- _segmentLogger.error("Caught exception in the realtime indexing
thread", e);
- }
- }
- });
-
- _indexingThread.start();
- serverMetrics.addValueToTableGauge(_tableNameWithType,
ServerGauge.SEGMENT_COUNT, 1L);
- _segmentLogger.debug("scheduling keepIndexing timer check");
- // start a schedule timer to keep track of the segment
- TimerService.TIMER.schedule(_segmentStatusTask, ONE_MINUTE_IN_MILLSEC,
ONE_MINUTE_IN_MILLSEC);
- _segmentLogger.info("finished scheduling keepIndexing timer check");
- }
-
- @Override
- public MutableSegment getSegment() {
- return _realtimeSegment;
- }
-
- @Override
- public Map<String, String> getPartitionToCurrentOffset() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void startConsumption() {
- // no-op
- }
-
- @Override
- public ConsumerState getConsumerState() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getLastConsumedTimestamp() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Map<String, ConsumerPartitionState> getConsumerPartitionState() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Map<String, PartitionLagState> getPartitionToLagState(
- Map<String, ConsumerPartitionState> consumerPartitionStateMap) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String getSegmentName() {
- return _segmentName;
- }
-
- private void computeKeepIndexing() {
- if (_keepIndexing) {
- _segmentLogger.debug("Current indexed {} raw events",
_realtimeSegment.getNumDocsIndexed());
- if ((System.currentTimeMillis() >= _segmentEndTimeThreshold)
- || _realtimeSegment.getNumDocsIndexed() >=
_streamConfig.getFlushThresholdRows()) {
- if (_realtimeSegment.getNumDocsIndexed() == 0) {
- _segmentLogger.info("no new events coming in, extending the end time
by another hour");
- _segmentEndTimeThreshold = System.currentTimeMillis() +
_streamConfig.getFlushThresholdTimeMillis();
- return;
- }
- _segmentLogger.info(
- "Stopped indexing due to reaching segment limit: {} raw documents
indexed, segment is aged {} minutes",
- _realtimeSegment.getNumDocsIndexed(), ((System.currentTimeMillis()
- _start) / (ONE_MINUTE_IN_MILLSEC)));
- _keepIndexing = false;
- }
- }
- updateCurrentDocumentCountMetrics();
- }
-
- private void updateCurrentDocumentCountMetrics() {
- int currentRawDocs = _realtimeSegment.getNumDocsIndexed();
- _serverMetrics.addValueToTableGauge(_tableNameWithType,
ServerGauge.DOCUMENT_COUNT,
- (currentRawDocs - _lastUpdatedRawDocuments.get()));
- _lastUpdatedRawDocuments.set(currentRawDocs);
- }
-
- @Override
- protected void doDestroy() {
- LOGGER.info("Trying to shutdown RealtimeSegmentDataManager : {}!",
_segmentName);
- _isShuttingDown = true;
- try {
- _streamLevelConsumer.shutdown();
- } catch (Exception e) {
- LOGGER.error("Failed to shutdown stream consumer!", e);
- }
- _keepIndexing = false;
- _segmentStatusTask.cancel();
- _realtimeSegment.destroy();
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index f431b63eb9..3bf7caf24b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -39,7 +39,6 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.utils.LLCSegmentName;
@@ -405,23 +404,17 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
Schema schema = indexLoadingConfig.getSchema();
assert schema != null;
boolean isHLCSegment =
SegmentName.isHighLevelConsumerSegmentName(segmentName);
+ if (isHLCSegment) {
+ throw new UnsupportedOperationException("Adding high level consumer
segment " + segmentName + " is not allowed");
+ }
if (segmentZKMetadata.getStatus().isCompleted()) {
- if (isHLCSegment && !segmentDir.exists()) {
- throw new RuntimeException("Failed to find local copy for committed
HLC segment: " + segmentName);
- }
if (tryLoadExistingSegment(segmentName, indexLoadingConfig,
segmentZKMetadata)) {
// The existing completed segment has been loaded successfully
return;
} else {
- if (!isHLCSegment) {
- // For LLC and uploaded segments, delete the local copy and download
a new copy
- _logger.error("Failed to load LLC segment: {}, downloading a new
copy", segmentName);
- FileUtils.deleteQuietly(segmentDir);
- } else {
- // For HLC segments, throw out the exception because there is no way
to recover (controller does not have a
- // copy of the segment)
- throw new RuntimeException("Failed to load local HLC segment: " +
segmentName);
- }
+ // For LLC and uploaded segments, delete the local copy and download a
new copy
+ _logger.error("Failed to load LLC segment: {}, downloading a new
copy", segmentName);
+ FileUtils.deleteQuietly(segmentDir);
}
// Local segment doesn't exist or cannot load, download a new copy
downloadAndReplaceSegment(segmentName, segmentZKMetadata,
indexLoadingConfig, tableConfig);
@@ -436,34 +429,26 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
_logger.error("Not adding segment {}", segmentName);
throw new RuntimeException("Mismatching schema/table config for " +
_tableNameWithType);
}
+
VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(schema,
segmentName);
setDefaultTimeValueIfInvalid(tableConfig, schema, segmentZKMetadata);
- if (!isHLCSegment) {
- // Generates only one semaphore for every partitionGroupId
- LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
- int partitionGroupId = llcSegmentName.getPartitionGroupId();
- Semaphore semaphore =
_partitionGroupIdToSemaphoreMap.computeIfAbsent(partitionGroupId, k -> new
Semaphore(1));
- PartitionUpsertMetadataManager partitionUpsertMetadataManager =
- _tableUpsertMetadataManager != null ?
_tableUpsertMetadataManager.getOrCreatePartitionManager(
- partitionGroupId) : null;
- PartitionDedupMetadataManager partitionDedupMetadataManager =
- _tableDedupMetadataManager != null ?
_tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
- : null;
- LLRealtimeSegmentDataManager llRealtimeSegmentDataManager =
- new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig,
this, _indexDir.getAbsolutePath(),
- indexLoadingConfig, schema, llcSegmentName, semaphore,
_serverMetrics, partitionUpsertMetadataManager,
- partitionDedupMetadataManager, _isTableReadyToConsumeData);
- llRealtimeSegmentDataManager.startConsumption();
- segmentDataManager = llRealtimeSegmentDataManager;
- } else {
- InstanceZKMetadata instanceZKMetadata =
ZKMetadataProvider.getInstanceZKMetadata(_propertyStore, _instanceId);
- HLRealtimeSegmentDataManager hlRealtimeSegmentDataManager = new
HLRealtimeSegmentDataManager(segmentZKMetadata,
- tableConfig, instanceZKMetadata, this,
_indexDir.getAbsolutePath(),
- indexLoadingConfig, schema, _serverMetrics);
- hlRealtimeSegmentDataManager.startConsumption();
- segmentDataManager = hlRealtimeSegmentDataManager;
- }
+ // Generates only one semaphore for every partitionGroupId
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ int partitionGroupId = llcSegmentName.getPartitionGroupId();
+ Semaphore semaphore =
_partitionGroupIdToSemaphoreMap.computeIfAbsent(partitionGroupId, k -> new
Semaphore(1));
+ PartitionUpsertMetadataManager partitionUpsertMetadataManager =
+ _tableUpsertMetadataManager != null ?
_tableUpsertMetadataManager.getOrCreatePartitionManager(
+ partitionGroupId) : null;
+ PartitionDedupMetadataManager partitionDedupMetadataManager =
+ _tableDedupMetadataManager != null ?
_tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
+ : null;
+ LLRealtimeSegmentDataManager llRealtimeSegmentDataManager =
+ new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, this,
_indexDir.getAbsolutePath(),
+ indexLoadingConfig, schema, llcSegmentName, semaphore,
_serverMetrics, partitionUpsertMetadataManager,
+ partitionDedupMetadataManager, _isTableReadyToConsumeData);
+ llRealtimeSegmentDataManager.startConsumption();
+ segmentDataManager = llRealtimeSegmentDataManager;
_logger.info("Initialized RealtimeSegmentDataManager - " + segmentName);
registerSegment(segmentName, segmentDataManager);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index fbcc68f56c..008d0dd900 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.core.realtime.impl.fakestream;
-import java.util.Set;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
@@ -31,7 +30,6 @@ import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamDecoderProvider;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -50,12 +48,6 @@ public class FakeStreamConsumerFactory extends
StreamConsumerFactory {
return new FakePartitionLevelConsumer(partition, _streamConfig,
FakeStreamConfigUtils.MESSAGE_BATCH_SIZE);
}
- @Override
- public StreamLevelConsumer createStreamLevelConsumer(String clientId, String
tableName, Set<String> fieldsToRead,
- String groupId) {
- throw new UnsupportedOperationException("Pinot no longer support stream
level consumers!");
- }
-
@Override
public StreamMetadataProvider createPartitionMetadataProvider(String
clientId, int partition) {
return new FakeStreamMetadataProvider(_streamConfig);
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
index e0d1015b65..9679a175dd 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
@@ -18,10 +18,8 @@
*/
package org.apache.pinot.plugin.stream.kafka20;
-import java.util.Set;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
@@ -32,12 +30,6 @@ public class KafkaConsumerFactory extends
StreamConsumerFactory {
return new KafkaPartitionLevelConsumer(clientId, _streamConfig, partition);
}
- @Override
- public StreamLevelConsumer createStreamLevelConsumer(String clientId, String
tableName, Set<String> fieldsToRead,
- String groupId) {
- return new KafkaStreamLevelConsumer(clientId, tableName, _streamConfig,
fieldsToRead, groupId);
- }
-
@Override
public StreamMetadataProvider createPartitionMetadataProvider(String
clientId, int partition) {
return new KafkaStreamMetadataProvider(clientId, _streamConfig, partition);
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
deleted file mode 100644
index 81df76f628..0000000000
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.stream.kafka20;
-
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.StreamDecoderProvider;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
-import org.apache.pinot.spi.stream.StreamMessageDecoder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * An implementation of a {@link StreamLevelConsumer} which consumes from the
kafka stream
- */
-// Pinot no longer support high level consumer model since v0.12.*
-@Deprecated
-public class KafkaStreamLevelConsumer implements StreamLevelConsumer {
-
- private final StreamMessageDecoder _messageDecoder;
- private final Logger _instanceLogger;
-
- private final StreamConfig _streamConfig;
- private final KafkaStreamLevelStreamConfig _kafkaStreamLevelStreamConfig;
-
- private KafkaConsumer<Bytes, Bytes> _consumer;
- private ConsumerRecords<Bytes, Bytes> _consumerRecords;
- private Iterator<ConsumerRecord<Bytes, Bytes>> _kafkaIterator;
- private Map<Integer, Long> _consumerOffsets = new HashMap<>(); // tracking
current consumed records offsets.
-
- private long _lastLogTime = 0;
- private long _lastCount = 0;
- private long _currentCount = 0L;
-
- public KafkaStreamLevelConsumer(String clientId, String tableName,
StreamConfig streamConfig,
- Set<String> sourceFields, String groupId) {
- _streamConfig = streamConfig;
- _kafkaStreamLevelStreamConfig = new
KafkaStreamLevelStreamConfig(streamConfig, tableName, groupId);
-
- _messageDecoder = StreamDecoderProvider.create(streamConfig, sourceFields);
- _instanceLogger = LoggerFactory
- .getLogger(KafkaStreamLevelConsumer.class.getName() + "_" + tableName
+ "_" + streamConfig.getTopicName());
- _instanceLogger.info("KafkaStreamLevelConsumer: streamConfig : {}",
_streamConfig);
- }
-
- @Override
- public void start()
- throws Exception {
- _consumer =
KafkaStreamLevelConsumerManager.acquireKafkaConsumerForConfig(_kafkaStreamLevelStreamConfig);
- }
-
- private void updateKafkaIterator() {
- _consumerRecords =
_consumer.poll(Duration.ofMillis(_streamConfig.getFetchTimeoutMillis()));
- _kafkaIterator = _consumerRecords.iterator();
- }
-
- private void resetOffsets() {
- for (int partition : _consumerOffsets.keySet()) {
- long offsetToSeek = _consumerOffsets.get(partition);
- _consumer.seek(new TopicPartition(_streamConfig.getTopicName(),
partition), offsetToSeek);
- }
- }
-
- @Override
- public GenericRow next(GenericRow destination) {
- if (_kafkaIterator == null || !_kafkaIterator.hasNext()) {
- updateKafkaIterator();
- }
- if (_kafkaIterator.hasNext()) {
- try {
- final ConsumerRecord<Bytes, Bytes> record = _kafkaIterator.next();
- updateOffsets(record.partition(), record.offset());
- destination = _messageDecoder.decode(record.value().get(),
destination);
-
- _currentCount++;
-
- final long now = System.currentTimeMillis();
- // Log every minute or 100k events
- if (now - _lastLogTime > 60000 || _currentCount - _lastCount >=
100000) {
- if (_lastCount == 0) {
- _instanceLogger
- .info("Consumed {} events from kafka stream {}",
_currentCount, _streamConfig.getTopicName());
- } else {
- _instanceLogger.info("Consumed {} events from kafka stream {}
(rate:{}/s)", _currentCount - _lastCount,
- _streamConfig.getTopicName(), (float) (_currentCount -
_lastCount) * 1000 / (now - _lastLogTime));
- }
- _lastCount = _currentCount;
- _lastLogTime = now;
- }
- return destination;
- } catch (Exception e) {
- _instanceLogger.warn("Caught exception while consuming events", e);
- throw e;
- }
- }
- return null;
- }
-
- private void updateOffsets(int partition, long offset) {
- _consumerOffsets.put(partition, offset + 1);
- }
-
- @Override
- public void commit() {
- _consumer.commitSync(getOffsetsMap());
- // Since the lastest batch may not be consumed fully, so we need to reset
kafka consumer's offset.
- resetOffsets();
- _consumerOffsets.clear();
- }
-
- private Map<TopicPartition, OffsetAndMetadata> getOffsetsMap() {
- Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
- for (Integer partition : _consumerOffsets.keySet()) {
- offsetsMap.put(new TopicPartition(_streamConfig.getTopicName(),
partition),
- new OffsetAndMetadata(_consumerOffsets.get(partition)));
- }
- return offsetsMap;
- }
-
- @Override
- public void shutdown()
- throws Exception {
- if (_consumer != null) {
- // If offsets commit is not succeed, then reset the offsets here.
- resetOffsets();
- KafkaStreamLevelConsumerManager.releaseKafkaConsumer(_consumer);
- _consumer = null;
- }
- }
-}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumerManager.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumerManager.java
deleted file mode 100644
index c6fc50b231..0000000000
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumerManager.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.stream.kafka20;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.tuple.ImmutableTriple;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Manager for Kafka consumers that reuses consumers and delays their shutdown.
- *
- * This is a workaround for the current realtime design flaw where any issue
while flushing/committing offsets causes
- * duplicate or dropped events. Kafka consumption is driven by the controller,
which assigns a realtime segment to the
- * servers; when a server is assigned a new realtime segment, it creates a
Kafka consumer, consumes until it reaches a
- * threshold then flushes to disk, writes metadata to helix indicating the
segment is completed, commits Kafka offsets
- * to ZK and then shuts down the consumer. The controller notices the metadata
write and reassigns a segment to the
- * server, so that it can keep on consuming.
- *
- * This logic is flawed if committing Kafka offsets fails, at which time the
committed state is unknown. The proper fix
- * would be to just keep on using that consumer and try committing our offsets
later, but we recreate a new Kafka
- * consumer whenever we get a new segment and also keep the old consumer
around, leading to half the events being
- * assigned, due to Kafka rebalancing the partitions between the two consumers
(one of which is not actually reading
- * anything anymore). Because that logic is stateless and driven by Helix,
there's no real clean way to keep the
- * consumer alive and pass it to the next segment.
- *
- * This class and long comment is to work around this issue by keeping the
consumer alive for a little bit instead of
- * shutting it down immediately, so that the next segment assignment can pick
up the same consumer. This way, even if
- * committing the offsets fails, we can still pick up the same consumer the
next time we get a segment assigned to us
- * by the controller and hopefully commit our offsets the next time we flush
to disk.
- *
- * This temporary code should be completely removed by the time we redesign
the consumption to use the lower level
- * Kafka APIs.
- */
-public class KafkaStreamLevelConsumerManager {
- private KafkaStreamLevelConsumerManager() {
- }
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaStreamLevelConsumerManager.class);
- private static final Long IN_USE = -1L;
- private static final long CONSUMER_SHUTDOWN_DELAY_MILLIS =
TimeUnit.SECONDS.toMillis(60); // One minute
- private static final Map<ImmutableTriple<String, String, String>,
KafkaConsumer> CONSUMER_FOR_CONFIG_KEY =
- new HashMap<>();
- private static final IdentityHashMap<KafkaConsumer, Long>
CONSUMER_RELEASE_TIME = new IdentityHashMap<>();
-
- public static KafkaConsumer
acquireKafkaConsumerForConfig(KafkaStreamLevelStreamConfig
kafkaStreamLevelStreamConfig) {
- final ImmutableTriple<String, String, String> configKey =
- new ImmutableTriple<>(kafkaStreamLevelStreamConfig.getKafkaTopicName(),
- kafkaStreamLevelStreamConfig.getGroupId(),
- kafkaStreamLevelStreamConfig.getBootstrapServers());
-
- synchronized (KafkaStreamLevelConsumerManager.class) {
- // If we have the consumer and it's not already acquired, return it,
otherwise error out if it's already acquired
- if (CONSUMER_FOR_CONFIG_KEY.containsKey(configKey)) {
- KafkaConsumer kafkaConsumer = CONSUMER_FOR_CONFIG_KEY.get(configKey);
- if (CONSUMER_RELEASE_TIME.get(kafkaConsumer).equals(IN_USE)) {
- throw new RuntimeException("Consumer " + kafkaConsumer + " already
in use!");
- } else {
- LOGGER.info("Reusing kafka consumer with id {}", kafkaConsumer);
- CONSUMER_RELEASE_TIME.put(kafkaConsumer, IN_USE);
- return kafkaConsumer;
- }
- }
-
- LOGGER.info("Creating new kafka consumer and iterator for topic {}",
- kafkaStreamLevelStreamConfig.getKafkaTopicName());
-
- // Create the consumer
-
- Properties consumerProp = new Properties();
-
consumerProp.putAll(kafkaStreamLevelStreamConfig.getKafkaConsumerProperties());
- consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaStreamLevelStreamConfig.getBootstrapServers());
- consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
- consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
BytesDeserializer.class.getName());
-
- if (consumerProp.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) &&
consumerProp
-
.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).equals("smallest")) {
- consumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- }
- KafkaConsumer consumer = new KafkaConsumer<>(consumerProp);
-
consumer.subscribe(Collections.singletonList(kafkaStreamLevelStreamConfig.getKafkaTopicName()));
-
- // Mark both the consumer and iterator as acquired
- CONSUMER_FOR_CONFIG_KEY.put(configKey, consumer);
- CONSUMER_RELEASE_TIME.put(consumer, IN_USE);
-
- LOGGER
- .info("Created consumer with id {} for topic {}", consumer,
kafkaStreamLevelStreamConfig.getKafkaTopicName());
-
- return consumer;
- }
- }
-
- public static void releaseKafkaConsumer(final KafkaConsumer kafkaConsumer) {
- synchronized (KafkaStreamLevelConsumerManager.class) {
- // Release the consumer, mark it for shutdown in the future
- final long releaseTime = System.currentTimeMillis() +
CONSUMER_SHUTDOWN_DELAY_MILLIS;
- CONSUMER_RELEASE_TIME.put(kafkaConsumer, releaseTime);
-
- LOGGER.info("Marking consumer with id {} for release at {}",
kafkaConsumer, releaseTime);
-
- // Schedule the shutdown of the consumer
- new Thread() {
- @Override
- public void run() {
- try {
- // Await the shutdown time
-
Uninterruptibles.sleepUninterruptibly(CONSUMER_SHUTDOWN_DELAY_MILLIS,
TimeUnit.MILLISECONDS);
-
- // Shutdown all consumers that have not been re-acquired
- synchronized (KafkaStreamLevelConsumerManager.class) {
- LOGGER.info("Executing release check for consumer {} at {},
scheduled at {}", kafkaConsumer,
- System.currentTimeMillis(), releaseTime);
-
- Iterator<Map.Entry<ImmutableTriple<String, String, String>,
KafkaConsumer>> configIterator =
- CONSUMER_FOR_CONFIG_KEY.entrySet().iterator();
-
- while (configIterator.hasNext()) {
- Map.Entry<ImmutableTriple<String, String, String>,
KafkaConsumer> entry = configIterator.next();
- KafkaConsumer kafkaConsumer = entry.getValue();
-
- final Long releaseTime =
CONSUMER_RELEASE_TIME.get(kafkaConsumer);
- if (!releaseTime.equals(IN_USE) && releaseTime <
System.currentTimeMillis()) {
- LOGGER.info("Releasing consumer {}", kafkaConsumer);
-
- try {
- kafkaConsumer.close();
- } catch (Exception e) {
- LOGGER.warn("Caught exception while shutting down Kafka
consumer with id {}", kafkaConsumer, e);
- }
-
- configIterator.remove();
- CONSUMER_RELEASE_TIME.remove(kafkaConsumer);
- } else {
- LOGGER.info("Not releasing consumer {}, it has been
reacquired", kafkaConsumer);
- }
- }
- }
- } catch (Exception e) {
- LOGGER.warn("Caught exception in release of consumer {}",
kafkaConsumer, e);
- }
- }
- }.start();
- }
- }
-
- public static void closeAllConsumers() {
- try {
- // Shutdown all consumers
- synchronized (KafkaStreamLevelConsumerManager.class) {
- LOGGER.info("Trying to shutdown all the kafka consumers");
- Iterator<KafkaConsumer> consumerIterator =
CONSUMER_FOR_CONFIG_KEY.values().iterator();
-
- while (consumerIterator.hasNext()) {
- KafkaConsumer kafkaConsumer = consumerIterator.next();
- LOGGER.info("Trying to shutdown consumer {}", kafkaConsumer);
- try {
- kafkaConsumer.close();
- } catch (Exception e) {
- LOGGER.warn("Caught exception while shutting down Kafka consumer
with id {}", kafkaConsumer, e);
- }
- consumerIterator.remove();
- }
- CONSUMER_FOR_CONFIG_KEY.clear();
- CONSUMER_RELEASE_TIME.clear();
- }
- } catch (Exception e) {
- LOGGER.warn("Caught exception during shutting down all kafka consumers",
e);
- }
- }
-}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelStreamConfig.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelStreamConfig.java
deleted file mode 100644
index 536121d51a..0000000000
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelStreamConfig.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.stream.kafka20;
-
-import com.google.common.base.Preconditions;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.StreamConfigProperties;
-import org.apache.pinot.spi.utils.EqualityUtils;
-
-
-/**
- * Wrapper around {@link StreamConfig} for use in the {@link
KafkaStreamLevelConsumer}
- */
-public class KafkaStreamLevelStreamConfig {
- private static final String DEFAULT_AUTO_COMMIT_ENABLE = "false";
- private static final Map<String, String> DEFAULT_PROPS = new HashMap<String,
String>() {{
- put(KafkaStreamConfigProperties.HighLevelConsumer.AUTO_COMMIT_ENABLE,
DEFAULT_AUTO_COMMIT_ENABLE);
- }};
-
- private String _kafkaTopicName;
- private String _groupId;
- private String _bootstrapServers;
- private Map<String, String> _kafkaConsumerProperties;
-
- /**
- * Builds a wrapper around {@link StreamConfig} to fetch kafka stream level
consumer specific configs
- * @param streamConfig
- * @param tableName
- * @param groupId
- */
- public KafkaStreamLevelStreamConfig(StreamConfig streamConfig, String
tableName, String groupId) {
- Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
-
- _kafkaTopicName = streamConfig.getTopicName();
- String hlcBootstrapBrokerUrlKey =
- KafkaStreamConfigProperties
-
.constructStreamProperty(KafkaStreamConfigProperties.HighLevelConsumer.KAFKA_HLC_BOOTSTRAP_SERVER);
- _bootstrapServers = streamConfigMap.get(hlcBootstrapBrokerUrlKey);
- Preconditions.checkNotNull(_bootstrapServers,
- "Must specify bootstrap broker connect string " +
hlcBootstrapBrokerUrlKey + " in high level kafka consumer");
- _groupId = groupId;
-
- _kafkaConsumerProperties = new HashMap<>();
- String kafkaConsumerPropertyPrefix =
-
KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.KAFKA_CONSUMER_PROP_PREFIX);
- for (String key : streamConfigMap.keySet()) {
- if (key.startsWith(kafkaConsumerPropertyPrefix)) {
- _kafkaConsumerProperties
- .put(StreamConfigProperties.getPropertySuffix(key,
kafkaConsumerPropertyPrefix), streamConfigMap.get(key));
- }
- }
- }
-
- public String getKafkaTopicName() {
- return _kafkaTopicName;
- }
-
- public String getGroupId() {
- return _groupId;
- }
-
- public Properties getKafkaConsumerProperties() {
- Properties props = new Properties();
- for (String key : DEFAULT_PROPS.keySet()) {
- props.put(key, DEFAULT_PROPS.get(key));
- }
- for (String key : _kafkaConsumerProperties.keySet()) {
- props.put(key, _kafkaConsumerProperties.get(key));
- }
- props.put("group.id", _groupId);
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _bootstrapServers);
- return props;
- }
-
- @Override
- public String toString() {
- return "KafkaStreamLevelStreamConfig{" + "_kafkaTopicName='" +
_kafkaTopicName + '\'' + ", _groupId='" + _groupId
- + '\'' + ", _bootstrapServers='"
- + _bootstrapServers + '\'' + ", _kafkaConsumerProperties=" +
_kafkaConsumerProperties + '}';
- }
-
- @Override
- public boolean equals(Object o) {
- if (EqualityUtils.isSameReference(this, o)) {
- return true;
- }
-
- if (EqualityUtils.isNullOrNotSameClass(this, o)) {
- return false;
- }
-
- KafkaStreamLevelStreamConfig that = (KafkaStreamLevelStreamConfig) o;
-
- return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) &&
EqualityUtils
- .isEqual(_groupId, that._groupId) && EqualityUtils
- .isEqual(_bootstrapServers, that._bootstrapServers) && EqualityUtils
- .isEqual(_kafkaConsumerProperties, that._kafkaConsumerProperties);
- }
-
- @Override
- public int hashCode() {
- int result = EqualityUtils.hashCodeOf(_kafkaTopicName);
- result = EqualityUtils.hashCodeOf(result, _groupId);
- result = EqualityUtils.hashCodeOf(result, _bootstrapServers);
- result = EqualityUtils.hashCodeOf(result, _kafkaConsumerProperties);
- return result;
- }
-
- public String getBootstrapServers() {
- return _bootstrapServers;
- }
-}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index fc7f99cf06..6f81632c8c 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -18,12 +18,10 @@
*/
package org.apache.pinot.plugin.stream.kinesis;
-import java.util.Set;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
@@ -37,13 +35,6 @@ public class KinesisConsumerFactory extends
StreamConsumerFactory {
public PartitionLevelConsumer createPartitionLevelConsumer(String clientId,
int partition) {
throw new UnsupportedOperationException();
}
-
- @Override
- public StreamLevelConsumer createStreamLevelConsumer(String clientId, String
tableName, Set<String> fieldsToRead,
- String groupId) {
- throw new UnsupportedOperationException();
- }
-
@Override
public StreamMetadataProvider createPartitionMetadataProvider(String
clientId, int partition) {
return new KinesisStreamMetadataProvider(clientId, _streamConfig);
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerFactory.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerFactory.java
index 9e1e68c199..6614bd321d 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerFactory.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerFactory.java
@@ -18,12 +18,10 @@
*/
package org.apache.pinot.plugin.stream.pulsar;
-import java.util.Set;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
@@ -37,12 +35,6 @@ public class PulsarConsumerFactory extends
StreamConsumerFactory {
throw new UnsupportedOperationException("Partition Level consumer is
deprecated!");
}
- @Override
- public StreamLevelConsumer createStreamLevelConsumer(String clientId, String
tableName, Set<String> fieldsToRead,
- String groupId) {
- return new PulsarStreamLevelConsumer(clientId, tableName, _streamConfig,
fieldsToRead, groupId);
- }
-
@Override
public StreamMetadataProvider createPartitionMetadataProvider(String
clientId, int partition) {
return new PulsarStreamMetadataProvider(clientId, _streamConfig,
partition);
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
deleted file mode 100644
index 78835f492c..0000000000
---
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.stream.pulsar;
-
-import java.util.Set;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.StreamDecoderProvider;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
-import org.apache.pinot.spi.stream.StreamMessageDecoder;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Reader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * A {@link StreamLevelConsumer} implementation for the Pulsar stream
- */
-// Pinot no longer support high level consumer model since v0.12.*
-@Deprecated
-public class PulsarStreamLevelConsumer implements StreamLevelConsumer {
- private Logger _logger;
-
- private StreamMessageDecoder _messageDecoder;
-
- private StreamConfig _streamConfig;
- private PulsarConfig _pulsarStreamLevelStreamConfig;
-
- private Reader<byte[]> _reader;
-
- private long _lastLogTime = 0;
- private long _lastCount = 0;
- private long _currentCount = 0L;
-
- public PulsarStreamLevelConsumer(String clientId, String tableName,
StreamConfig streamConfig,
- Set<String> sourceFields, String subscriberId) {
- _streamConfig = streamConfig;
- _pulsarStreamLevelStreamConfig = new PulsarConfig(streamConfig,
subscriberId);
-
- _messageDecoder = StreamDecoderProvider.create(streamConfig, sourceFields);
-
- _logger =
- LoggerFactory.getLogger(PulsarConfig.class.getName() + "_" + tableName
+ "_" + streamConfig.getTopicName());
- _logger.info("PulsarStreamLevelConsumer: streamConfig : {}",
_streamConfig);
- }
-
- @Override
- public void start()
- throws Exception {
- _reader =
PulsarStreamLevelConsumerManager.acquirePulsarConsumerForConfig(_pulsarStreamLevelStreamConfig);
- }
-
- /**
- * Get next {@link GenericRow} after decoding pulsar {@link Message}
- */
- @Override
- public GenericRow next(GenericRow destination) {
- try {
- if (_reader.hasMessageAvailable()) {
- final Message<byte[]> record = _reader.readNext();
- destination = _messageDecoder.decode(record.getData(), destination);
-
- _currentCount++;
-
- final long now = System.currentTimeMillis();
- // Log every minute or 100k events
- if (now - _lastLogTime > 60000 || _currentCount - _lastCount >=
100000) {
- if (_lastCount == 0) {
- _logger.info("Consumed {} events from pulsar stream {}",
_currentCount, _streamConfig.getTopicName());
- } else {
- _logger.info("Consumed {} events from pulsar stream {}
(rate:{}/s)", _currentCount - _lastCount,
- _streamConfig.getTopicName(), (float) (_currentCount -
_lastCount) * 1000 / (now - _lastLogTime));
- }
- _lastCount = _currentCount;
- _lastLogTime = now;
- }
- return destination;
- }
- } catch (Exception e) {
- _logger.warn("Caught exception while consuming events", e);
- }
- return null;
- }
-
- @Override
- public void commit() {
- }
-
- @Override
- public void shutdown()
- throws Exception {
- if (_reader != null) {
- PulsarStreamLevelConsumerManager.releasePulsarConsumer(_reader);
- _reader = null;
- }
- }
-}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java
deleted file mode 100644
index ff23a0cea6..0000000000
---
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.stream.pulsar;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.HashMap;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.tuple.ImmutableTriple;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Reader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Implements pulsar high level connection manager.
- */
-public class PulsarStreamLevelConsumerManager {
- private PulsarStreamLevelConsumerManager() {
- }
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(PulsarStreamLevelConsumerManager.class);
- private static final Long IN_USE = -1L;
- private static final long CONSUMER_SHUTDOWN_DELAY_MILLIS =
TimeUnit.SECONDS.toMillis(60); // One minute
- private static final Map<ImmutableTriple<String, String, String>,
Reader<byte[]>> CONSUMER_FOR_CONFIG_KEY =
- new HashMap<>();
- private static final IdentityHashMap<Reader<byte[]>, Long>
CONSUMER_RELEASE_TIME = new IdentityHashMap<>();
- protected static PulsarClient _pulsarClient;
- protected static Reader<byte[]> _reader;
-
- /**
- * Get {@link Reader} for {@link PulsarConfig}. If the reader is already
created we return the instance, otherwise
- * a new reader is created.
- */
- public static Reader<byte[]> acquirePulsarConsumerForConfig(PulsarConfig
pulsarStreamLevelStreamConfig) {
- final ImmutableTriple<String, String, String> configKey =
- new
ImmutableTriple<>(pulsarStreamLevelStreamConfig.getPulsarTopicName(),
- pulsarStreamLevelStreamConfig.getSubscriberId(),
pulsarStreamLevelStreamConfig.getBootstrapServers());
-
- synchronized (PulsarStreamLevelConsumerManager.class) {
- // If we have the consumer and it's not already acquired, return it,
otherwise error out if it's already acquired
- if (CONSUMER_FOR_CONFIG_KEY.containsKey(configKey)) {
- Reader<byte[]> pulsarConsumer = CONSUMER_FOR_CONFIG_KEY.get(configKey);
- if (CONSUMER_RELEASE_TIME.get(pulsarConsumer).equals(IN_USE)) {
- throw new RuntimeException("Consumer " + pulsarConsumer + " already
in use!");
- } else {
- LOGGER.info("Reusing pulsar consumer with id {}", pulsarConsumer);
- CONSUMER_RELEASE_TIME.put(pulsarConsumer, IN_USE);
- return pulsarConsumer;
- }
- }
-
- LOGGER.info("Creating new pulsar consumer and iterator for topic {}",
- pulsarStreamLevelStreamConfig.getPulsarTopicName());
-
- // Create the consumer
- try {
- ClientBuilder pulsarClientBuilder = PulsarClient.builder().serviceUrl(
- pulsarStreamLevelStreamConfig.getBootstrapServers());
- if (pulsarStreamLevelStreamConfig.getTlsTrustCertsFilePath() != null) {
-
pulsarClientBuilder.tlsTrustCertsFilePath(pulsarStreamLevelStreamConfig.getTlsTrustCertsFilePath());
- }
-
- if (pulsarStreamLevelStreamConfig.getAuthenticationToken() != null) {
- Authentication authentication = AuthenticationFactory.token(
- pulsarStreamLevelStreamConfig.getAuthenticationToken());
- pulsarClientBuilder.authentication(authentication);
- }
-
- _pulsarClient = pulsarClientBuilder.build();
-
- _reader =
_pulsarClient.newReader().topic(pulsarStreamLevelStreamConfig.getPulsarTopicName())
-
.startMessageId(pulsarStreamLevelStreamConfig.getInitialMessageId()).create();
-
- // Mark both the consumer and iterator as acquired
- CONSUMER_FOR_CONFIG_KEY.put(configKey, _reader);
- CONSUMER_RELEASE_TIME.put(_reader, IN_USE);
-
- LOGGER.info("Created consumer with id {} for topic {}", _reader,
- pulsarStreamLevelStreamConfig.getPulsarTopicName());
-
- return _reader;
- } catch (PulsarClientException e) {
- LOGGER.error("Could not create pulsar consumer", e);
- return null;
- }
- }
- }
-
- /**
- * remove the {@link Reader} from consumer pool after closing it.
- */
- public static void releasePulsarConsumer(final Reader<byte[]>
pulsarConsumer) {
- synchronized (PulsarStreamLevelConsumerManager.class) {
- // Release the consumer, mark it for shutdown in the future
- final long releaseTime = System.currentTimeMillis() +
CONSUMER_SHUTDOWN_DELAY_MILLIS;
- CONSUMER_RELEASE_TIME.put(pulsarConsumer, releaseTime);
-
- LOGGER.info("Marking consumer with id {} for release at {}",
pulsarConsumer, releaseTime);
-
- // Schedule the shutdown of the consumer
- new Thread() {
- @Override
- public void run() {
- try {
- // Await the shutdown time
-
Uninterruptibles.sleepUninterruptibly(CONSUMER_SHUTDOWN_DELAY_MILLIS,
TimeUnit.MILLISECONDS);
-
- // Shutdown all consumers that have not been re-acquired
- synchronized (PulsarStreamLevelConsumerManager.class) {
- LOGGER.info("Executing release check for consumer {} at {},
scheduled at {}", pulsarConsumer,
- System.currentTimeMillis(), releaseTime);
-
- Iterator<Map.Entry<ImmutableTriple<String, String, String>,
Reader<byte[]>>> configIterator =
- CONSUMER_FOR_CONFIG_KEY.entrySet().iterator();
-
- while (configIterator.hasNext()) {
- Map.Entry<ImmutableTriple<String, String, String>,
Reader<byte[]>> entry = configIterator.next();
- Reader<byte[]> pulsarConsumer = entry.getValue();
-
- final Long releaseTime =
CONSUMER_RELEASE_TIME.get(pulsarConsumer);
- if (!releaseTime.equals(IN_USE) && releaseTime <
System.currentTimeMillis()) {
- LOGGER.info("Releasing consumer {}", pulsarConsumer);
-
- try {
- pulsarConsumer.close();
- } catch (Exception e) {
- LOGGER.warn("Caught exception while shutting down Pulsar
consumer with id {}", pulsarConsumer, e);
- }
-
- configIterator.remove();
- CONSUMER_RELEASE_TIME.remove(pulsarConsumer);
- } else {
- LOGGER.info("Not releasing consumer {}, it has been
reacquired", pulsarConsumer);
- }
- }
- }
- } catch (Exception e) {
- LOGGER.warn("Caught exception in release of consumer {}",
pulsarConsumer, e);
- }
- }
- }.start();
- }
- }
-
- public static void closeAllConsumers() {
- try {
- // Shutdown all consumers
- synchronized (PulsarStreamLevelConsumerManager.class) {
- LOGGER.info("Trying to shutdown all the pulsar consumers");
- Iterator<Reader<byte[]>> consumerIterator =
CONSUMER_FOR_CONFIG_KEY.values().iterator();
-
- while (consumerIterator.hasNext()) {
- Reader<byte[]> pulsarConsumer = consumerIterator.next();
- LOGGER.info("Trying to shutdown consumer {}", pulsarConsumer);
- try {
- pulsarConsumer.close();
- } catch (Exception e) {
- LOGGER.warn("Caught exception while shutting down Pulsar consumer
with id {}", pulsarConsumer, e);
- }
- consumerIterator.remove();
- }
- CONSUMER_FOR_CONFIG_KEY.clear();
- CONSUMER_RELEASE_TIME.clear();
- }
- } catch (Exception e) {
- LOGGER.warn("Caught exception during shutting down all pulsar
consumers", e);
- }
- }
-}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index 4b4d671605..60e04a2431 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -51,8 +51,12 @@ public abstract class StreamConsumerFactory {
* @param groupId consumer group Id
* @return the stream level consumer
*/
- public abstract StreamLevelConsumer createStreamLevelConsumer(String
clientId, String tableName,
- Set<String> fieldsToRead, String groupId);
+ @Deprecated
+ public StreamLevelConsumer createStreamLevelConsumer(String clientId, String
tableName,
+ Set<String> fieldsToRead, String groupId) {
+ throw new UnsupportedOperationException("Apache pinot no longer supports
stream level consumer model. Please "
+ + "create partition level consumer only");
+ }
/**
* Creates a metadata provider which provides partition specific metadata
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]