This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f25f3ed1f2 Cleanup HLC code (#11326)
f25f3ed1f2 is described below

commit f25f3ed1f242513b90207d082fb80dbfdc9b06f6
Author: Navina Ramesh <[email protected]>
AuthorDate: Sun Aug 13 18:34:34 2023 -0700

    Cleanup HLC code (#11326)
---
 .../broker/broker/FakeStreamConsumerFactory.java   |   9 -
 .../common/utils/config/TableConfigUtilsTest.java  |   7 -
 .../realtime/HLRealtimeSegmentDataManager.java     | 489 ---------------------
 .../manager/realtime/RealtimeTableDataManager.java |  61 +--
 .../impl/fakestream/FakeStreamConsumerFactory.java |   8 -
 .../stream/kafka20/KafkaConsumerFactory.java       |   8 -
 .../stream/kafka20/KafkaStreamLevelConsumer.java   | 158 -------
 .../kafka20/KafkaStreamLevelConsumerManager.java   | 199 ---------
 .../kafka20/KafkaStreamLevelStreamConfig.java      | 133 ------
 .../stream/kinesis/KinesisConsumerFactory.java     |   9 -
 .../stream/pulsar/PulsarConsumerFactory.java       |   8 -
 .../stream/pulsar/PulsarStreamLevelConsumer.java   | 114 -----
 .../pulsar/PulsarStreamLevelConsumerManager.java   | 193 --------
 .../pinot/spi/stream/StreamConsumerFactory.java    |   8 +-
 14 files changed, 29 insertions(+), 1375 deletions(-)

diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
index be2f6d8a6a..3ada727a13 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
@@ -21,7 +21,6 @@ package org.apache.pinot.broker.broker;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
@@ -31,7 +30,6 @@ import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 
@@ -41,13 +39,6 @@ public class FakeStreamConsumerFactory extends 
StreamConsumerFactory {
   public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, 
int partition) {
     return new FakePartitionLevelConsumer();
   }
-
-  @Override
-  public StreamLevelConsumer createStreamLevelConsumer(String clientId, String 
tableName, Set<String> fieldsToRead,
-      String groupId) {
-    return null;
-  }
-
   @Override
   public StreamMetadataProvider createPartitionMetadataProvider(String 
clientId, int partition) {
     return new FakesStreamMetadataProvider();
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigUtilsTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigUtilsTest.java
index 9d1f86dc23..edb3b58f98 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigUtilsTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigUtilsTest.java
@@ -34,7 +34,6 @@ import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -218,12 +217,6 @@ public class TableConfigUtilsTest {
       return null;
     }
 
-    @Override
-    public StreamLevelConsumer createStreamLevelConsumer(String clientId, 
String tableName, Set<String> fieldsToRead,
-        String groupId) {
-      return null;
-    }
-
     @Override
     public StreamMetadataProvider createPartitionMetadataProvider(String 
clientId, int partition) {
       return null;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
deleted file mode 100644
index 246cf83e98..0000000000
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ /dev/null
@@ -1,489 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.manager.realtime;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.metrics.ServerGauge;
-import org.apache.pinot.common.metrics.ServerMeter;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
-import 
org.apache.pinot.segment.local.realtime.converter.ColumnIndicesForRealtimeTable;
-import 
org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
-import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
-import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
-import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.segment.local.utils.IngestionUtils;
-import org.apache.pinot.segment.spi.MutableSegment;
-import org.apache.pinot.segment.spi.creator.SegmentVersion;
-import org.apache.pinot.spi.config.table.IndexingConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.metrics.PinotMeter;
-import org.apache.pinot.spi.stream.ConsumerPartitionState;
-import org.apache.pinot.spi.stream.PartitionLagState;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.StreamConsumerFactory;
-import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
-import org.apache.pinot.spi.utils.CommonConstants.ConsumerState;
-import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
-import org.apache.pinot.spi.utils.IngestionConfigUtils;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(HLRealtimeSegmentDataManager.class);
-  private final static long ONE_MINUTE_IN_MILLSEC = 1000 * 60;
-
-  private final String _tableNameWithType;
-  private final String _segmentName;
-  private final String _timeColumnName;
-  private final TimeUnit _timeType;
-  private final RecordTransformer _recordTransformer;
-
-  private final StreamLevelConsumer _streamLevelConsumer;
-  private final File _resourceTmpDir;
-  private final MutableSegmentImpl _realtimeSegment;
-  private final String _tableStreamName;
-  private final StreamConfig _streamConfig;
-
-  private final long _start = System.currentTimeMillis();
-  private long _segmentEndTimeThreshold;
-  private AtomicLong _lastUpdatedRawDocuments = new AtomicLong(0);
-
-  private volatile boolean _keepIndexing = true;
-  private volatile boolean _isShuttingDown = false;
-
-  private final TimerTask _segmentStatusTask;
-  private final ServerMetrics _serverMetrics;
-  private final RealtimeTableDataManager _notifier;
-  private Thread _indexingThread;
-
-  private final String _sortedColumn;
-  private final List<String> _invertedIndexColumns;
-  private final Logger _segmentLogger;
-  private final SegmentVersion _segmentVersion;
-
-  private PinotMeter _tableAndStreamRowsConsumed = null;
-  private PinotMeter _tableRowsConsumed = null;
-
-  // An instance of this class exists only for the duration of the realtime 
segment that is currently being consumed.
-  // Once the segment is committed, the segment is handled by 
OfflineSegmentDataManager
-  public HLRealtimeSegmentDataManager(final SegmentZKMetadata 
segmentZKMetadata, final TableConfig tableConfig,
-      InstanceZKMetadata instanceMetadata, final RealtimeTableDataManager 
realtimeTableDataManager,
-      final String resourceDataDir, final IndexLoadingConfig 
indexLoadingConfig, final Schema schema,
-      final ServerMetrics serverMetrics)
-      throws Exception {
-    super();
-    _segmentVersion = indexLoadingConfig.getSegmentVersion();
-    _recordTransformer = 
CompositeTransformer.getDefaultTransformer(tableConfig, schema);
-    _serverMetrics = serverMetrics;
-    _segmentName = segmentZKMetadata.getSegmentName();
-    _tableNameWithType = tableConfig.getTableName();
-    _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
-    Preconditions
-        .checkNotNull(_timeColumnName, "Must provide valid timeColumnName in 
tableConfig for realtime table {}",
-            _tableNameWithType);
-    DateTimeFieldSpec dateTimeFieldSpec = 
schema.getSpecForTimeColumn(_timeColumnName);
-    Preconditions.checkNotNull(dateTimeFieldSpec, "Must provide field spec for 
time column {}", _timeColumnName);
-    _timeType = dateTimeFieldSpec.getFormatSpec().getColumnUnit();
-
-    List<String> sortedColumns = indexLoadingConfig.getSortedColumns();
-    if (sortedColumns.isEmpty()) {
-      LOGGER.info("RealtimeDataResourceZKMetadata contains no information 
about sorted column for segment {}",
-          _segmentName);
-      _sortedColumn = null;
-    } else {
-      String firstSortedColumn = sortedColumns.get(0);
-      if (schema.hasColumn(firstSortedColumn)) {
-        LOGGER.info("Setting sorted column name: {} from 
RealtimeDataResourceZKMetadata for segment {}",
-            firstSortedColumn, _segmentName);
-        _sortedColumn = firstSortedColumn;
-      } else {
-        LOGGER
-            .warn("Sorted column name: {} from RealtimeDataResourceZKMetadata 
is not existed in schema for segment {}.",
-                firstSortedColumn, _segmentName);
-        _sortedColumn = null;
-      }
-    }
-
-    // Inverted index columns
-    // We need to add sorted column into inverted index columns because when 
we convert realtime in memory segment into
-    // offline segment, we use sorted column's inverted index to maintain the 
order of the records so that the records
-    // are sorted on the sorted column.
-    if (_sortedColumn != null) {
-      indexLoadingConfig.addInvertedIndexColumns(_sortedColumn);
-    }
-    Set<String> invertedIndexColumns = 
indexLoadingConfig.getInvertedIndexColumns();
-    _invertedIndexColumns = new ArrayList<>(invertedIndexColumns);
-    _streamConfig = new StreamConfig(_tableNameWithType, 
IngestionConfigUtils.getStreamConfigMap(tableConfig));
-
-    _segmentLogger = LoggerFactory.getLogger(
-        HLRealtimeSegmentDataManager.class.getName() + "_" + _segmentName + 
"_" + _streamConfig.getTopicName());
-    _segmentLogger.info("Created segment data manager with Sorted column:{}, 
invertedIndexColumns:{}", _sortedColumn,
-        invertedIndexColumns);
-
-    _segmentEndTimeThreshold = _start + 
_streamConfig.getFlushThresholdTimeMillis();
-    _resourceTmpDir = new File(resourceDataDir, RESOURCE_TEMP_DIR_NAME);
-    if (!_resourceTmpDir.exists()) {
-      _resourceTmpDir.mkdirs();
-    }
-    // create and init stream level consumer
-    StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(_streamConfig);
-    String clientId = HLRealtimeSegmentDataManager.class.getSimpleName() + "-" 
+ _streamConfig.getTopicName();
-    Set<String> fieldsToRead = 
IngestionUtils.getFieldsForRecordExtractor(tableConfig.getIngestionConfig(), 
schema);
-    _streamLevelConsumer = 
streamConsumerFactory.createStreamLevelConsumer(clientId, _tableNameWithType, 
fieldsToRead,
-        instanceMetadata.getGroupId(_tableNameWithType));
-    _streamLevelConsumer.start();
-    _tableStreamName = _tableNameWithType + "_" + _streamConfig.getTopicName();
-
-    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
-    if (indexingConfig != null && indexingConfig.isAggregateMetrics()) {
-      LOGGER.warn("Updating of metrics only supported for LLC consumer, 
ignoring.");
-    }
-
-    // lets create a new realtime segment
-    _segmentLogger.info("Started {} stream provider", _streamConfig.getType());
-    final int capacity = _streamConfig.getFlushThresholdRows();
-    boolean nullHandlingEnabled = indexingConfig != null && 
indexingConfig.isNullHandlingEnabled();
-    RealtimeSegmentConfig realtimeSegmentConfig =
-        new 
RealtimeSegmentConfig.Builder(indexLoadingConfig).setTableNameWithType(_tableNameWithType)
-            .setSegmentName(_segmentName)
-            
.setStreamName(_streamConfig.getTopicName()).setSchema(schema).setTimeColumnName(_timeColumnName)
-            
.setCapacity(capacity).setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
-            .setSegmentZKMetadata(segmentZKMetadata)
-            
.setOffHeap(indexLoadingConfig.isRealtimeOffHeapAllocation()).setMemoryManager(
-            getMemoryManager(realtimeTableDataManager.getConsumerDir(), 
_segmentName,
-                indexLoadingConfig.isRealtimeOffHeapAllocation(),
-                indexLoadingConfig.isDirectRealtimeOffHeapAllocation(), 
serverMetrics))
-            .setStatsHistory(realtimeTableDataManager.getStatsHistory())
-            .setNullHandlingEnabled(nullHandlingEnabled).build();
-    _realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfig, 
serverMetrics);
-
-    _notifier = realtimeTableDataManager;
-
-    LOGGER.info("Starting consumption on realtime consuming segment {} 
maxRowCount {} maxEndTime {}", _segmentName,
-        capacity, new DateTime(_segmentEndTimeThreshold, 
DateTimeZone.UTC).toString());
-    _segmentStatusTask = new TimerTask() {
-      @Override
-      public void run() {
-        computeKeepIndexing();
-      }
-    };
-
-    // start the indexing thread
-    _indexingThread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        // continue indexing until criteria is met
-        boolean notFull = true;
-        long exceptionSleepMillis = 50L;
-        _segmentLogger.info("Starting to collect rows");
-
-        int numRowsErrored = 0;
-        GenericRow reuse = new GenericRow();
-        do {
-          reuse.clear();
-          try {
-            GenericRow consumedRow;
-            try {
-              consumedRow = _streamLevelConsumer.next(reuse);
-              _tableAndStreamRowsConsumed = serverMetrics
-                  .addMeteredTableValue(_tableStreamName, 
ServerMeter.REALTIME_ROWS_CONSUMED, 1L,
-                      _tableAndStreamRowsConsumed);
-              _tableRowsConsumed =
-                  
serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L, 
_tableRowsConsumed);
-            } catch (Exception e) {
-              _segmentLogger.warn("Caught exception while consuming row, 
sleeping for {} ms", exceptionSleepMillis, e);
-              numRowsErrored++;
-              serverMetrics.addMeteredTableValue(_tableStreamName, 
ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
-              
serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS,
 1L);
-
-              // Sleep for a short time as to avoid filling the logs with 
exceptions too quickly
-              Uninterruptibles.sleepUninterruptibly(exceptionSleepMillis, 
TimeUnit.MILLISECONDS);
-              exceptionSleepMillis = Math.min(60000L, exceptionSleepMillis * 
2);
-              continue;
-            }
-            if (consumedRow != null) {
-              try {
-                GenericRow transformedRow = 
_recordTransformer.transform(consumedRow);
-                // FIXME: handle MULTIPLE_RECORDS_KEY for HLL
-                if (transformedRow != null && 
IngestionUtils.shouldIngestRow(transformedRow)) {
-                  // we currently do not get ingestion data through 
stream-consumer
-                  notFull = _realtimeSegment.index(transformedRow, null);
-                  exceptionSleepMillis = 50L;
-                }
-              } catch (Exception e) {
-                _segmentLogger.warn("Caught exception while indexing row, 
sleeping for {} ms, row contents {}",
-                    exceptionSleepMillis, consumedRow, e);
-                numRowsErrored++;
-
-                // Sleep for a short time as to avoid filling the logs with 
exceptions too quickly
-                Uninterruptibles.sleepUninterruptibly(exceptionSleepMillis, 
TimeUnit.MILLISECONDS);
-                exceptionSleepMillis = Math.min(60000L, exceptionSleepMillis * 
2);
-              }
-            }
-          } catch (Error e) {
-            _segmentLogger.error("Caught error in indexing thread", e);
-            throw e;
-          }
-        } while (notFull && _keepIndexing && (!_isShuttingDown));
-
-        if (_isShuttingDown) {
-          _segmentLogger.info("Shutting down indexing thread!");
-          return;
-        }
-        try {
-          if (numRowsErrored > 0) {
-            serverMetrics.addMeteredTableValue(_tableStreamName, 
ServerMeter.ROWS_WITH_ERRORS, numRowsErrored);
-          }
-          _segmentLogger.info("Indexing threshold reached, proceeding with 
index conversion");
-          // kill the timer first
-          _segmentStatusTask.cancel();
-          updateCurrentDocumentCountMetrics();
-          _segmentLogger.info("Indexed {} raw events", 
_realtimeSegment.getNumDocsIndexed());
-          File tempSegmentFolder = new File(_resourceTmpDir, "tmp-" + 
System.currentTimeMillis());
-          ColumnIndicesForRealtimeTable columnIndicesForRealtimeTable =
-              new ColumnIndicesForRealtimeTable(_sortedColumn, 
_invertedIndexColumns, Collections.emptyList(),
-                  Collections.emptyList(), new 
ArrayList<>(indexLoadingConfig.getNoDictionaryColumns()),
-                  new 
ArrayList<>(indexLoadingConfig.getVarLengthDictionaryColumns()));
-          // lets convert the segment now
-          RealtimeSegmentConverter converter =
-              new RealtimeSegmentConverter(_realtimeSegment, null, 
tempSegmentFolder.getAbsolutePath(),
-                  schema, _tableNameWithType, tableConfig, 
segmentZKMetadata.getSegmentName(),
-                  columnIndicesForRealtimeTable, 
indexingConfig.isNullHandlingEnabled());
-
-          _segmentLogger.info("Trying to build segment");
-          final long buildStartTime = System.nanoTime();
-          converter.build(_segmentVersion, serverMetrics);
-          final long buildEndTime = System.nanoTime();
-          _segmentLogger.info("Built segment in {} ms",
-              TimeUnit.MILLISECONDS.convert((buildEndTime - buildStartTime), 
TimeUnit.NANOSECONDS));
-          File destDir = new File(resourceDataDir, 
segmentZKMetadata.getSegmentName());
-          FileUtils.deleteQuietly(destDir);
-          FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], destDir);
-
-          FileUtils.deleteQuietly(tempSegmentFolder);
-          long segStartTime = _realtimeSegment.getMinTime();
-          long segEndTime = _realtimeSegment.getMaxTime();
-
-          _segmentLogger.info("Committing {} offsets", 
_streamConfig.getType());
-          boolean commitSuccessful = false;
-          try {
-            _streamLevelConsumer.commit();
-            commitSuccessful = true;
-            _streamLevelConsumer.shutdown();
-            _segmentLogger
-                .info("Successfully committed {} offsets, consumer release 
requested.", _streamConfig.getType());
-            serverMetrics.addMeteredTableValue(_tableStreamName, 
ServerMeter.REALTIME_OFFSET_COMMITS, 1L);
-            
serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_OFFSET_COMMITS, 1L);
-          } catch (Throwable e) {
-            // If we got here, it means that either the commit or the shutdown 
failed. Considering that the
-            // KafkaConsumerManager delays shutdown and only adds the consumer 
to be released in a deferred way, this
-            // likely means that writing the Kafka offsets failed.
-            //
-            // The old logic (mark segment as done, then commit offsets and 
shutdown the consumer immediately) would die
-            // in a terrible way, leaving the consumer open and causing us to 
only get half the records from that point
-            // on. In this case, because we keep the consumer open for a 
little while, we should be okay if the
-            // controller reassigns us a new segment before the consumer gets 
released. Hopefully by the next time that
-            // we get to committing the offsets, the transient ZK failure that 
caused the write to fail will not
-            // happen again and everything will be good.
-            //
-            // Several things can happen:
-            // - The controller reassigns us a new segment before we release 
the consumer (KafkaConsumerManager will
-            //   keep the consumer open for about a minute, which should be 
enough time for the controller to reassign
-            //   us a new segment) and the next time we close the segment the 
offsets commit successfully; we're good.
-            // - The controller reassigns us a new segment, but after we 
released the consumer (if the controller was
-            //   down or there was a ZK failure on writing the Kafka offsets 
but not the Helix state). We lose whatever
-            //   data was in this segment. Not good.
-            // - The server crashes after this comment and before we mark the 
current segment as done; if the Kafka
-            //   offsets didn't get written, then when the server restarts 
it'll start consuming the current segment
-            //   from the previously committed offsets; we're good.
-            // - The server crashes after this comment, the Kafka offsets were 
written but the segment wasn't marked as
-            //   done in Helix, but we got a failure (or not) on the commit; 
we lose whatever data was in this segment
-            //   if we restart the server (not good). If we manually mark the 
segment as done in Helix by editing the
-            //   state in ZK, everything is good, we'll consume a new segment 
that starts from the correct offsets.
-            //
-            // This is still better than the previous logic, which would have 
these failure modes:
-            // - Consumer was left open and the controller reassigned us a new 
segment; consume only half the events
-            //   (because there are two consumers and Kafka will try to 
rebalance partitions between those two)
-            // - We got a segment assigned to us before we got around to 
committing the offsets, reconsume the data that
-            //   we got in this segment again, as we're starting consumption 
from the previously committed offset (eg.
-            //   duplicate data).
-            //
-            // This is still not very satisfactory, which is why this part is 
due for a redesign.
-            //
-            // Assuming you got here because the realtime offset commit metric 
has fired, check the logs to determine
-            // which of the above scenarios happened. If you're in one of the 
good scenarios, then there's nothing to
-            // do. If you're not, then based on how critical it is to get 
those rows back, then your options are:
-            // - Wipe the realtime table and reconsume everything (mark the 
replica as disabled so that clients don't
-            //   see query results from partially consumed data, then 
re-enable it when this replica has caught up)
-            // - Accept that those rows are gone in this replica and move on 
(they'll be replaced by good offline data
-            //   soon anyway)
-            // - If there's a replica that has consumed properly, you could 
shut it down, copy its segments onto this
-            //   replica, assign a new consumer group id to this replica, 
rename the copied segments and edit their
-            //   metadata to reflect the new consumer group id, copy the Kafka 
offsets from the shutdown replica onto
-            //   the new consumer group id and then restart both replicas. 
This should get you the missing rows.
-
-            _segmentLogger
-                .error("FATAL: Exception committing or shutting down consumer 
commitSuccessful={}", commitSuccessful,
-                    e);
-            serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.REALTIME_OFFSET_COMMIT_EXCEPTIONS, 1L);
-            if (!commitSuccessful) {
-              _streamLevelConsumer.shutdown();
-            }
-          }
-
-          try {
-            _segmentLogger.info("Marking current segment as completed in 
Helix");
-            SegmentZKMetadata metadataToOverwrite = new 
SegmentZKMetadata(segmentZKMetadata.getSegmentName());
-            metadataToOverwrite.setStatus(Status.DONE);
-            metadataToOverwrite.setStartTime(segStartTime);
-            metadataToOverwrite.setEndTime(segEndTime);
-            metadataToOverwrite.setTimeUnit(_timeType);
-            
metadataToOverwrite.setTotalDocs(_realtimeSegment.getNumDocsIndexed());
-            _notifier.replaceHLSegment(metadataToOverwrite, 
indexLoadingConfig);
-            _segmentLogger
-                .info("Completed write of segment completion to Helix, waiting 
for controller to assign a new segment");
-          } catch (Exception e) {
-            if (commitSuccessful) {
-              _segmentLogger.error(
-                  "Offsets were committed to Kafka but we were unable to mark 
this segment as completed in Helix. "
-                      + "Manually mark the segment as completed in Helix; 
restarting this instance will result in "
-                      + "data loss.", e);
-            } else {
-              _segmentLogger.warn(
-                  "Caught exception while marking segment as completed in 
Helix. Offsets were not written, restarting"
-                      + " the instance should be safe.", e);
-            }
-          }
-        } catch (Exception e) {
-          _segmentLogger.error("Caught exception in the realtime indexing 
thread", e);
-        }
-      }
-    });
-
-    _indexingThread.start();
-    serverMetrics.addValueToTableGauge(_tableNameWithType, 
ServerGauge.SEGMENT_COUNT, 1L);
-    _segmentLogger.debug("scheduling keepIndexing timer check");
-    // start a schedule timer to keep track of the segment
-    TimerService.TIMER.schedule(_segmentStatusTask, ONE_MINUTE_IN_MILLSEC, 
ONE_MINUTE_IN_MILLSEC);
-    _segmentLogger.info("finished scheduling keepIndexing timer check");
-  }
-
-  @Override
-  public MutableSegment getSegment() {
-    return _realtimeSegment;
-  }
-
-  @Override
-  public Map<String, String> getPartitionToCurrentOffset() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void startConsumption() {
-    // no-op
-  }
-
-  @Override
-  public ConsumerState getConsumerState() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getLastConsumedTimestamp() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Map<String, ConsumerPartitionState> getConsumerPartitionState() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Map<String, PartitionLagState> getPartitionToLagState(
-      Map<String, ConsumerPartitionState> consumerPartitionStateMap) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public String getSegmentName() {
-    return _segmentName;
-  }
-
-  private void computeKeepIndexing() {
-    if (_keepIndexing) {
-      _segmentLogger.debug("Current indexed {} raw events", 
_realtimeSegment.getNumDocsIndexed());
-      if ((System.currentTimeMillis() >= _segmentEndTimeThreshold)
-          || _realtimeSegment.getNumDocsIndexed() >= 
_streamConfig.getFlushThresholdRows()) {
-        if (_realtimeSegment.getNumDocsIndexed() == 0) {
-          _segmentLogger.info("no new events coming in, extending the end time 
by another hour");
-          _segmentEndTimeThreshold = System.currentTimeMillis() + 
_streamConfig.getFlushThresholdTimeMillis();
-          return;
-        }
-        _segmentLogger.info(
-            "Stopped indexing due to reaching segment limit: {} raw documents 
indexed, segment is aged {} minutes",
-            _realtimeSegment.getNumDocsIndexed(), ((System.currentTimeMillis() 
- _start) / (ONE_MINUTE_IN_MILLSEC)));
-        _keepIndexing = false;
-      }
-    }
-    updateCurrentDocumentCountMetrics();
-  }
-
-  private void updateCurrentDocumentCountMetrics() {
-    int currentRawDocs = _realtimeSegment.getNumDocsIndexed();
-    _serverMetrics.addValueToTableGauge(_tableNameWithType, 
ServerGauge.DOCUMENT_COUNT,
-        (currentRawDocs - _lastUpdatedRawDocuments.get()));
-    _lastUpdatedRawDocuments.set(currentRawDocs);
-  }
-
-  @Override
-  protected void doDestroy() {
-    LOGGER.info("Trying to shutdown RealtimeSegmentDataManager : {}!", 
_segmentName);
-    _isShuttingDown = true;
-    try {
-      _streamLevelConsumer.shutdown();
-    } catch (Exception e) {
-      LOGGER.error("Failed to shutdown stream consumer!", e);
-    }
-    _keepIndexing = false;
-    _segmentStatusTask.cancel();
-    _realtimeSegment.destroy();
-  }
-}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index f431b63eb9..3bf7caf24b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -39,7 +39,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.utils.LLCSegmentName;
@@ -405,23 +404,17 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     Schema schema = indexLoadingConfig.getSchema();
     assert schema != null;
     boolean isHLCSegment = 
SegmentName.isHighLevelConsumerSegmentName(segmentName);
+    if (isHLCSegment) {
+      throw new UnsupportedOperationException("Adding high level consumer 
segment " + segmentName + " is not allowed");
+    }
     if (segmentZKMetadata.getStatus().isCompleted()) {
-      if (isHLCSegment && !segmentDir.exists()) {
-        throw new RuntimeException("Failed to find local copy for committed 
HLC segment: " + segmentName);
-      }
       if (tryLoadExistingSegment(segmentName, indexLoadingConfig, 
segmentZKMetadata)) {
         // The existing completed segment has been loaded successfully
         return;
       } else {
-        if (!isHLCSegment) {
-          // For LLC and uploaded segments, delete the local copy and download 
a new copy
-          _logger.error("Failed to load LLC segment: {}, downloading a new 
copy", segmentName);
-          FileUtils.deleteQuietly(segmentDir);
-        } else {
-          // For HLC segments, throw out the exception because there is no way 
to recover (controller does not have a
-          // copy of the segment)
-          throw new RuntimeException("Failed to load local HLC segment: " + 
segmentName);
-        }
+        // For LLC and uploaded segments, delete the local copy and download a 
new copy
+        _logger.error("Failed to load LLC segment: {}, downloading a new 
copy", segmentName);
+        FileUtils.deleteQuietly(segmentDir);
       }
       // Local segment doesn't exist or cannot load, download a new copy
       downloadAndReplaceSegment(segmentName, segmentZKMetadata, 
indexLoadingConfig, tableConfig);
@@ -436,34 +429,26 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
       _logger.error("Not adding segment {}", segmentName);
       throw new RuntimeException("Mismatching schema/table config for " + 
_tableNameWithType);
     }
+
     
VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(schema, 
segmentName);
     setDefaultTimeValueIfInvalid(tableConfig, schema, segmentZKMetadata);
 
-    if (!isHLCSegment) {
-      // Generates only one semaphore for every partitionGroupId
-      LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
-      int partitionGroupId = llcSegmentName.getPartitionGroupId();
-      Semaphore semaphore = 
_partitionGroupIdToSemaphoreMap.computeIfAbsent(partitionGroupId, k -> new 
Semaphore(1));
-      PartitionUpsertMetadataManager partitionUpsertMetadataManager =
-          _tableUpsertMetadataManager != null ? 
_tableUpsertMetadataManager.getOrCreatePartitionManager(
-              partitionGroupId) : null;
-      PartitionDedupMetadataManager partitionDedupMetadataManager =
-          _tableDedupMetadataManager != null ? 
_tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
-              : null;
-      LLRealtimeSegmentDataManager llRealtimeSegmentDataManager =
-          new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, 
this, _indexDir.getAbsolutePath(),
-              indexLoadingConfig, schema, llcSegmentName, semaphore, 
_serverMetrics, partitionUpsertMetadataManager,
-              partitionDedupMetadataManager, _isTableReadyToConsumeData);
-      llRealtimeSegmentDataManager.startConsumption();
-      segmentDataManager = llRealtimeSegmentDataManager;
-    } else {
-      InstanceZKMetadata instanceZKMetadata = 
ZKMetadataProvider.getInstanceZKMetadata(_propertyStore, _instanceId);
-      HLRealtimeSegmentDataManager hlRealtimeSegmentDataManager = new 
HLRealtimeSegmentDataManager(segmentZKMetadata,
-              tableConfig, instanceZKMetadata, this, 
_indexDir.getAbsolutePath(),
-              indexLoadingConfig, schema, _serverMetrics);
-      hlRealtimeSegmentDataManager.startConsumption();
-      segmentDataManager = hlRealtimeSegmentDataManager;
-    }
+    // Generates only one semaphore for every partitionGroupId
+    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    int partitionGroupId = llcSegmentName.getPartitionGroupId();
+    Semaphore semaphore = 
_partitionGroupIdToSemaphoreMap.computeIfAbsent(partitionGroupId, k -> new 
Semaphore(1));
+    PartitionUpsertMetadataManager partitionUpsertMetadataManager =
+        _tableUpsertMetadataManager != null ? 
_tableUpsertMetadataManager.getOrCreatePartitionManager(
+            partitionGroupId) : null;
+    PartitionDedupMetadataManager partitionDedupMetadataManager =
+        _tableDedupMetadataManager != null ? 
_tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
+            : null;
+    LLRealtimeSegmentDataManager llRealtimeSegmentDataManager =
+        new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, this, 
_indexDir.getAbsolutePath(),
+            indexLoadingConfig, schema, llcSegmentName, semaphore, 
_serverMetrics, partitionUpsertMetadataManager,
+            partitionDedupMetadataManager, _isTableReadyToConsumeData);
+    llRealtimeSegmentDataManager.startConsumption();
+    segmentDataManager = llRealtimeSegmentDataManager;
 
     _logger.info("Initialized RealtimeSegmentDataManager - " + segmentName);
     registerSegment(segmentName, segmentDataManager);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index fbcc68f56c..008d0dd900 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.realtime.impl.fakestream;
 
-import java.util.Set;
 import org.apache.pinot.segment.local.utils.IngestionUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -31,7 +30,6 @@ import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamDecoderProvider;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -50,12 +48,6 @@ public class FakeStreamConsumerFactory extends 
StreamConsumerFactory {
     return new FakePartitionLevelConsumer(partition, _streamConfig, 
FakeStreamConfigUtils.MESSAGE_BATCH_SIZE);
   }
 
-  @Override
-  public StreamLevelConsumer createStreamLevelConsumer(String clientId, String 
tableName, Set<String> fieldsToRead,
-      String groupId) {
-    throw new UnsupportedOperationException("Pinot no longer support stream 
level consumers!");
-  }
-
   @Override
   public StreamMetadataProvider createPartitionMetadataProvider(String 
clientId, int partition) {
     return new FakeStreamMetadataProvider(_streamConfig);
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
index e0d1015b65..9679a175dd 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
@@ -18,10 +18,8 @@
  */
 package org.apache.pinot.plugin.stream.kafka20;
 
-import java.util.Set;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 
 
@@ -32,12 +30,6 @@ public class KafkaConsumerFactory extends 
StreamConsumerFactory {
     return new KafkaPartitionLevelConsumer(clientId, _streamConfig, partition);
   }
 
-  @Override
-  public StreamLevelConsumer createStreamLevelConsumer(String clientId, String 
tableName, Set<String> fieldsToRead,
-      String groupId) {
-    return new KafkaStreamLevelConsumer(clientId, tableName, _streamConfig, 
fieldsToRead, groupId);
-  }
-
   @Override
   public StreamMetadataProvider createPartitionMetadataProvider(String 
clientId, int partition) {
     return new KafkaStreamMetadataProvider(clientId, _streamConfig, partition);
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
deleted file mode 100644
index 81df76f628..0000000000
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.stream.kafka20;
-
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.StreamDecoderProvider;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
-import org.apache.pinot.spi.stream.StreamMessageDecoder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * An implementation of a {@link StreamLevelConsumer} which consumes from the 
kafka stream
- */
-// Pinot no longer support high level consumer model since v0.12.*
-@Deprecated
-public class KafkaStreamLevelConsumer implements StreamLevelConsumer {
-
-  private final StreamMessageDecoder _messageDecoder;
-  private final Logger _instanceLogger;
-
-  private final StreamConfig _streamConfig;
-  private final KafkaStreamLevelStreamConfig _kafkaStreamLevelStreamConfig;
-
-  private KafkaConsumer<Bytes, Bytes> _consumer;
-  private ConsumerRecords<Bytes, Bytes> _consumerRecords;
-  private Iterator<ConsumerRecord<Bytes, Bytes>> _kafkaIterator;
-  private Map<Integer, Long> _consumerOffsets = new HashMap<>(); // tracking 
current consumed records offsets.
-
-  private long _lastLogTime = 0;
-  private long _lastCount = 0;
-  private long _currentCount = 0L;
-
-  public KafkaStreamLevelConsumer(String clientId, String tableName, 
StreamConfig streamConfig,
-      Set<String> sourceFields, String groupId) {
-    _streamConfig = streamConfig;
-    _kafkaStreamLevelStreamConfig = new 
KafkaStreamLevelStreamConfig(streamConfig, tableName, groupId);
-
-    _messageDecoder = StreamDecoderProvider.create(streamConfig, sourceFields);
-    _instanceLogger = LoggerFactory
-        .getLogger(KafkaStreamLevelConsumer.class.getName() + "_" + tableName 
+ "_" + streamConfig.getTopicName());
-    _instanceLogger.info("KafkaStreamLevelConsumer: streamConfig : {}", 
_streamConfig);
-  }
-
-  @Override
-  public void start()
-      throws Exception {
-    _consumer = 
KafkaStreamLevelConsumerManager.acquireKafkaConsumerForConfig(_kafkaStreamLevelStreamConfig);
-  }
-
-  private void updateKafkaIterator() {
-    _consumerRecords = 
_consumer.poll(Duration.ofMillis(_streamConfig.getFetchTimeoutMillis()));
-    _kafkaIterator = _consumerRecords.iterator();
-  }
-
-  private void resetOffsets() {
-    for (int partition : _consumerOffsets.keySet()) {
-      long offsetToSeek = _consumerOffsets.get(partition);
-      _consumer.seek(new TopicPartition(_streamConfig.getTopicName(), 
partition), offsetToSeek);
-    }
-  }
-
-  @Override
-  public GenericRow next(GenericRow destination) {
-    if (_kafkaIterator == null || !_kafkaIterator.hasNext()) {
-      updateKafkaIterator();
-    }
-    if (_kafkaIterator.hasNext()) {
-      try {
-        final ConsumerRecord<Bytes, Bytes> record = _kafkaIterator.next();
-        updateOffsets(record.partition(), record.offset());
-        destination = _messageDecoder.decode(record.value().get(), 
destination);
-
-        _currentCount++;
-
-        final long now = System.currentTimeMillis();
-        // Log every minute or 100k events
-        if (now - _lastLogTime > 60000 || _currentCount - _lastCount >= 
100000) {
-          if (_lastCount == 0) {
-            _instanceLogger
-                .info("Consumed {} events from kafka stream {}", 
_currentCount, _streamConfig.getTopicName());
-          } else {
-            _instanceLogger.info("Consumed {} events from kafka stream {} 
(rate:{}/s)", _currentCount - _lastCount,
-                _streamConfig.getTopicName(), (float) (_currentCount - 
_lastCount) * 1000 / (now - _lastLogTime));
-          }
-          _lastCount = _currentCount;
-          _lastLogTime = now;
-        }
-        return destination;
-      } catch (Exception e) {
-        _instanceLogger.warn("Caught exception while consuming events", e);
-        throw e;
-      }
-    }
-    return null;
-  }
-
-  private void updateOffsets(int partition, long offset) {
-    _consumerOffsets.put(partition, offset + 1);
-  }
-
-  @Override
-  public void commit() {
-    _consumer.commitSync(getOffsetsMap());
-    // Since the lastest batch may not be consumed fully, so we need to reset 
kafka consumer's offset.
-    resetOffsets();
-    _consumerOffsets.clear();
-  }
-
-  private Map<TopicPartition, OffsetAndMetadata> getOffsetsMap() {
-    Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
-    for (Integer partition : _consumerOffsets.keySet()) {
-      offsetsMap.put(new TopicPartition(_streamConfig.getTopicName(), 
partition),
-          new OffsetAndMetadata(_consumerOffsets.get(partition)));
-    }
-    return offsetsMap;
-  }
-
-  @Override
-  public void shutdown()
-      throws Exception {
-    if (_consumer != null) {
-      // If offsets commit is not succeed, then reset the offsets here.
-      resetOffsets();
-      KafkaStreamLevelConsumerManager.releaseKafkaConsumer(_consumer);
-      _consumer = null;
-    }
-  }
-}
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumerManager.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumerManager.java
deleted file mode 100644
index c6fc50b231..0000000000
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumerManager.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.stream.kafka20;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.tuple.ImmutableTriple;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Manager for Kafka consumers that reuses consumers and delays their shutdown.
- *
- * This is a workaround for the current realtime design flaw where any issue 
while flushing/committing offsets causes
- * duplicate or dropped events. Kafka consumption is driven by the controller, 
which assigns a realtime segment to the
- * servers; when a server is assigned a new realtime segment, it creates a 
Kafka consumer, consumes until it reaches a
- * threshold then flushes to disk, writes metadata to helix indicating the 
segment is completed, commits Kafka offsets
- * to ZK and then shuts down the consumer. The controller notices the metadata 
write and reassigns a segment to the
- * server, so that it can keep on consuming.
- *
- * This logic is flawed if committing Kafka offsets fails, at which time the 
committed state is unknown. The proper fix
- * would be to just keep on using that consumer and try committing our offsets 
later, but we recreate a new Kafka
- * consumer whenever we get a new segment and also keep the old consumer 
around, leading to half the events being
- * assigned, due to Kafka rebalancing the partitions between the two consumers 
(one of which is not actually reading
- * anything anymore). Because that logic is stateless and driven by Helix, 
there's no real clean way to keep the
- * consumer alive and pass it to the next segment.
- *
- * This class and long comment is to work around this issue by keeping the 
consumer alive for a little bit instead of
- * shutting it down immediately, so that the next segment assignment can pick 
up the same consumer. This way, even if
- * committing the offsets fails, we can still pick up the same consumer the 
next time we get a segment assigned to us
- * by the controller and hopefully commit our offsets the next time we flush 
to disk.
- *
- * This temporary code should be completely removed by the time we redesign 
the consumption to use the lower level
- * Kafka APIs.
- */
-public class KafkaStreamLevelConsumerManager {
-  private KafkaStreamLevelConsumerManager() {
-  }
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaStreamLevelConsumerManager.class);
-  private static final Long IN_USE = -1L;
-  private static final long CONSUMER_SHUTDOWN_DELAY_MILLIS = 
TimeUnit.SECONDS.toMillis(60); // One minute
-  private static final Map<ImmutableTriple<String, String, String>, 
KafkaConsumer> CONSUMER_FOR_CONFIG_KEY =
-      new HashMap<>();
-  private static final IdentityHashMap<KafkaConsumer, Long> 
CONSUMER_RELEASE_TIME = new IdentityHashMap<>();
-
-  public static KafkaConsumer 
acquireKafkaConsumerForConfig(KafkaStreamLevelStreamConfig 
kafkaStreamLevelStreamConfig) {
-    final ImmutableTriple<String, String, String> configKey =
-        new ImmutableTriple<>(kafkaStreamLevelStreamConfig.getKafkaTopicName(),
-            kafkaStreamLevelStreamConfig.getGroupId(),
-            kafkaStreamLevelStreamConfig.getBootstrapServers());
-
-    synchronized (KafkaStreamLevelConsumerManager.class) {
-      // If we have the consumer and it's not already acquired, return it, 
otherwise error out if it's already acquired
-      if (CONSUMER_FOR_CONFIG_KEY.containsKey(configKey)) {
-        KafkaConsumer kafkaConsumer = CONSUMER_FOR_CONFIG_KEY.get(configKey);
-        if (CONSUMER_RELEASE_TIME.get(kafkaConsumer).equals(IN_USE)) {
-          throw new RuntimeException("Consumer " + kafkaConsumer + " already 
in use!");
-        } else {
-          LOGGER.info("Reusing kafka consumer with id {}", kafkaConsumer);
-          CONSUMER_RELEASE_TIME.put(kafkaConsumer, IN_USE);
-          return kafkaConsumer;
-        }
-      }
-
-      LOGGER.info("Creating new kafka consumer and iterator for topic {}",
-          kafkaStreamLevelStreamConfig.getKafkaTopicName());
-
-      // Create the consumer
-
-      Properties consumerProp = new Properties();
-      
consumerProp.putAll(kafkaStreamLevelStreamConfig.getKafkaConsumerProperties());
-      consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaStreamLevelStreamConfig.getBootstrapServers());
-      consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
-      consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
BytesDeserializer.class.getName());
-
-      if (consumerProp.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) && 
consumerProp
-          
.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).equals("smallest")) {
-        consumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-      }
-      KafkaConsumer consumer = new KafkaConsumer<>(consumerProp);
-      
consumer.subscribe(Collections.singletonList(kafkaStreamLevelStreamConfig.getKafkaTopicName()));
-
-      // Mark both the consumer and iterator as acquired
-      CONSUMER_FOR_CONFIG_KEY.put(configKey, consumer);
-      CONSUMER_RELEASE_TIME.put(consumer, IN_USE);
-
-      LOGGER
-          .info("Created consumer with id {} for topic {}", consumer, 
kafkaStreamLevelStreamConfig.getKafkaTopicName());
-
-      return consumer;
-    }
-  }
-
-  public static void releaseKafkaConsumer(final KafkaConsumer kafkaConsumer) {
-    synchronized (KafkaStreamLevelConsumerManager.class) {
-      // Release the consumer, mark it for shutdown in the future
-      final long releaseTime = System.currentTimeMillis() + 
CONSUMER_SHUTDOWN_DELAY_MILLIS;
-      CONSUMER_RELEASE_TIME.put(kafkaConsumer, releaseTime);
-
-      LOGGER.info("Marking consumer with id {} for release at {}", 
kafkaConsumer, releaseTime);
-
-      // Schedule the shutdown of the consumer
-      new Thread() {
-        @Override
-        public void run() {
-          try {
-            // Await the shutdown time
-            
Uninterruptibles.sleepUninterruptibly(CONSUMER_SHUTDOWN_DELAY_MILLIS, 
TimeUnit.MILLISECONDS);
-
-            // Shutdown all consumers that have not been re-acquired
-            synchronized (KafkaStreamLevelConsumerManager.class) {
-              LOGGER.info("Executing release check for consumer {} at {}, 
scheduled at {}", kafkaConsumer,
-                  System.currentTimeMillis(), releaseTime);
-
-              Iterator<Map.Entry<ImmutableTriple<String, String, String>, 
KafkaConsumer>> configIterator =
-                  CONSUMER_FOR_CONFIG_KEY.entrySet().iterator();
-
-              while (configIterator.hasNext()) {
-                Map.Entry<ImmutableTriple<String, String, String>, 
KafkaConsumer> entry = configIterator.next();
-                KafkaConsumer kafkaConsumer = entry.getValue();
-
-                final Long releaseTime = 
CONSUMER_RELEASE_TIME.get(kafkaConsumer);
-                if (!releaseTime.equals(IN_USE) && releaseTime < 
System.currentTimeMillis()) {
-                  LOGGER.info("Releasing consumer {}", kafkaConsumer);
-
-                  try {
-                    kafkaConsumer.close();
-                  } catch (Exception e) {
-                    LOGGER.warn("Caught exception while shutting down Kafka 
consumer with id {}", kafkaConsumer, e);
-                  }
-
-                  configIterator.remove();
-                  CONSUMER_RELEASE_TIME.remove(kafkaConsumer);
-                } else {
-                  LOGGER.info("Not releasing consumer {}, it has been 
reacquired", kafkaConsumer);
-                }
-              }
-            }
-          } catch (Exception e) {
-            LOGGER.warn("Caught exception in release of consumer {}", 
kafkaConsumer, e);
-          }
-        }
-      }.start();
-    }
-  }
-
-  public static void closeAllConsumers() {
-    try {
-      // Shutdown all consumers
-      synchronized (KafkaStreamLevelConsumerManager.class) {
-        LOGGER.info("Trying to shutdown all the kafka consumers");
-        Iterator<KafkaConsumer> consumerIterator = 
CONSUMER_FOR_CONFIG_KEY.values().iterator();
-
-        while (consumerIterator.hasNext()) {
-          KafkaConsumer kafkaConsumer = consumerIterator.next();
-          LOGGER.info("Trying to shutdown consumer {}", kafkaConsumer);
-          try {
-            kafkaConsumer.close();
-          } catch (Exception e) {
-            LOGGER.warn("Caught exception while shutting down Kafka consumer 
with id {}", kafkaConsumer, e);
-          }
-          consumerIterator.remove();
-        }
-        CONSUMER_FOR_CONFIG_KEY.clear();
-        CONSUMER_RELEASE_TIME.clear();
-      }
-    } catch (Exception e) {
-      LOGGER.warn("Caught exception during shutting down all kafka consumers", 
e);
-    }
-  }
-}
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelStreamConfig.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelStreamConfig.java
deleted file mode 100644
index 536121d51a..0000000000
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelStreamConfig.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.stream.kafka20;
-
-import com.google.common.base.Preconditions;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.StreamConfigProperties;
-import org.apache.pinot.spi.utils.EqualityUtils;
-
-
-/**
- * Wrapper around {@link StreamConfig} for use in the {@link 
KafkaStreamLevelConsumer}
- */
-public class KafkaStreamLevelStreamConfig {
-  private static final String DEFAULT_AUTO_COMMIT_ENABLE = "false";
-  private static final Map<String, String> DEFAULT_PROPS = new HashMap<String, 
String>() {{
-    put(KafkaStreamConfigProperties.HighLevelConsumer.AUTO_COMMIT_ENABLE, 
DEFAULT_AUTO_COMMIT_ENABLE);
-  }};
-
-  private String _kafkaTopicName;
-  private String _groupId;
-  private String _bootstrapServers;
-  private Map<String, String> _kafkaConsumerProperties;
-
-  /**
-   * Builds a wrapper around {@link StreamConfig} to fetch kafka stream level 
consumer specific configs
-   * @param streamConfig
-   * @param tableName
-   * @param groupId
-   */
-  public KafkaStreamLevelStreamConfig(StreamConfig streamConfig, String 
tableName, String groupId) {
-    Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
-
-    _kafkaTopicName = streamConfig.getTopicName();
-    String hlcBootstrapBrokerUrlKey =
-        KafkaStreamConfigProperties
-            
.constructStreamProperty(KafkaStreamConfigProperties.HighLevelConsumer.KAFKA_HLC_BOOTSTRAP_SERVER);
-    _bootstrapServers = streamConfigMap.get(hlcBootstrapBrokerUrlKey);
-    Preconditions.checkNotNull(_bootstrapServers,
-        "Must specify bootstrap broker connect string " + 
hlcBootstrapBrokerUrlKey + " in high level kafka consumer");
-    _groupId = groupId;
-
-    _kafkaConsumerProperties = new HashMap<>();
-    String kafkaConsumerPropertyPrefix =
-        
KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.KAFKA_CONSUMER_PROP_PREFIX);
-    for (String key : streamConfigMap.keySet()) {
-      if (key.startsWith(kafkaConsumerPropertyPrefix)) {
-        _kafkaConsumerProperties
-            .put(StreamConfigProperties.getPropertySuffix(key, 
kafkaConsumerPropertyPrefix), streamConfigMap.get(key));
-      }
-    }
-  }
-
-  public String getKafkaTopicName() {
-    return _kafkaTopicName;
-  }
-
-  public String getGroupId() {
-    return _groupId;
-  }
-
-  public Properties getKafkaConsumerProperties() {
-    Properties props = new Properties();
-    for (String key : DEFAULT_PROPS.keySet()) {
-      props.put(key, DEFAULT_PROPS.get(key));
-    }
-    for (String key : _kafkaConsumerProperties.keySet()) {
-      props.put(key, _kafkaConsumerProperties.get(key));
-    }
-    props.put("group.id", _groupId);
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _bootstrapServers);
-    return props;
-  }
-
-  @Override
-  public String toString() {
-    return "KafkaStreamLevelStreamConfig{" + "_kafkaTopicName='" + 
_kafkaTopicName + '\'' + ", _groupId='" + _groupId
-        + '\'' + ", _bootstrapServers='"
-        + _bootstrapServers + '\'' + ", _kafkaConsumerProperties=" + 
_kafkaConsumerProperties + '}';
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (EqualityUtils.isSameReference(this, o)) {
-      return true;
-    }
-
-    if (EqualityUtils.isNullOrNotSameClass(this, o)) {
-      return false;
-    }
-
-    KafkaStreamLevelStreamConfig that = (KafkaStreamLevelStreamConfig) o;
-
-    return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && 
EqualityUtils
-        .isEqual(_groupId, that._groupId) && EqualityUtils
-        .isEqual(_bootstrapServers, that._bootstrapServers) && EqualityUtils
-        .isEqual(_kafkaConsumerProperties, that._kafkaConsumerProperties);
-  }
-
-  @Override
-  public int hashCode() {
-    int result = EqualityUtils.hashCodeOf(_kafkaTopicName);
-    result = EqualityUtils.hashCodeOf(result, _groupId);
-    result = EqualityUtils.hashCodeOf(result, _bootstrapServers);
-    result = EqualityUtils.hashCodeOf(result, _kafkaConsumerProperties);
-    return result;
-  }
-
-  public String getBootstrapServers() {
-    return _bootstrapServers;
-  }
-}
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index fc7f99cf06..6f81632c8c 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -18,12 +18,10 @@
  */
 package org.apache.pinot.plugin.stream.kinesis;
 
-import java.util.Set;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
 
@@ -37,13 +35,6 @@ public class KinesisConsumerFactory extends 
StreamConsumerFactory {
   public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, 
int partition) {
     throw new UnsupportedOperationException();
   }
-
-  @Override
-  public StreamLevelConsumer createStreamLevelConsumer(String clientId, String 
tableName, Set<String> fieldsToRead,
-      String groupId) {
-    throw new UnsupportedOperationException();
-  }
-
   @Override
   public StreamMetadataProvider createPartitionMetadataProvider(String 
clientId, int partition) {
     return new KinesisStreamMetadataProvider(clientId, _streamConfig);
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerFactory.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerFactory.java
index 9e1e68c199..6614bd321d 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerFactory.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerFactory.java
@@ -18,12 +18,10 @@
  */
 package org.apache.pinot.plugin.stream.pulsar;
 
-import java.util.Set;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
 
@@ -37,12 +35,6 @@ public class PulsarConsumerFactory extends 
StreamConsumerFactory {
     throw new UnsupportedOperationException("Partition Level consumer is 
deprecated!");
   }
 
-  @Override
-  public StreamLevelConsumer createStreamLevelConsumer(String clientId, String 
tableName, Set<String> fieldsToRead,
-      String groupId) {
-    return new PulsarStreamLevelConsumer(clientId, tableName, _streamConfig, 
fieldsToRead, groupId);
-  }
-
   @Override
   public StreamMetadataProvider createPartitionMetadataProvider(String 
clientId, int partition) {
     return new PulsarStreamMetadataProvider(clientId, _streamConfig, 
partition);
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
deleted file mode 100644
index 78835f492c..0000000000
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.stream.pulsar;
-
-import java.util.Set;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.StreamDecoderProvider;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
-import org.apache.pinot.spi.stream.StreamMessageDecoder;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Reader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * A {@link StreamLevelConsumer} implementation for the Pulsar stream
- */
-// Pinot no longer support high level consumer model since v0.12.*
-@Deprecated
-public class PulsarStreamLevelConsumer implements StreamLevelConsumer {
-  private Logger _logger;
-
-  private StreamMessageDecoder _messageDecoder;
-
-  private StreamConfig _streamConfig;
-  private PulsarConfig _pulsarStreamLevelStreamConfig;
-
-  private Reader<byte[]> _reader;
-
-  private long _lastLogTime = 0;
-  private long _lastCount = 0;
-  private long _currentCount = 0L;
-
-  public PulsarStreamLevelConsumer(String clientId, String tableName, 
StreamConfig streamConfig,
-      Set<String> sourceFields, String subscriberId) {
-    _streamConfig = streamConfig;
-    _pulsarStreamLevelStreamConfig = new PulsarConfig(streamConfig, 
subscriberId);
-
-    _messageDecoder = StreamDecoderProvider.create(streamConfig, sourceFields);
-
-    _logger =
-        LoggerFactory.getLogger(PulsarConfig.class.getName() + "_" + tableName 
+ "_" + streamConfig.getTopicName());
-    _logger.info("PulsarStreamLevelConsumer: streamConfig : {}", 
_streamConfig);
-  }
-
-  @Override
-  public void start()
-      throws Exception {
-    _reader = 
PulsarStreamLevelConsumerManager.acquirePulsarConsumerForConfig(_pulsarStreamLevelStreamConfig);
-  }
-
-  /**
-   * Get next {@link GenericRow} after decoding pulsar {@link Message}
-   */
-  @Override
-  public GenericRow next(GenericRow destination) {
-    try {
-      if (_reader.hasMessageAvailable()) {
-        final Message<byte[]> record = _reader.readNext();
-        destination = _messageDecoder.decode(record.getData(), destination);
-
-        _currentCount++;
-
-        final long now = System.currentTimeMillis();
-        // Log every minute or 100k events
-        if (now - _lastLogTime > 60000 || _currentCount - _lastCount >= 
100000) {
-          if (_lastCount == 0) {
-            _logger.info("Consumed {} events from pulsar stream {}", 
_currentCount, _streamConfig.getTopicName());
-          } else {
-            _logger.info("Consumed {} events from pulsar stream {} 
(rate:{}/s)", _currentCount - _lastCount,
-                _streamConfig.getTopicName(), (float) (_currentCount - 
_lastCount) * 1000 / (now - _lastLogTime));
-          }
-          _lastCount = _currentCount;
-          _lastLogTime = now;
-        }
-        return destination;
-      }
-    } catch (Exception e) {
-      _logger.warn("Caught exception while consuming events", e);
-    }
-    return null;
-  }
-
-  @Override
-  public void commit() {
-  }
-
-  @Override
-  public void shutdown()
-      throws Exception {
-    if (_reader != null) {
-      PulsarStreamLevelConsumerManager.releasePulsarConsumer(_reader);
-      _reader = null;
-    }
-  }
-}
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java
deleted file mode 100644
index ff23a0cea6..0000000000
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.stream.pulsar;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.HashMap;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.tuple.ImmutableTriple;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Reader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Implements pulsar high level connection manager.
- */
-public class PulsarStreamLevelConsumerManager {
-  private PulsarStreamLevelConsumerManager() {
-  }
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PulsarStreamLevelConsumerManager.class);
-  private static final Long IN_USE = -1L;
-  private static final long CONSUMER_SHUTDOWN_DELAY_MILLIS = 
TimeUnit.SECONDS.toMillis(60); // One minute
-  private static final Map<ImmutableTriple<String, String, String>, 
Reader<byte[]>> CONSUMER_FOR_CONFIG_KEY =
-      new HashMap<>();
-  private static final IdentityHashMap<Reader<byte[]>, Long> 
CONSUMER_RELEASE_TIME = new IdentityHashMap<>();
-  protected static PulsarClient _pulsarClient;
-  protected static Reader<byte[]> _reader;
-
-  /**
-   * Get {@link Reader} for {@link PulsarConfig}. If the reader is already 
created we return the instance, otherwise
-   * a new reader is created.
-   */
-  public static Reader<byte[]> acquirePulsarConsumerForConfig(PulsarConfig 
pulsarStreamLevelStreamConfig) {
-    final ImmutableTriple<String, String, String> configKey =
-        new 
ImmutableTriple<>(pulsarStreamLevelStreamConfig.getPulsarTopicName(),
-            pulsarStreamLevelStreamConfig.getSubscriberId(), 
pulsarStreamLevelStreamConfig.getBootstrapServers());
-
-    synchronized (PulsarStreamLevelConsumerManager.class) {
-      // If we have the consumer and it's not already acquired, return it, 
otherwise error out if it's already acquired
-      if (CONSUMER_FOR_CONFIG_KEY.containsKey(configKey)) {
-        Reader<byte[]> pulsarConsumer = CONSUMER_FOR_CONFIG_KEY.get(configKey);
-        if (CONSUMER_RELEASE_TIME.get(pulsarConsumer).equals(IN_USE)) {
-          throw new RuntimeException("Consumer " + pulsarConsumer + " already 
in use!");
-        } else {
-          LOGGER.info("Reusing pulsar consumer with id {}", pulsarConsumer);
-          CONSUMER_RELEASE_TIME.put(pulsarConsumer, IN_USE);
-          return pulsarConsumer;
-        }
-      }
-
-      LOGGER.info("Creating new pulsar consumer and iterator for topic {}",
-          pulsarStreamLevelStreamConfig.getPulsarTopicName());
-
-      // Create the consumer
-      try {
-        ClientBuilder pulsarClientBuilder = PulsarClient.builder().serviceUrl(
-                pulsarStreamLevelStreamConfig.getBootstrapServers());
-        if (pulsarStreamLevelStreamConfig.getTlsTrustCertsFilePath() != null) {
-          
pulsarClientBuilder.tlsTrustCertsFilePath(pulsarStreamLevelStreamConfig.getTlsTrustCertsFilePath());
-        }
-
-        if (pulsarStreamLevelStreamConfig.getAuthenticationToken() != null) {
-          Authentication authentication = AuthenticationFactory.token(
-                  pulsarStreamLevelStreamConfig.getAuthenticationToken());
-          pulsarClientBuilder.authentication(authentication);
-        }
-
-        _pulsarClient = pulsarClientBuilder.build();
-
-        _reader = 
_pulsarClient.newReader().topic(pulsarStreamLevelStreamConfig.getPulsarTopicName())
-            
.startMessageId(pulsarStreamLevelStreamConfig.getInitialMessageId()).create();
-
-        // Mark both the consumer and iterator as acquired
-        CONSUMER_FOR_CONFIG_KEY.put(configKey, _reader);
-        CONSUMER_RELEASE_TIME.put(_reader, IN_USE);
-
-        LOGGER.info("Created consumer with id {} for topic {}", _reader,
-            pulsarStreamLevelStreamConfig.getPulsarTopicName());
-
-        return _reader;
-      } catch (PulsarClientException e) {
-        LOGGER.error("Could not create pulsar consumer", e);
-        return null;
-      }
-    }
-  }
-
-  /**
-   *  remove the {@link Reader} from consumer pool after closing it.
-   */
-  public static void releasePulsarConsumer(final Reader<byte[]> 
pulsarConsumer) {
-    synchronized (PulsarStreamLevelConsumerManager.class) {
-      // Release the consumer, mark it for shutdown in the future
-      final long releaseTime = System.currentTimeMillis() + 
CONSUMER_SHUTDOWN_DELAY_MILLIS;
-      CONSUMER_RELEASE_TIME.put(pulsarConsumer, releaseTime);
-
-      LOGGER.info("Marking consumer with id {} for release at {}", 
pulsarConsumer, releaseTime);
-
-      // Schedule the shutdown of the consumer
-      new Thread() {
-        @Override
-        public void run() {
-          try {
-            // Await the shutdown time
-            
Uninterruptibles.sleepUninterruptibly(CONSUMER_SHUTDOWN_DELAY_MILLIS, 
TimeUnit.MILLISECONDS);
-
-            // Shutdown all consumers that have not been re-acquired
-            synchronized (PulsarStreamLevelConsumerManager.class) {
-              LOGGER.info("Executing release check for consumer {} at {}, 
scheduled at {}", pulsarConsumer,
-                  System.currentTimeMillis(), releaseTime);
-
-              Iterator<Map.Entry<ImmutableTriple<String, String, String>, 
Reader<byte[]>>> configIterator =
-                  CONSUMER_FOR_CONFIG_KEY.entrySet().iterator();
-
-              while (configIterator.hasNext()) {
-                Map.Entry<ImmutableTriple<String, String, String>, 
Reader<byte[]>> entry = configIterator.next();
-                Reader<byte[]> pulsarConsumer = entry.getValue();
-
-                final Long releaseTime = 
CONSUMER_RELEASE_TIME.get(pulsarConsumer);
-                if (!releaseTime.equals(IN_USE) && releaseTime < 
System.currentTimeMillis()) {
-                  LOGGER.info("Releasing consumer {}", pulsarConsumer);
-
-                  try {
-                    pulsarConsumer.close();
-                  } catch (Exception e) {
-                    LOGGER.warn("Caught exception while shutting down Pulsar 
consumer with id {}", pulsarConsumer, e);
-                  }
-
-                  configIterator.remove();
-                  CONSUMER_RELEASE_TIME.remove(pulsarConsumer);
-                } else {
-                  LOGGER.info("Not releasing consumer {}, it has been 
reacquired", pulsarConsumer);
-                }
-              }
-            }
-          } catch (Exception e) {
-            LOGGER.warn("Caught exception in release of consumer {}", 
pulsarConsumer, e);
-          }
-        }
-      }.start();
-    }
-  }
-
-  public static void closeAllConsumers() {
-    try {
-      // Shutdown all consumers
-      synchronized (PulsarStreamLevelConsumerManager.class) {
-        LOGGER.info("Trying to shutdown all the pulsar consumers");
-        Iterator<Reader<byte[]>> consumerIterator = 
CONSUMER_FOR_CONFIG_KEY.values().iterator();
-
-        while (consumerIterator.hasNext()) {
-          Reader<byte[]> pulsarConsumer = consumerIterator.next();
-          LOGGER.info("Trying to shutdown consumer {}", pulsarConsumer);
-          try {
-            pulsarConsumer.close();
-          } catch (Exception e) {
-            LOGGER.warn("Caught exception while shutting down Pulsar consumer 
with id {}", pulsarConsumer, e);
-          }
-          consumerIterator.remove();
-        }
-        CONSUMER_FOR_CONFIG_KEY.clear();
-        CONSUMER_RELEASE_TIME.clear();
-      }
-    } catch (Exception e) {
-      LOGGER.warn("Caught exception during shutting down all pulsar 
consumers", e);
-    }
-  }
-}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index 4b4d671605..60e04a2431 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -51,8 +51,12 @@ public abstract class StreamConsumerFactory {
    * @param groupId consumer group Id
    * @return the stream level consumer
    */
-  public abstract StreamLevelConsumer createStreamLevelConsumer(String 
clientId, String tableName,
-      Set<String> fieldsToRead, String groupId);
+  @Deprecated
+  public StreamLevelConsumer createStreamLevelConsumer(String clientId, String 
tableName,
+      Set<String> fieldsToRead, String groupId) {
+    throw new UnsupportedOperationException("Apache pinot no longer supports 
stream level consumer model. Please "
+        + "create partition level consumer only");
+  }
 
   /**
    * Creates a metadata provider which provides partition specific metadata


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to