This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 734e7d7be95 Always use Schema from IndexLoadingConfig when loading segments (#16267)
734e7d7be95 is described below

commit 734e7d7be9563d63129830a91b81753f528d8ee5
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Jul 3 00:23:40 2025 -0600

    Always use Schema from IndexLoadingConfig when loading segments (#16267)
---
 .../core/data/manager/TextIndexCreationTest.java   |   2 +-
 .../org/apache/pinot/queries/BaseQueriesTest.java  |  29 ------
 ...dexDisabledMultiValueQueriesWithReloadTest.java |   4 +-
 ...ForwardIndexDisabledSingleValueQueriesTest.java |   4 +-
 .../ForwardIndexHandlerReloadQueriesTest.java      |   2 +-
 .../org/apache/pinot/queries/RangeQueriesTest.java |  26 ++---
 .../immutable/ImmutableSegmentLoader.java          | 114 +++++----------------
 .../impl/SegmentIndexCreationDriverImpl.java       |  30 +++---
 .../local/segment/index/bloom/BloomIndexType.java  |   4 +-
 .../index/dictionary/DictionaryIndexType.java      |   2 +-
 .../segment/index/forward/ForwardIndexType.java    |   4 +-
 .../local/segment/index/fst/FstIndexType.java      |   4 +-
 .../local/segment/index/h3/H3IndexType.java        |   4 +-
 .../segment/index/inverted/InvertedIndexType.java  |   4 +-
 .../local/segment/index/json/JsonIndexType.java    |   4 +-
 .../segment/index/loader/BaseIndexHandler.java     |  12 ++-
 .../segment/index/loader/ForwardIndexHandler.java  |  40 +++-----
 .../segment/index/loader/SegmentPreProcessor.java  |  58 +++++------
 .../loader/bloomfilter/BloomFilterHandler.java     |   6 +-
 .../defaultcolumn/BaseDefaultColumnHandler.java    |  23 +++--
 .../defaultcolumn/DefaultColumnHandlerFactory.java |   7 +-
 .../defaultcolumn/V1DefaultColumnHandler.java      |   5 +-
 .../defaultcolumn/V3DefaultColumnHandler.java      |   5 +-
 .../loader/invertedindex/FSTIndexHandler.java      |   6 +-
 .../index/loader/invertedindex/H3IndexHandler.java |   6 +-
 .../loader/invertedindex/InvertedIndexHandler.java |   6 +-
 .../loader/invertedindex/JsonIndexHandler.java     |   6 +-
 .../invertedindex/MultiColumnTextIndexHandler.java |  13 +--
 .../loader/invertedindex/RangeIndexHandler.java    |   9 +-
 .../loader/invertedindex/TextIndexHandler.java     |  16 ++-
 .../loader/invertedindex/VectorIndexHandler.java   |   6 +-
 .../index/nullvalue/NullValueIndexType.java        |   2 +-
 .../local/segment/index/range/RangeIndexType.java  |   4 +-
 .../local/segment/index/text/TextIndexType.java    |   4 +-
 .../segment/index/vector/VectorIndexType.java      |   4 +-
 .../segment/creator/BitmapInvertedIndexTest.java   |  12 ++-
 .../SegmentV1V2ToV3FormatConverterTest.java        |   2 +-
 .../index/loader/ForwardIndexHandlerTest.java      |   2 +-
 .../local/segment/index/loader/LoaderTest.java     |  51 +++++----
 .../index/loader/SegmentPreProcessorTest.java      |  68 ++++++------
 .../defaultcolumn/DefaultColumnHandlerTest.java    |   2 +-
 .../apache/pinot/segment/spi/index/IndexType.java  |   3 +-
 .../pinot/segment/spi/index/IndexServiceTest.java  |   4 +-
 43 files changed, 263 insertions(+), 356 deletions(-)

diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/TextIndexCreationTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/TextIndexCreationTest.java
index c31a01b167e..04fc74dbc13 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/TextIndexCreationTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/TextIndexCreationTest.java
@@ -100,7 +100,7 @@ public class TextIndexCreationTest {
     IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(tableConfig, schema);
     ImmutableSegment segment = null;
     try {
-      segment = ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
null);
+      segment = ImmutableSegmentLoader.load(indexDir, indexLoadingConfig);
     } finally {
       if (segment != null) {
         segment.destroy();
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
index 5b1e07eb73d..5bc6636d15e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.queries;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -45,22 +44,14 @@ import 
org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.core.util.GapfillUtils;
-import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
-import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.SegmentContext;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
-import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants.Broker;
 import org.apache.pinot.spi.utils.CommonConstants.Server;
-import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.intellij.lang.annotations.Language;
@@ -280,26 +271,6 @@ public abstract class BaseQueriesTest {
     return getBrokerResponse(pinotQuery, PLAN_MAKER);
   }
 
-  /**
-   * Helper function to call reloadSegment on an existing index directory. The 
segment is preprocessed using the
-   * config provided in indexLoadingConfig. It returns an immutable segment.
-   */
-  protected ImmutableSegment reloadSegment(File indexDir, IndexLoadingConfig 
indexLoadingConfig, Schema schema)
-      throws Exception {
-    Map<String, Object> props = new HashMap<>();
-    props.put(IndexLoadingConfig.READ_MODE_KEY, ReadMode.mmap.toString());
-    PinotConfiguration configuration = new PinotConfiguration(props);
-
-    try (SegmentDirectory segmentDirectory = 
SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
-        .load(indexDir.toURI(),
-            new 
SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(configuration).build());
-        SegmentPreProcessor processor = new 
SegmentPreProcessor(segmentDirectory, indexLoadingConfig, schema)) {
-      processor.process();
-    }
-    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(indexDir, 
indexLoadingConfig);
-    return immutableSegment;
-  }
-
   /**
    * Run query on multiple index segments with custom plan maker.
    * This test is particularly useful for testing statistical aggregation 
functions such as COVAR_POP, COVAR_SAMP, etc.
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledMultiValueQueriesWithReloadTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledMultiValueQueriesWithReloadTest.java
index 0a8efc9f1a7..2dd875e26a1 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledMultiValueQueriesWithReloadTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledMultiValueQueriesWithReloadTest.java
@@ -715,7 +715,7 @@ public class 
ForwardIndexDisabledMultiValueQueriesWithReloadTest extends BaseQue
 
     // Reload the segments to pick up the new configs
     File indexDir = new File(INDEX_DIR, SEGMENT_NAME);
-    ImmutableSegment segment = reloadSegment(indexDir, indexLoadingConfig, 
SCHEMA);
+    ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, 
indexLoadingConfig);
     _indexSegment.destroy();
     _indexSegment = segment;
     _indexSegments = List.of(segment, segment);
@@ -736,7 +736,7 @@ public class 
ForwardIndexDisabledMultiValueQueriesWithReloadTest extends BaseQue
 
     // Reload the segments to pick up the new configs
     File indexDir = new File(INDEX_DIR, SEGMENT_NAME);
-    ImmutableSegment segment = reloadSegment(indexDir, indexLoadingConfig, 
SCHEMA);
+    ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, 
indexLoadingConfig);
     _indexSegment.destroy();
     _indexSegment = segment;
     _indexSegments = List.of(segment, segment);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledSingleValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledSingleValueQueriesTest.java
index a09f1dd8639..f7b4fcbecd1 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledSingleValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledSingleValueQueriesTest.java
@@ -1889,7 +1889,7 @@ public class ForwardIndexDisabledSingleValueQueriesTest 
extends BaseQueriesTest
 
     // Reload the segments to pick up the new configs
     File indexDir = new File(INDEX_DIR, SEGMENT_NAME);
-    ImmutableSegment segment = reloadSegment(indexDir, indexLoadingConfig, 
SCHEMA);
+    ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, 
indexLoadingConfig);
     _indexSegment.destroy();
     _indexSegment = segment;
     _indexSegments = List.of(segment, segment);
@@ -1914,7 +1914,7 @@ public class ForwardIndexDisabledSingleValueQueriesTest 
extends BaseQueriesTest
 
     // Reload the segments to pick up the new configs
     File indexDir = new File(INDEX_DIR, SEGMENT_NAME);
-    ImmutableSegment segment = reloadSegment(indexDir, indexLoadingConfig, 
SCHEMA);
+    ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, 
indexLoadingConfig);
     _indexSegment.destroy();
     _indexSegment = segment;
     _indexSegments = List.of(segment, segment);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java
index c19b20f0310..fc5b40d7c68 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java
@@ -663,7 +663,7 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
 
     // Reload the segments to pick up the new configs
     File indexDir = new File(INDEX_DIR, SEGMENT_NAME);
-    ImmutableSegment segment = reloadSegment(indexDir, indexLoadingConfig, 
SCHEMA);
+    ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, 
indexLoadingConfig);
     _indexSegment.destroy();
     _indexSegment = segment;
     _indexSegments = List.of(segment, segment);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/RangeQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/RangeQueriesTest.java
index 16102fdbae8..66108344036 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/RangeQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/RangeQueriesTest.java
@@ -319,9 +319,9 @@ public class RangeQueriesTest extends BaseQueriesTest {
     IndexLoadingConfig indexLoadingConfig = createIndexLoadingConfig();
 
     File indexDir = new File(INDEX_DIR, SEGMENT_NAME);
-    ImmutableSegment immutableSegment = reloadSegment(indexDir, 
indexLoadingConfig, SCHEMA);
-    _indexSegment = immutableSegment;
-    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+    ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, 
indexLoadingConfig);
+    _indexSegment = segment;
+    _indexSegments = Arrays.asList(segment, segment);
 
     Operator<?> operator = getOperator(query);
     assertTrue(operator instanceof SelectionOnlyOperator);
@@ -335,9 +335,10 @@ public class RangeQueriesTest extends BaseQueriesTest {
     // Enable dictionary on RAW_DOUBLE_COL and reload the segment.
     indexLoadingConfig = createIndexLoadingConfig();
     indexDir = new File(INDEX_DIR, SEGMENT_NAME);
-    immutableSegment = reloadSegment(indexDir, indexLoadingConfig, SCHEMA);
-    _indexSegment = immutableSegment;
-    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+    segment = ImmutableSegmentLoader.load(indexDir, indexLoadingConfig);
+    _indexSegment.destroy();
+    _indexSegment = segment;
+    _indexSegments = Arrays.asList(segment, segment);
 
     operator = getOperator(query);
     assertTrue(operator instanceof SelectionOnlyOperator);
@@ -366,9 +367,9 @@ public class RangeQueriesTest extends BaseQueriesTest {
 
     IndexLoadingConfig indexLoadingConfig = createIndexLoadingConfig();
     File indexDir = new File(INDEX_DIR, SEGMENT_NAME);
-    ImmutableSegment immutableSegment = reloadSegment(indexDir, 
indexLoadingConfig, SCHEMA);
-    _indexSegment = immutableSegment;
-    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+    ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, 
indexLoadingConfig);
+    _indexSegment = segment;
+    _indexSegments = Arrays.asList(segment, segment);
 
     Operator<?> operator = getOperator(query);
     assertTrue(operator instanceof FastFilteredCountOperator);
@@ -380,9 +381,10 @@ public class RangeQueriesTest extends BaseQueriesTest {
     // Enable dictionary on RAW_FLOAT_COL and reload the segment.
     _noDictionaryColumns.remove(RAW_FLOAT_COL);
     indexLoadingConfig = createIndexLoadingConfig();
-    immutableSegment = reloadSegment(indexDir, indexLoadingConfig, SCHEMA);
-    _indexSegment = immutableSegment;
-    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+    segment = ImmutableSegmentLoader.load(indexDir, indexLoadingConfig);
+    _indexSegment.destroy();
+    _indexSegment = segment;
+    _indexSegments = Arrays.asList(segment, segment);
 
     operator = getOperator(query);
     assertTrue(operator instanceof FastFilteredCountOperator);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
index 7cb3d0a423f..2172e5f014e 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -46,7 +46,6 @@ import 
org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
-import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -55,7 +54,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-// TODO: Clean up this class to use the schema from the IndexLoadingConfig
 public class ImmutableSegmentLoader {
   private ImmutableSegmentLoader() {
   }
@@ -70,31 +68,7 @@ public class ImmutableSegmentLoader {
       throws Exception {
     IndexLoadingConfig defaultIndexLoadingConfig = new IndexLoadingConfig();
     defaultIndexLoadingConfig.setReadMode(readMode);
-    return load(indexDir, defaultIndexLoadingConfig, false);
-  }
-
-  /**
-   * Loads the segment with specified table config and schema.
-   * This method is used to access the segment without modifying it, i.e. in 
read-only mode.
-   */
-  @Deprecated
-  public static ImmutableSegment load(File indexDir, ReadMode readMode, 
TableConfig tableConfig,
-      @Nullable Schema schema)
-      throws Exception {
-    IndexLoadingConfig defaultIndexLoadingConfig = new 
IndexLoadingConfig(tableConfig, schema);
-    defaultIndexLoadingConfig.setReadMode(readMode);
-    return load(indexDir, defaultIndexLoadingConfig, false);
-  }
-
-  /**
-   * Loads the segment with specified IndexLoadingConfig.
-   * This method modifies the segment like to convert segment format, add or 
remove indices.
-   * Mostly used by UT cases to add some specific index for testing purpose.
-   */
-  public static ImmutableSegment load(File indexDir, IndexLoadingConfig 
indexLoadingConfig,
-      @Nullable SegmentOperationsThrottler segmentOperationsThrottler)
-      throws Exception {
-    return load(indexDir, indexLoadingConfig, true, 
segmentOperationsThrottler);
+    return load(indexDir, defaultIndexLoadingConfig, false, null);
   }
 
   /**
@@ -110,12 +84,12 @@ public class ImmutableSegmentLoader {
   /**
    * Loads the segment with specified IndexLoadingConfig.
    * This method modifies the segment like to convert segment format, add or 
remove indices.
+   * Mostly used by UT cases to add some specific index for testing purpose.
    */
-  public static ImmutableSegment load(File indexDir, IndexLoadingConfig 
indexLoadingConfig, boolean needPreprocess,
+  public static ImmutableSegment load(File indexDir, IndexLoadingConfig 
indexLoadingConfig,
       @Nullable SegmentOperationsThrottler segmentOperationsThrottler)
       throws Exception {
-    return load(indexDir, indexLoadingConfig, indexLoadingConfig.getSchema(), 
needPreprocess,
-        segmentOperationsThrottler);
+    return load(indexDir, indexLoadingConfig, true, 
segmentOperationsThrottler);
   }
 
   /**
@@ -124,37 +98,15 @@ public class ImmutableSegmentLoader {
    */
   public static ImmutableSegment load(File indexDir, IndexLoadingConfig 
indexLoadingConfig, boolean needPreprocess)
       throws Exception {
-    return load(indexDir, indexLoadingConfig, indexLoadingConfig.getSchema(), 
needPreprocess,
-        null);
-  }
-
-  /**
-   * Loads the segment with specified schema and IndexLoadingConfig, usually 
from Zookeeper.
-   * This method modifies the segment like to convert segment format, add or 
remove indices.
-   * Mostly used by UT cases to add some specific index for testing purpose.
-   */
-  public static ImmutableSegment load(File indexDir, IndexLoadingConfig 
indexLoadingConfig, @Nullable Schema schema,
-      @Nullable SegmentOperationsThrottler segmentOperationsThrottler)
-      throws Exception {
-    return load(indexDir, indexLoadingConfig, schema, true, 
segmentOperationsThrottler);
-  }
-
-  /**
-   * Loads the segment with specified schema and IndexLoadingConfig, and 
allows to control whether to
-   * modify the segment like to convert segment format, add or remove indices.
-   */
-  public static ImmutableSegment load(File indexDir, IndexLoadingConfig 
indexLoadingConfig, @Nullable Schema schema,
-      boolean needPreprocess)
-      throws Exception {
-    return load(indexDir, indexLoadingConfig, schema, needPreprocess, null);
+    return load(indexDir, indexLoadingConfig, needPreprocess, null);
   }
 
   /**
    * Loads the segment with specified schema and IndexLoadingConfig, and 
allows to control whether to
    * modify the segment like to convert segment format, add or remove indices.
    */
-  public static ImmutableSegment load(File indexDir, IndexLoadingConfig 
indexLoadingConfig, @Nullable Schema schema,
-      boolean needPreprocess, @Nullable SegmentOperationsThrottler 
segmentOperationsThrottler)
+  public static ImmutableSegment load(File indexDir, IndexLoadingConfig 
indexLoadingConfig, boolean needPreprocess,
+      @Nullable SegmentOperationsThrottler segmentOperationsThrottler)
       throws Exception {
     Preconditions.checkArgument(indexDir.isDirectory(), "Index directory: %s 
does not exist or is not a directory",
         indexDir);
@@ -164,13 +116,12 @@ public class ImmutableSegmentLoader {
       return new EmptyIndexSegment(segmentMetadata);
     }
     if (needPreprocess) {
-      preprocess(indexDir, indexLoadingConfig, schema, 
segmentOperationsThrottler);
+      preprocess(indexDir, indexLoadingConfig, segmentOperationsThrottler);
     }
     String segmentName = segmentMetadata.getName();
     SegmentDirectoryLoaderContext segmentLoaderContext =
-        new SegmentDirectoryLoaderContext.Builder()
-            .setTableConfig(indexLoadingConfig.getTableConfig())
-            .setSchema(schema)
+        new 
SegmentDirectoryLoaderContext.Builder().setTableConfig(indexLoadingConfig.getTableConfig())
+            .setSchema(indexLoadingConfig.getSchema())
             .setInstanceId(indexLoadingConfig.getInstanceId())
             .setTableDataDir(indexLoadingConfig.getTableDataDir())
             .setSegmentName(segmentName)
@@ -183,7 +134,7 @@ public class ImmutableSegmentLoader {
         
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
     SegmentDirectory segmentDirectory = segmentLoader.load(indexDir.toURI(), 
segmentLoaderContext);
     try {
-      return load(segmentDirectory, indexLoadingConfig, schema);
+      return load(segmentDirectory, indexLoadingConfig);
     } catch (Exception e) {
       LOGGER.error("Failed to load segment: {} with SegmentDirectory", 
segmentName, e);
       segmentDirectory.close();
@@ -191,19 +142,9 @@ public class ImmutableSegmentLoader {
     }
   }
 
-  /**
-   * Preprocess the local segment directory according to the current table 
config and schema.
-   */
   public static void preprocess(File indexDir, IndexLoadingConfig 
indexLoadingConfig,
       @Nullable SegmentOperationsThrottler segmentOperationsThrottler)
       throws Exception {
-    preprocess(indexDir, indexLoadingConfig, indexLoadingConfig.getSchema(), 
segmentOperationsThrottler);
-  }
-
-  @Deprecated
-  public static void preprocess(File indexDir, IndexLoadingConfig 
indexLoadingConfig, @Nullable Schema schema,
-      @Nullable SegmentOperationsThrottler segmentOperationsThrottler)
-      throws Exception {
     Preconditions.checkArgument(indexDir.isDirectory(), "Index directory: %s 
does not exist or is not a directory",
         indexDir);
 
@@ -214,8 +155,11 @@ public class ImmutableSegmentLoader {
       }
       try {
         convertSegmentFormat(indexDir, indexLoadingConfig, segmentMetadata);
-        preprocessSegment(indexDir, segmentMetadata.getName(), 
segmentMetadata.getCrc(), indexLoadingConfig, schema,
-            segmentOperationsThrottler);
+        // Preprocess requires table config and schema
+        if (indexLoadingConfig.getTableConfig() != null && 
indexLoadingConfig.getSchema() != null) {
+          preprocessSegment(indexDir, segmentMetadata.getName(), 
segmentMetadata.getCrc(), indexLoadingConfig,
+              segmentOperationsThrottler);
+        }
       } finally {
         if (segmentOperationsThrottler != null) {
           
segmentOperationsThrottler.getSegmentAllIndexPreprocessThrottler().release();
@@ -289,7 +233,7 @@ public class ImmutableSegmentLoader {
       mcTextReader = new MultiColumnLuceneTextIndexReader(segmentMetadata);
       for (String column : 
segmentMetadata.getMultiColumnTextMetadata().getColumns()) {
         ColumnIndexContainer container = indexContainerMap.get(column);
-        if (container != null && container instanceof 
PhysicalColumnIndexContainer) {
+        if (container instanceof PhysicalColumnIndexContainer) {
           ((PhysicalColumnIndexContainer) 
container).setMultiColumnTextIndex(mcTextReader);
         }
       }
@@ -308,21 +252,17 @@ public class ImmutableSegmentLoader {
    */
   public static boolean needPreprocess(SegmentDirectory segmentDirectory, 
IndexLoadingConfig indexLoadingConfig)
       throws Exception {
-    return needPreprocess(segmentDirectory, indexLoadingConfig, 
indexLoadingConfig.getSchema());
-  }
-
-  @Deprecated
-  public static boolean needPreprocess(SegmentDirectory segmentDirectory, 
IndexLoadingConfig indexLoadingConfig,
-      @Nullable Schema schema)
-      throws Exception {
     if (indexLoadingConfig.isSkipSegmentPreprocess()) {
       return false;
     }
     if (needConvertSegmentFormat(indexLoadingConfig, 
segmentDirectory.getSegmentMetadata())) {
       return true;
     }
-    SegmentPreProcessor preProcessor = new 
SegmentPreProcessor(segmentDirectory, indexLoadingConfig, schema);
-    return preProcessor.needProcess();
+    // Preprocess requires table config and schema
+    if (indexLoadingConfig.getTableConfig() == null || 
indexLoadingConfig.getSchema() == null) {
+      return false;
+    }
+    return new SegmentPreProcessor(segmentDirectory, 
indexLoadingConfig).needProcess();
   }
 
   private static boolean needConvertSegmentFormat(IndexLoadingConfig 
indexLoadingConfig,
@@ -355,14 +295,12 @@ public class ImmutableSegmentLoader {
   }
 
   private static void preprocessSegment(File indexDir, String segmentName, 
String segmentCrc,
-      IndexLoadingConfig indexLoadingConfig, @Nullable Schema schema,
-      @Nullable SegmentOperationsThrottler segmentOperationsThrottler)
+      IndexLoadingConfig indexLoadingConfig, @Nullable 
SegmentOperationsThrottler segmentOperationsThrottler)
       throws Exception {
     PinotConfiguration segmentDirectoryConfigs = 
indexLoadingConfig.getSegmentDirectoryConfigs();
     SegmentDirectoryLoaderContext segmentLoaderContext =
-        new SegmentDirectoryLoaderContext.Builder()
-            .setTableConfig(indexLoadingConfig.getTableConfig())
-            .setSchema(schema)
+        new 
SegmentDirectoryLoaderContext.Builder().setTableConfig(indexLoadingConfig.getTableConfig())
+            .setSchema(indexLoadingConfig.getSchema())
             .setInstanceId(indexLoadingConfig.getInstanceId())
             .setSegmentName(segmentName)
             .setSegmentCrc(segmentCrc)
@@ -370,7 +308,7 @@ public class ImmutableSegmentLoader {
             .build();
     SegmentDirectory segmentDirectory =
         
SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(indexDir.toURI(),
 segmentLoaderContext);
-    try (SegmentPreProcessor preProcessor = new 
SegmentPreProcessor(segmentDirectory, indexLoadingConfig, schema)) {
+    try (SegmentPreProcessor preProcessor = new 
SegmentPreProcessor(segmentDirectory, indexLoadingConfig)) {
       preProcessor.process(segmentOperationsThrottler);
     }
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 9d63426c972..f2a85926369 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -452,22 +452,23 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
       PinotConfiguration segmentDirectoryConfigs =
           new PinotConfiguration(Map.of(IndexLoadingConfig.READ_MODE_KEY, 
ReadMode.mmap));
 
+      TableConfig tableConfig = _config.getTableConfig();
+      Schema schema = _config.getSchema();
       SegmentDirectoryLoaderContext segmentLoaderContext =
           new SegmentDirectoryLoaderContext.Builder()
-              .setTableConfig(_config.getTableConfig())
-              .setSchema(_config.getSchema())
+              .setTableConfig(tableConfig)
+              .setSchema(schema)
               .setSegmentName(_segmentName)
               .setSegmentDirectoryConfigs(segmentDirectoryConfigs)
               .build();
 
+      IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, 
tableConfig, schema);
+
       try (SegmentDirectory segmentDirectory = 
SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
           .load(segmentOutputDir.toURI(), segmentLoaderContext);
           SegmentDirectory.Writer segmentWriter = 
segmentDirectory.createWriter()) {
-        MultiColumnTextIndexHandler handler =
-            new MultiColumnTextIndexHandler(segmentDirectory,
-                _config.getIndexConfigsByColName(),
-                _config.getMultiColumnTextIndexConfig(),
-                _config.getTableConfig());
+        MultiColumnTextIndexHandler handler = new 
MultiColumnTextIndexHandler(segmentDirectory, indexLoadingConfig,
+            _config.getMultiColumnTextIndexConfig());
         handler.updateIndices(segmentWriter);
         handler.postUpdateIndicesCleanup(segmentWriter);
       }
@@ -480,30 +481,31 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
         .filter(indexType -> indexType.getIndexBuildLifecycle() == 
IndexType.BuildLifecycle.POST_SEGMENT_CREATION)
         .collect(Collectors.toSet());
 
-    if (postSegCreationIndexes.size() > 0) {
+    if (!postSegCreationIndexes.isEmpty()) {
       // Build other indexes
       Map<String, Object> props = new HashMap<>();
       props.put(IndexLoadingConfig.READ_MODE_KEY, ReadMode.mmap);
       PinotConfiguration segmentDirectoryConfigs = new 
PinotConfiguration(props);
 
+      TableConfig tableConfig = _config.getTableConfig();
+      Schema schema = _config.getSchema();
       SegmentDirectoryLoaderContext segmentLoaderContext =
           new SegmentDirectoryLoaderContext.Builder()
-              .setTableConfig(_config.getTableConfig())
-              .setSchema(_config.getSchema())
+              .setTableConfig(tableConfig)
+              .setSchema(schema)
               .setSegmentName(_segmentName)
               .setSegmentDirectoryConfigs(segmentDirectoryConfigs)
               .build();
 
-      IndexLoadingConfig indexLoadingConfig =
-          new IndexLoadingConfig(null, _config.getTableConfig(), 
_config.getSchema());
+      IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, 
tableConfig, schema);
 
       try (SegmentDirectory segmentDirectory = 
SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
           .load(indexDir.toURI(), segmentLoaderContext);
           SegmentDirectory.Writer segmentWriter = 
segmentDirectory.createWriter()) {
         for (IndexType indexType : postSegCreationIndexes) {
           IndexHandler handler =
-              indexType.createIndexHandler(segmentDirectory, 
indexLoadingConfig.getFieldIndexConfigByColName(),
-                  _config.getSchema(), _config.getTableConfig());
+              indexType.createIndexHandler(segmentDirectory, 
indexLoadingConfig.getFieldIndexConfigByColName(), schema,
+                  tableConfig);
           handler.updateIndices(segmentWriter);
         }
       }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/bloom/BloomIndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/bloom/BloomIndexType.java
index 46d83ad7117..945430d327d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/bloom/BloomIndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/bloom/BloomIndexType.java
@@ -117,8 +117,8 @@ public class BloomIndexType extends 
AbstractIndexType<BloomFilterConfig, BloomFi
 
   @Override
   public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
-      @Nullable Schema schema, @Nullable TableConfig tableConfig) {
-    return new BloomFilterHandler(segmentDirectory, configsByCol, tableConfig);
+      Schema schema, TableConfig tableConfig) {
+    return new BloomFilterHandler(segmentDirectory, configsByCol, tableConfig, 
schema);
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexType.java
index ed04808e14e..728f2d65b20 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexType.java
@@ -357,7 +357,7 @@ public class DictionaryIndexType
 
   @Override
   public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
-      @Nullable Schema schema, @Nullable TableConfig tableConfig) {
+      Schema schema, TableConfig tableConfig) {
     return IndexHandler.NoOp.INSTANCE;
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
index 618bfafed97..0c95bf426bd 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
@@ -229,8 +229,8 @@ public class ForwardIndexType extends 
AbstractIndexType<ForwardIndexConfig, Forw
 
   @Override
   public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
-      @Nullable Schema schema, @Nullable TableConfig tableConfig) {
-    return new ForwardIndexHandler(segmentDirectory, configsByCol, schema, 
tableConfig);
+      Schema schema, TableConfig tableConfig) {
+    return new ForwardIndexHandler(segmentDirectory, configsByCol, 
tableConfig, schema);
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/fst/FstIndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/fst/FstIndexType.java
index 7935cf38a82..c967061648f 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/fst/FstIndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/fst/FstIndexType.java
@@ -134,8 +134,8 @@ public class FstIndexType extends 
AbstractIndexType<FstIndexConfig, TextIndexRea
 
   @Override
   public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
-      @Nullable Schema schema, @Nullable TableConfig tableConfig) {
-    return new FSTIndexHandler(segmentDirectory, configsByCol, tableConfig);
+      Schema schema, TableConfig tableConfig) {
+    return new FSTIndexHandler(segmentDirectory, configsByCol, tableConfig, 
schema);
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java
index 3bed4b6aace..01cfa41f438 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java
@@ -119,8 +119,8 @@ public class H3IndexType extends 
AbstractIndexType<H3IndexConfig, H3IndexReader,
 
   @Override
   public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
-      @Nullable Schema schema, @Nullable TableConfig tableConfig) {
-    return new H3IndexHandler(segmentDirectory, configsByCol, tableConfig);
+      Schema schema, TableConfig tableConfig) {
+    return new H3IndexHandler(segmentDirectory, configsByCol, tableConfig, 
schema);
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/inverted/InvertedIndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/inverted/InvertedIndexType.java
index 83f00e952ab..931b7639775 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/inverted/InvertedIndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/inverted/InvertedIndexType.java
@@ -146,8 +146,8 @@ public class InvertedIndexType
 
   @Override
   public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
-      @Nullable Schema schema, @Nullable TableConfig tableConfig) {
-    return new InvertedIndexHandler(segmentDirectory, configsByCol, 
tableConfig);
+      Schema schema, TableConfig tableConfig) {
+    return new InvertedIndexHandler(segmentDirectory, configsByCol, 
tableConfig, schema);
   }
 
   public static class ReaderFactory implements 
IndexReaderFactory<InvertedIndexReader> {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java
index ee1d25c1177..4943e7d0c3f 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java
@@ -136,8 +136,8 @@ public class JsonIndexType extends 
AbstractIndexType<JsonIndexConfig, JsonIndexR
 
   @Override
   public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
-      @Nullable Schema schema, @Nullable TableConfig tableConfig) {
-    return new JsonIndexHandler(segmentDirectory, configsByCol, tableConfig);
+      Schema schema, TableConfig tableConfig) {
+    return new JsonIndexHandler(segmentDirectory, configsByCol, tableConfig, 
schema);
   }
 
   private static class ReaderFactory extends 
IndexReaderFactory.Default<JsonIndexConfig, JsonIndexReader> {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/BaseIndexHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/BaseIndexHandler.java
index bab6eb3bc8b..00c47586cdd 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/BaseIndexHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/BaseIndexHandler.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
 import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
@@ -33,6 +32,7 @@ import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,15 +49,16 @@ public abstract class BaseIndexHandler implements 
IndexHandler {
   protected final Set<String> _tmpForwardIndexColumns;
   protected final SegmentDirectory _segmentDirectory;
   protected final Map<String, FieldIndexConfigs> _fieldIndexConfigs;
-  @Nullable
   protected final TableConfig _tableConfig;
+  protected final Schema _schema;
 
   public BaseIndexHandler(SegmentDirectory segmentDirectory, 
IndexLoadingConfig indexLoadingConfig) {
-    this(segmentDirectory, indexLoadingConfig.getFieldIndexConfigByColName(), 
indexLoadingConfig.getTableConfig());
+    this(segmentDirectory, indexLoadingConfig.getFieldIndexConfigByColName(), 
indexLoadingConfig.getTableConfig(),
+        indexLoadingConfig.getSchema());
   }
 
   public BaseIndexHandler(SegmentDirectory segmentDirectory, Map<String, 
FieldIndexConfigs> fieldIndexConfigs,
-      @Nullable TableConfig tableConfig) {
+      TableConfig tableConfig, Schema schema) {
     _segmentDirectory = segmentDirectory;
     SegmentMetadataImpl segmentMetadata = 
segmentDirectory.getSegmentMetadata();
     if (fieldIndexConfigs.keySet().equals(segmentMetadata.getAllColumns())) {
@@ -68,7 +69,10 @@ public abstract class BaseIndexHandler implements 
IndexHandler {
         _fieldIndexConfigs.putIfAbsent(column, FieldIndexConfigs.EMPTY);
       }
     }
+    Preconditions.checkArgument(tableConfig != null, "Table config must be 
provided");
     _tableConfig = tableConfig;
+    Preconditions.checkArgument(schema != null, "Schema must be provided");
+    _schema = schema;
     _tmpForwardIndexColumns = new HashSet<>();
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
index 0380773847d..ce347bdc31a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
@@ -64,6 +64,7 @@ import 
org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils;
+import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -98,22 +99,19 @@ public class ForwardIndexHandler extends BaseIndexHandler {
   private static final List<IndexType<?, ?, ?>> 
DICTIONARY_BASED_INDEXES_TO_REWRITE =
       Arrays.asList(StandardIndexes.range(), StandardIndexes.fst(), 
StandardIndexes.inverted());
 
-  private final Schema _schema;
-
   protected enum Operation {
     DISABLE_FORWARD_INDEX, ENABLE_FORWARD_INDEX, DISABLE_DICTIONARY, 
ENABLE_DICTIONARY, CHANGE_INDEX_COMPRESSION_TYPE
   }
 
   @VisibleForTesting
-  public ForwardIndexHandler(SegmentDirectory segmentDirectory, 
IndexLoadingConfig indexLoadingConfig, Schema schema) {
-    this(segmentDirectory, indexLoadingConfig.getFieldIndexConfigByColName(), 
schema,
-        indexLoadingConfig.getTableConfig());
+  public ForwardIndexHandler(SegmentDirectory segmentDirectory, 
IndexLoadingConfig indexLoadingConfig) {
+    this(segmentDirectory, indexLoadingConfig.getFieldIndexConfigByColName(), 
indexLoadingConfig.getTableConfig(),
+        indexLoadingConfig.getSchema());
   }
 
   public ForwardIndexHandler(SegmentDirectory segmentDirectory, Map<String, 
FieldIndexConfigs> fieldIndexConfigs,
-      Schema schema, @Nullable TableConfig tableConfig) {
-    super(segmentDirectory, fieldIndexConfigs, tableConfig);
-    _schema = schema;
+      TableConfig tableConfig, Schema schema) {
+    super(segmentDirectory, fieldIndexConfigs, tableConfig, schema);
   }
 
   @Override
@@ -203,7 +201,7 @@ public class ForwardIndexHandler extends BaseIndexHandler {
     Set<String> existingForwardIndexColumns = 
_segmentDirectory.getColumnsWithIndex(StandardIndexes.forward());
     String segmentName = _segmentDirectory.getSegmentMetadata().getName();
     for (String column : existingAllColumns) {
-      if (_schema != null && !_schema.hasColumn(column)) {
+      if (!_schema.hasColumn(column)) {
         // Column was dropped from the schema, so its forward index is left untouched
         LOGGER.info("Column: {} of segment: {} is not in schema, skipping 
updating forward index", column, segmentName);
         continue;
@@ -298,20 +296,14 @@ public class ForwardIndexHandler extends BaseIndexHandler 
{
         }
       } else if (!existingHasDict && newIsDict) {
         // Existing column is RAW. New column is dictionary enabled.
-        if (_schema == null || _tableConfig == null) {
-          // This can only happen in tests.
-          LOGGER.warn("Cannot enable dictionary for column: {} of segment: {} 
as schema or tableConfig is null.",
-              column, segmentName);
-          continue;
-        }
         ColumnMetadata existingColumnMetadata = 
_segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
+        IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
         if (existingColumnMetadata.getFieldSpec().getFieldType() != 
FieldSpec.FieldType.COMPLEX
-            && 
DictionaryIndexType.ignoreDictionaryOverride(_tableConfig.getIndexingConfig().isOptimizeDictionary(),
-            _tableConfig.getIndexingConfig().isOptimizeDictionaryForMetrics(),
-            
_tableConfig.getIndexingConfig().getNoDictionarySizeRatioThreshold(),
-            
_tableConfig.getIndexingConfig().getNoDictionaryCardinalityRatioThreshold(),
-            existingColumnMetadata.getFieldSpec(), 
_fieldIndexConfigs.get(column),
-            existingColumnMetadata.getCardinality(), 
existingColumnMetadata.getTotalNumberOfEntries())) {
+            && 
DictionaryIndexType.ignoreDictionaryOverride(indexingConfig.isOptimizeDictionary(),
+            indexingConfig.isOptimizeDictionaryForMetrics(), 
indexingConfig.getNoDictionarySizeRatioThreshold(),
+            indexingConfig.getNoDictionaryCardinalityRatioThreshold(), 
existingColumnMetadata.getFieldSpec(),
+            _fieldIndexConfigs.get(column), 
existingColumnMetadata.getCardinality(),
+            existingColumnMetadata.getTotalNumberOfEntries())) {
           columnOperationsMap.put(column, 
Collections.singletonList(Operation.ENABLE_DICTIONARY));
         }
       } else if (existingHasDict && !newIsDict) {
@@ -357,12 +349,6 @@ public class ForwardIndexHandler extends BaseIndexHandler {
   }
 
   private boolean shouldDisableDictionary(String column, ColumnMetadata 
existingColumnMetadata) {
-    if (_schema == null || _tableConfig == null) {
-      // This can only happen in tests.
-      LOGGER.warn("Cannot disable dictionary for column={} as schema or 
tableConfig is null.", column);
-      return false;
-    }
-
     if (existingColumnMetadata.isAutoGenerated() && 
existingColumnMetadata.getCardinality() == 1) {
       LOGGER.warn("Cannot disable dictionary for auto-generated column={} with 
cardinality=1.", column);
       return false;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
index 9105e5ff21c..d9ef0b553f8 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
@@ -18,13 +18,12 @@
  */
 package org.apache.pinot.segment.local.segment.index.loader;
 
+import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.commons.configuration2.PropertiesConfiguration;
 import org.apache.commons.configuration2.ex.ConfigurationException;
@@ -40,7 +39,6 @@ import 
org.apache.pinot.segment.local.startree.v2.builder.MultipleTreesBuilder;
 import 
org.apache.pinot.segment.local.startree.v2.builder.StarTreeV2BuilderConfig;
 import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
 import org.apache.pinot.segment.spi.V1Constants;
-import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
 import org.apache.pinot.segment.spi.index.IndexHandler;
 import org.apache.pinot.segment.spi.index.IndexService;
 import org.apache.pinot.segment.spi.index.IndexType;
@@ -71,18 +69,18 @@ import org.slf4j.LoggerFactory;
 public class SegmentPreProcessor implements AutoCloseable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentPreProcessor.class);
 
-  private final URI _indexDirURI;
+  private final SegmentDirectory _segmentDirectory;
   private final IndexLoadingConfig _indexLoadingConfig;
+  private final TableConfig _tableConfig;
   private final Schema _schema;
-  private final SegmentDirectory _segmentDirectory;
 
-  // TODO: Use Schema from IndexLoadingConfig
-  public SegmentPreProcessor(SegmentDirectory segmentDirectory, 
IndexLoadingConfig indexLoadingConfig,
-      @Nullable Schema schema) {
+  public SegmentPreProcessor(SegmentDirectory segmentDirectory, 
IndexLoadingConfig indexLoadingConfig) {
     _segmentDirectory = segmentDirectory;
-    _indexDirURI = segmentDirectory.getIndexDir();
     _indexLoadingConfig = indexLoadingConfig;
-    _schema = schema;
+    _tableConfig = indexLoadingConfig.getTableConfig();
+    Preconditions.checkArgument(_tableConfig != null, "Table config must be 
provided");
+    _schema = indexLoadingConfig.getSchema();
+    Preconditions.checkArgument(_schema != null, "Schema must be provided");
   }
 
   @Override
@@ -107,22 +105,18 @@ public class SegmentPreProcessor implements AutoCloseable 
{
     }
 
     // Segment processing has to be done with a local directory.
-    File indexDir = new File(_indexDirURI);
+    File indexDir = new File(_segmentDirectory.getIndexDir());
 
     // This fixes the issue of temporary files not getting deleted after 
creating new inverted indexes.
     removeInvertedIndexTempFiles(indexDir);
 
     try (SegmentDirectory.Writer segmentWriter = 
_segmentDirectory.createWriter()) {
       // Update default columns according to the schema.
-      if (_schema != null) {
-        DefaultColumnHandler defaultColumnHandler =
-            DefaultColumnHandlerFactory.getDefaultColumnHandler(indexDir, 
segmentMetadata, _indexLoadingConfig, _schema,
-                segmentWriter);
-        defaultColumnHandler.updateDefaultColumns();
-        _segmentDirectory.reloadMetadata();
-      } else {
-        LOGGER.warn("Skip creating default columns for segment: {} without 
schema", segmentName);
-      }
+      DefaultColumnHandler defaultColumnHandler =
+          DefaultColumnHandlerFactory.getDefaultColumnHandler(indexDir, 
segmentMetadata, _indexLoadingConfig,
+              segmentWriter);
+      defaultColumnHandler.updateDefaultColumns();
+      _segmentDirectory.reloadMetadata();
 
       // Update single-column indices, like inverted index, json index etc.
       List<IndexHandler> indexHandlers = new ArrayList<>();
@@ -174,8 +168,7 @@ public class SegmentPreProcessor implements AutoCloseable {
         segmentWriter.save();
       }
       // Create/modify/remove multi-col text index if required.
-      if (processMultiColTextIndex(indexDir, 
_indexLoadingConfig.getFieldIndexConfigByColName(),
-          _indexLoadingConfig.getTableConfig(), segmentWriter, 
segmentOperationsThrottler)) {
+      if (processMultiColTextIndex(indexDir, segmentWriter, 
segmentOperationsThrottler)) {
         // NOTE: When adding new steps after this, un-comment the next line.
         //_segmentDirectory.reloadMetadata();
         segmentWriter.save();
@@ -185,7 +178,7 @@ public class SegmentPreProcessor implements AutoCloseable {
 
   private IndexHandler createHandler(IndexType<?, ?, ?> type) {
     return type.createIndexHandler(_segmentDirectory, 
_indexLoadingConfig.getFieldIndexConfigByColName(), _schema,
-        _indexLoadingConfig.getTableConfig());
+        _tableConfig);
   }
 
   /**
@@ -202,14 +195,11 @@ public class SegmentPreProcessor implements AutoCloseable 
{
     String segmentName = segmentMetadata.getName();
     try (SegmentDirectory.Reader segmentReader = 
_segmentDirectory.createReader()) {
       // Check if there is need to update default columns according to the 
schema.
-      if (_schema != null) {
-        DefaultColumnHandler defaultColumnHandler =
-            DefaultColumnHandlerFactory.getDefaultColumnHandler(null, 
segmentMetadata, _indexLoadingConfig, _schema,
-                null);
-        if (defaultColumnHandler.needUpdateDefaultColumns()) {
-          LOGGER.info("Found default columns need updates in segment: {}", 
segmentName);
-          return true;
-        }
+      DefaultColumnHandler defaultColumnHandler =
+          DefaultColumnHandlerFactory.getDefaultColumnHandler(null, 
segmentMetadata, _indexLoadingConfig, null);
+      if (defaultColumnHandler.needUpdateDefaultColumns()) {
+        LOGGER.info("Found default columns need updates in segment: {}", 
segmentName);
+        return true;
       }
       // Check if there is need to update single-column indices, like inverted 
index, json index etc.
       for (IndexType<?, ?, ?> type : 
IndexService.getInstance().getAllIndexes()) {
@@ -279,9 +269,7 @@ public class SegmentPreProcessor implements AutoCloseable {
     return 
MultiColumnTextIndexHandler.shouldModifyMultiColTextIndex(newConfig, oldConfig);
   }
 
-  private boolean processMultiColTextIndex(File indexDir,
-      Map<String, FieldIndexConfigs> configsByCol,
-      TableConfig tableConfig, SegmentDirectory.Writer segmentWriter,
+  private boolean processMultiColTextIndex(File indexDir, 
SegmentDirectory.Writer segmentWriter,
       @Nullable SegmentOperationsThrottler segmentOperationsThrottler)
       throws Exception {
     SegmentMetadataImpl segmentMetadata = 
_segmentDirectory.getSegmentMetadata();
@@ -321,7 +309,7 @@ public class SegmentPreProcessor implements AutoCloseable {
           removeMultiColumnTextIndex(indexDir);
         }
         MultiColumnTextIndexHandler handler =
-            new MultiColumnTextIndexHandler(_segmentDirectory, configsByCol, 
newConfig, tableConfig);
+            new MultiColumnTextIndexHandler(_segmentDirectory, 
_indexLoadingConfig, newConfig);
         handler.updateIndices(segmentWriter);
         handler.postUpdateIndicesCleanup(segmentWriter);
       }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
index f6857460e27..c1b29b4563d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import 
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
 import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexType;
@@ -46,6 +45,7 @@ import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.config.table.BloomFilterConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,8 +57,8 @@ public class BloomFilterHandler extends BaseIndexHandler {
   private final Map<String, BloomFilterConfig> _bloomFilterConfigs;
 
   public BloomFilterHandler(SegmentDirectory segmentDirectory, Map<String, 
FieldIndexConfigs> fieldIndexConfigs,
-      @Nullable TableConfig tableConfig) {
-    super(segmentDirectory, fieldIndexConfigs, tableConfig);
+      TableConfig tableConfig, Schema schema) {
+    super(segmentDirectory, fieldIndexConfigs, tableConfig, schema);
     _bloomFilterConfigs = 
FieldIndexConfigsUtil.enableConfigByColumn(StandardIndexes.bloomFilter(), 
fieldIndexConfigs);
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 46e3165be10..84129e7e0ce 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -73,6 +73,7 @@ import 
org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -129,6 +130,7 @@ public abstract class BaseDefaultColumnHandler implements 
DefaultColumnHandler {
   protected final File _indexDir;
   protected final SegmentMetadata _segmentMetadata;
   protected final IndexLoadingConfig _indexLoadingConfig;
+  protected final TableConfig _tableConfig;
   protected final Schema _schema;
   protected final SegmentDirectory.Writer _segmentWriter;
 
@@ -137,11 +139,14 @@ public abstract class BaseDefaultColumnHandler implements 
DefaultColumnHandler {
   private PropertiesConfiguration _segmentProperties;
 
   protected BaseDefaultColumnHandler(File indexDir, SegmentMetadata 
segmentMetadata,
-      IndexLoadingConfig indexLoadingConfig, Schema schema, 
SegmentDirectory.Writer segmentWriter) {
+      IndexLoadingConfig indexLoadingConfig, SegmentDirectory.Writer 
segmentWriter) {
     _indexDir = indexDir;
     _segmentMetadata = segmentMetadata;
     _indexLoadingConfig = indexLoadingConfig;
-    _schema = schema;
+    _tableConfig = _indexLoadingConfig.getTableConfig();
+    Preconditions.checkArgument(_tableConfig != null, "Table config must be 
provided");
+    _schema = _indexLoadingConfig.getSchema();
+    Preconditions.checkArgument(_schema != null, "Schema must be provided");
     _segmentWriter = segmentWriter;
   }
 
@@ -380,11 +385,10 @@ public abstract class BaseDefaultColumnHandler implements 
DefaultColumnHandler {
    */
   protected boolean createColumnV1Indices(String column)
       throws Exception {
-    TableConfig tableConfig = _indexLoadingConfig.getTableConfig();
     boolean errorOnFailure = _indexLoadingConfig.isErrorOnColumnBuildFailure();
-    if (tableConfig != null && tableConfig.getIngestionConfig() != null
-        && tableConfig.getIngestionConfig().getTransformConfigs() != null) {
-      List<TransformConfig> transformConfigs = 
tableConfig.getIngestionConfig().getTransformConfigs();
+    IngestionConfig ingestionConfig = _tableConfig.getIngestionConfig();
+    if (ingestionConfig != null && ingestionConfig.getTransformConfigs() != 
null) {
+      List<TransformConfig> transformConfigs = 
ingestionConfig.getTransformConfigs();
       for (TransformConfig transformConfig : transformConfigs) {
         if (transformConfig.getColumnName().equals(column)) {
           String transformFunction = transformConfig.getTransformFunction();
@@ -574,9 +578,7 @@ public abstract class BaseDefaultColumnHandler implements 
DefaultColumnHandler {
     if (_schema.isEnableColumnBasedNullHandling()) {
       return fieldSpec.isNullable();
     } else {
-      return _indexLoadingConfig.getTableConfig() != null
-          && _indexLoadingConfig.getTableConfig().getIndexingConfig() != null
-          && 
_indexLoadingConfig.getTableConfig().getIndexingConfig().isNullHandlingEnabled();
+      return _tableConfig.getIndexingConfig().isNullHandlingEnabled();
     }
   }
 
@@ -665,8 +667,7 @@ public abstract class BaseDefaultColumnHandler implements 
DefaultColumnHandler {
           fieldIndexConfigs != null ? 
fieldIndexConfigs.getConfig(StandardIndexes.dictionary())
               : DictionaryIndexConfig.DEFAULT;
       boolean createDictionary = dictionaryIndexConfig.isEnabled();
-      StatsCollectorConfig statsCollectorConfig =
-          new StatsCollectorConfig(_indexLoadingConfig.getTableConfig(), 
_schema, null);
+      StatsCollectorConfig statsCollectorConfig = new 
StatsCollectorConfig(_tableConfig, _schema, null);
       ColumnIndexCreationInfo indexCreationInfo;
       boolean isSingleValue = fieldSpec.isSingleValueField();
       switch (fieldSpec.getDataType().getStoredType()) {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandlerFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandlerFactory.java
index f723fd588b5..5058ac91e80 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandlerFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandlerFactory.java
@@ -23,7 +23,6 @@ import 
org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.data.Schema;
 
 
 public class DefaultColumnHandlerFactory {
@@ -31,11 +30,11 @@ public class DefaultColumnHandlerFactory {
   }
 
   public static DefaultColumnHandler getDefaultColumnHandler(File indexDir, 
SegmentMetadataImpl segmentMetadata,
-      IndexLoadingConfig indexLoadingConfig, Schema schema, 
SegmentDirectory.Writer segmentWriter) {
+      IndexLoadingConfig indexLoadingConfig, SegmentDirectory.Writer 
segmentWriter) {
     if (segmentMetadata.getVersion() == SegmentVersion.v3) {
-      return new V3DefaultColumnHandler(indexDir, segmentMetadata, 
indexLoadingConfig, schema, segmentWriter);
+      return new V3DefaultColumnHandler(indexDir, segmentMetadata, 
indexLoadingConfig, segmentWriter);
     } else {
-      return new V1DefaultColumnHandler(indexDir, segmentMetadata, 
indexLoadingConfig, schema, segmentWriter);
+      return new V1DefaultColumnHandler(indexDir, segmentMetadata, 
indexLoadingConfig, segmentWriter);
     }
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/V1DefaultColumnHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/V1DefaultColumnHandler.java
index 1e860c6ae46..1a2935c4c6b 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/V1DefaultColumnHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/V1DefaultColumnHandler.java
@@ -22,7 +22,6 @@ import java.io.File;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,8 +30,8 @@ public class V1DefaultColumnHandler extends 
BaseDefaultColumnHandler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(V1DefaultColumnHandler.class);
 
   public V1DefaultColumnHandler(File indexDir, SegmentMetadataImpl 
segmentMetadata,
-      IndexLoadingConfig indexLoadingConfig, Schema schema, 
SegmentDirectory.Writer segmentWriter) {
-    super(indexDir, segmentMetadata, indexLoadingConfig, schema, 
segmentWriter);
+      IndexLoadingConfig indexLoadingConfig, SegmentDirectory.Writer 
segmentWriter) {
+    super(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter);
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/V3DefaultColumnHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/V3DefaultColumnHandler.java
index 7fa10155eec..c4ac70bad5d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/V3DefaultColumnHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/V3DefaultColumnHandler.java
@@ -27,7 +27,6 @@ import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,8 +35,8 @@ public class V3DefaultColumnHandler extends 
BaseDefaultColumnHandler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(V3DefaultColumnHandler.class);
 
   public V3DefaultColumnHandler(File indexDir, SegmentMetadataImpl 
segmentMetadata,
-      IndexLoadingConfig indexLoadingConfig, Schema schema, 
SegmentDirectory.Writer segmentWriter) {
-    super(indexDir, segmentMetadata, indexLoadingConfig, schema, 
segmentWriter);
+      IndexLoadingConfig indexLoadingConfig, SegmentDirectory.Writer 
segmentWriter) {
+    super(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter);
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
index 7a443008f1f..1ebf84e562e 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
@@ -23,7 +23,6 @@ import java.io.File;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import 
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
 import org.apache.pinot.segment.local.segment.index.loader.BaseIndexHandler;
@@ -41,6 +40,7 @@ import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,8 +71,8 @@ public class FSTIndexHandler extends BaseIndexHandler {
   private final Set<String> _columnsToAddIdx;
 
   public FSTIndexHandler(SegmentDirectory segmentDirectory, Map<String, 
FieldIndexConfigs> fieldIndexConfigs,
-      @Nullable TableConfig tableConfig) {
-    super(segmentDirectory, fieldIndexConfigs, tableConfig);
+      TableConfig tableConfig, Schema schema) {
+    super(segmentDirectory, fieldIndexConfigs, tableConfig, schema);
     _columnsToAddIdx = 
FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.fst(), 
_fieldIndexConfigs);
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
index f55903ff319..78ddd6b110a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
@@ -23,7 +23,6 @@ import java.io.File;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexType;
 import org.apache.pinot.segment.local.segment.index.loader.BaseIndexHandler;
@@ -44,6 +43,7 @@ import 
org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,8 +55,8 @@ public class H3IndexHandler extends BaseIndexHandler {
   private final Map<String, H3IndexConfig> _h3Configs;
 
   public H3IndexHandler(SegmentDirectory segmentDirectory, Map<String, 
FieldIndexConfigs> fieldIndexConfigs,
-      @Nullable TableConfig tableConfig) {
-    super(segmentDirectory, fieldIndexConfigs, tableConfig);
+      TableConfig tableConfig, Schema schema) {
+    super(segmentDirectory, fieldIndexConfigs, tableConfig, schema);
     _h3Configs = 
FieldIndexConfigsUtil.enableConfigByColumn(StandardIndexes.h3(), 
_fieldIndexConfigs);
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
index 857e00d79fb..56b4752415d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
@@ -22,7 +22,6 @@ import java.io.File;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexType;
 import org.apache.pinot.segment.local.segment.index.loader.BaseIndexHandler;
@@ -40,6 +39,7 @@ import 
org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,8 +51,8 @@ public class InvertedIndexHandler extends BaseIndexHandler {
   private final Set<String> _columnsToAddIdx;
 
   public InvertedIndexHandler(SegmentDirectory segmentDirectory, Map<String, 
FieldIndexConfigs> fieldIndexConfigs,
-      @Nullable TableConfig tableConfig) {
-    super(segmentDirectory, fieldIndexConfigs, tableConfig);
+      TableConfig tableConfig, Schema schema) {
+    super(segmentDirectory, fieldIndexConfigs, tableConfig, schema);
     _columnsToAddIdx = 
FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.inverted(), 
_fieldIndexConfigs);
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
index 4c7c14bf2d8..7b2e1fb0709 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
@@ -23,7 +23,6 @@ import java.io.File;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import 
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
 import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexType;
@@ -44,6 +43,7 @@ import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,8 +55,8 @@ public class JsonIndexHandler extends BaseIndexHandler {
   private final Map<String, JsonIndexConfig> _jsonIndexConfigs;
 
   public JsonIndexHandler(SegmentDirectory segmentDirectory, Map<String, 
FieldIndexConfigs> fieldIndexConfigs,
-      @Nullable TableConfig tableConfig) {
-    super(segmentDirectory, fieldIndexConfigs, tableConfig);
+      TableConfig tableConfig, Schema schema) {
+    super(segmentDirectory, fieldIndexConfigs, tableConfig, schema);
     _jsonIndexConfigs = 
FieldIndexConfigsUtil.enableConfigByColumn(StandardIndexes.json(), 
_fieldIndexConfigs);
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/MultiColumnTextIndexHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/MultiColumnTextIndexHandler.java
index d646ad5ffc6..00d1f56394d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/MultiColumnTextIndexHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/MultiColumnTextIndexHandler.java
@@ -25,16 +25,15 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import javax.annotation.Nullable;
 import org.apache.commons.configuration2.PropertiesConfiguration;
 import 
org.apache.pinot.segment.local.segment.creator.impl.text.MultiColumnLuceneTextIndexCreator;
 import 
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
 import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexType;
 import org.apache.pinot.segment.local.segment.index.loader.BaseIndexHandler;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
 import org.apache.pinot.segment.local.utils.TableConfigUtils;
 import org.apache.pinot.segment.spi.ColumnMetadata;
-import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.TextIndexConfig;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -46,7 +45,6 @@ import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
 import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils;
 import org.apache.pinot.spi.config.table.MultiColumnTextIndexConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,12 +81,9 @@ public class MultiColumnTextIndexHandler extends 
BaseIndexHandler {
 
   private final MultiColumnTextIndexConfig _textIndexConfig;
 
-  public MultiColumnTextIndexHandler(
-      SegmentDirectory segmentDirectory,
-      Map<String, FieldIndexConfigs> fieldIndexConfigs,
-      MultiColumnTextIndexConfig textIndexConfig,
-      @Nullable TableConfig tableConfig) {
-    super(segmentDirectory, fieldIndexConfigs, tableConfig);
+  public MultiColumnTextIndexHandler(SegmentDirectory segmentDirectory, 
IndexLoadingConfig indexLoadingConfig,
+      MultiColumnTextIndexConfig textIndexConfig) {
+    super(segmentDirectory, indexLoadingConfig);
     _textIndexConfig = textIndexConfig;
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
index 18252ad520e..c1e33427643 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
@@ -23,7 +23,6 @@ import java.io.File;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexType;
 import org.apache.pinot.segment.local.segment.index.loader.BaseIndexHandler;
@@ -42,6 +41,7 @@ import 
org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,12 +54,13 @@ public class RangeIndexHandler extends BaseIndexHandler {
 
   @VisibleForTesting
   public RangeIndexHandler(SegmentDirectory segmentDirectory, 
IndexLoadingConfig indexLoadingConfig) {
-    this(segmentDirectory, indexLoadingConfig.getFieldIndexConfigByColName(), 
indexLoadingConfig.getTableConfig());
+    this(segmentDirectory, indexLoadingConfig.getFieldIndexConfigByColName(), 
indexLoadingConfig.getTableConfig(),
+        indexLoadingConfig.getSchema());
   }
 
   public RangeIndexHandler(SegmentDirectory segmentDirectory, Map<String, 
FieldIndexConfigs> fieldIndexConfigs,
-      @Nullable TableConfig tableConfig) {
-    super(segmentDirectory, fieldIndexConfigs, tableConfig);
+      TableConfig tableConfig, Schema schema) {
+    super(segmentDirectory, fieldIndexConfigs, tableConfig, schema);
     _columnsToAddIdx = 
FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.range(), 
_fieldIndexConfigs);
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
index 58ab479737b..d3f7f311d61 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nullable;
 import 
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
 import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexType;
 import org.apache.pinot.segment.local.segment.index.loader.BaseIndexHandler;
@@ -43,6 +42,7 @@ import 
org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
 import org.apache.pinot.spi.config.table.MultiColumnTextIndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,8 +72,8 @@ public class TextIndexHandler extends BaseIndexHandler {
   private final Set<String> _columnsToAddIdx;
 
   public TextIndexHandler(SegmentDirectory segmentDirectory, Map<String, 
FieldIndexConfigs> fieldIndexConfigs,
-      @Nullable TableConfig tableConfig) {
-    super(segmentDirectory, fieldIndexConfigs, tableConfig);
+      TableConfig tableConfig, Schema schema) {
+    super(segmentDirectory, fieldIndexConfigs, tableConfig, schema);
     _columnsToAddIdx = 
FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.text(), 
_fieldIndexConfigs);
   }
 
@@ -142,12 +142,10 @@ public class TextIndexHandler extends BaseIndexHandler {
       throw new UnsupportedOperationException("Text index is currently only 
supported on STRING columns: " + column);
     }
 
-    if (_tableConfig != null && _tableConfig.getIndexingConfig() != null) {
-      MultiColumnTextIndexConfig config = 
_tableConfig.getIndexingConfig().getMultiColumnTextIndexConfig();
-      if (config != null && config.getColumns().contains(column)) {
-        throw new UnsupportedOperationException(
-            "Cannot create both single and multi-column TEXT index on column: 
" + column);
-      }
+    MultiColumnTextIndexConfig config = 
_tableConfig.getIndexingConfig().getMultiColumnTextIndexConfig();
+    if (config != null && config.getColumns().contains(column)) {
+      throw new UnsupportedOperationException(
+          "Cannot create both single and multi-column TEXT index on column: " 
+ column);
     }
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/VectorIndexHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/VectorIndexHandler.java
index fd78747207d..72d74763ca1 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/VectorIndexHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/VectorIndexHandler.java
@@ -23,7 +23,6 @@ import java.io.File;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexType;
 import org.apache.pinot.segment.local.segment.index.loader.BaseIndexHandler;
@@ -42,6 +41,7 @@ import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,8 +52,8 @@ public class VectorIndexHandler extends BaseIndexHandler {
   private final Map<String, VectorIndexConfig> _vectorConfigs;
 
   public VectorIndexHandler(SegmentDirectory segmentDirectory, Map<String, 
FieldIndexConfigs> fieldIndexConfigs,
-      @Nullable TableConfig tableConfig) {
-    super(segmentDirectory, fieldIndexConfigs, tableConfig);
+      TableConfig tableConfig, Schema schema) {
+    super(segmentDirectory, fieldIndexConfigs, tableConfig, schema);
     _vectorConfigs = 
FieldIndexConfigsUtil.enableConfigByColumn(StandardIndexes.vector(), 
_fieldIndexConfigs);
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexType.java
index 1e13e1fa303..6e38e8d86f7 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexType.java
@@ -113,7 +113,7 @@ public class NullValueIndexType extends 
AbstractIndexType<IndexConfig, NullValue
 
   @Override
   public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
-      @Nullable Schema schema, @Nullable TableConfig tableConfig) {
+      Schema schema, TableConfig tableConfig) {
     return IndexHandler.NoOp.INSTANCE;
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/range/RangeIndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/range/RangeIndexType.java
index d83bfbfdb95..0e34761c606 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/range/RangeIndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/range/RangeIndexType.java
@@ -156,8 +156,8 @@ public class RangeIndexType
 
   @Override
   public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
-      @Nullable Schema schema, @Nullable TableConfig tableConfig) {
-    return new RangeIndexHandler(segmentDirectory, configsByCol, tableConfig);
+      Schema schema, TableConfig tableConfig) {
+    return new RangeIndexHandler(segmentDirectory, configsByCol, tableConfig, 
schema);
   }
 
   private static class ReaderFactory extends 
IndexReaderFactory.Default<RangeIndexConfig, RangeIndexReader> {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
index 3214c0f4c10..dfe8616bd05 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
@@ -129,8 +129,8 @@ public class TextIndexType extends 
AbstractIndexType<TextIndexConfig, TextIndexR
 
   @Override
   public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
-      @Nullable Schema schema, @Nullable TableConfig tableConfig) {
-    return new TextIndexHandler(segmentDirectory, configsByCol, tableConfig);
+      Schema schema, TableConfig tableConfig) {
+    return new TextIndexHandler(segmentDirectory, configsByCol, tableConfig, 
schema);
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/vector/VectorIndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/vector/VectorIndexType.java
index 427c3d0c3a3..4b26ca509df 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/vector/VectorIndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/vector/VectorIndexType.java
@@ -117,8 +117,8 @@ public class VectorIndexType extends 
AbstractIndexType<VectorIndexConfig, Vector
 
   @Override
   public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
-      @Nullable Schema schema, @Nullable TableConfig tableConfig) {
-    return new VectorIndexHandler(segmentDirectory, configsByCol, tableConfig);
+      Schema schema, TableConfig tableConfig) {
+    return new VectorIndexHandler(segmentDirectory, configsByCol, tableConfig, 
schema);
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/BitmapInvertedIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/BitmapInvertedIndexTest.java
index 52b034049fd..4ce46310543 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/BitmapInvertedIndexTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/BitmapInvertedIndexTest.java
@@ -39,6 +39,8 @@ import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.Pairs;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
@@ -93,8 +95,14 @@ public class BitmapInvertedIndexTest implements 
PinotBuffersAfterClassCheckRule
   private void testBitmapInvertedIndex(ReadMode readMode)
       throws Exception {
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
-        .setInvertedIndexColumns(INVERTED_INDEX_COLUMNS).build();
-    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(tableConfig, null);
+        .setInvertedIndexColumns(INVERTED_INDEX_COLUMNS)
+        .build();
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+        .addDateTime("time_day", DataType.INT, "EPOCH|DAYS", "1:DAYS")
+        .addSingleValueDimension("column10", DataType.STRING)
+        .addMetric("met_impressionCount", DataType.LONG)
+        .build();
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(tableConfig, schema);
     indexLoadingConfig.setReadMode(readMode);
     IndexSegment indexSegment = ImmutableSegmentLoader.load(_segmentDirectory, 
indexLoadingConfig);
 
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/converter/SegmentV1V2ToV3FormatConverterTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/converter/SegmentV1V2ToV3FormatConverterTest.java
index 37149f32789..3fa50cfe8cc 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/converter/SegmentV1V2ToV3FormatConverterTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/converter/SegmentV1V2ToV3FormatConverterTest.java
@@ -100,7 +100,7 @@ public class SegmentV1V2ToV3FormatConverterTest {
 
     // verify that the segment loads correctly. This is necessary and 
sufficient
     // full proof way to ensure that segment is correctly translated
-    IndexSegment indexSegment = ImmutableSegmentLoader.load(_segmentDirectory, 
_v3IndexLoadingConfig, null, false);
+    IndexSegment indexSegment = ImmutableSegmentLoader.load(_segmentDirectory, 
_v3IndexLoadingConfig, false);
     Assert.assertNotNull(indexSegment);
     Assert.assertEquals(indexSegment.getSegmentName(), metadata.getName());
     Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
index 0d558e9e364..dd7825a6760 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
@@ -588,7 +588,7 @@ public class ForwardIndexHandlerTest {
   }
 
   private ForwardIndexHandler createForwardIndexHandler() {
-    return new ForwardIndexHandler(_segmentDirectory, 
createIndexLoadingConfig(), SCHEMA);
+    return new ForwardIndexHandler(_segmentDirectory, 
createIndexLoadingConfig());
   }
 
   private Map<String, List<ForwardIndexHandler.Operation>> computeOperations()
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
index a3fdb11a32d..f3436a41506 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
@@ -49,6 +49,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.BytesUtils;
@@ -83,6 +84,8 @@ public class LoaderTest {
 
   private File _avroFile;
   private File _vectorAvroFile;
+  private TableConfig _v1TableConfig;
+  private TableConfig _v3TableConfig;
   private IndexLoadingConfig _v1IndexLoadingConfig;
   private IndexLoadingConfig _v3IndexLoadingConfig;
   private File _indexDir;
@@ -99,14 +102,14 @@ public class LoaderTest {
     assertNotNull(resourceUrl);
     _vectorAvroFile = new File(resourceUrl.getFile());
 
-    TableConfig tableConfig =
+    _v1TableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setSegmentVersion("v1").build();
     Schema schema = createSchema();
-    _v1IndexLoadingConfig = new IndexLoadingConfig(tableConfig, schema);
+    _v1IndexLoadingConfig = new IndexLoadingConfig(_v1TableConfig, schema);
 
-    tableConfig =
+    _v3TableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setSegmentVersion("v3").build();
-    _v3IndexLoadingConfig = new IndexLoadingConfig(tableConfig, schema);
+    _v3IndexLoadingConfig = new IndexLoadingConfig(_v3TableConfig, schema);
   }
 
   private Schema createSchema()
@@ -114,7 +117,7 @@ public class LoaderTest {
     return SegmentTestUtils.extractSchemaFromAvroWithoutTime(_avroFile);
   }
 
-  private Schema constructV1Segment()
+  private void constructV1Segment()
       throws Exception {
     FileUtils.deleteQuietly(INDEX_DIR);
 
@@ -129,7 +132,6 @@ public class LoaderTest {
     driver.build();
 
     _indexDir = new File(INDEX_DIR, driver.getSegmentName());
-    return schema;
   }
 
   @Test
@@ -207,14 +209,14 @@ public class LoaderTest {
   @Test
   public void testBuiltInVirtualColumns()
       throws Exception {
-    Schema schema = constructV1Segment();
+    constructV1Segment();
 
-    IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, 
_v1IndexLoadingConfig, schema,
-        SEGMENT_OPERATIONS_THROTTLER);
+    IndexSegment indexSegment =
+        ImmutableSegmentLoader.load(_indexDir, _v1IndexLoadingConfig, 
SEGMENT_OPERATIONS_THROTTLER);
     testBuiltInVirtualColumns(indexSegment);
     indexSegment.destroy();
 
-    indexSegment = ImmutableSegmentLoader.load(_indexDir, 
_v1IndexLoadingConfig, SEGMENT_OPERATIONS_THROTTLER);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, 
_v3IndexLoadingConfig, SEGMENT_OPERATIONS_THROTTLER);
     testBuiltInVirtualColumns(indexSegment);
     indexSegment.destroy();
   }
@@ -233,17 +235,20 @@ public class LoaderTest {
   @Test
   public void testDefaultEmptyValueStringColumn()
       throws Exception {
-    Schema schema = constructV1Segment();
-    schema.addField(new DimensionFieldSpec("SVString", 
FieldSpec.DataType.STRING, true, ""));
-    schema.addField(new DimensionFieldSpec("MVString", 
FieldSpec.DataType.STRING, false, ""));
+    Schema schema = createSchema();
+    schema.addField(new DimensionFieldSpec("SVString", DataType.STRING, true, 
""));
+    schema.addField(new DimensionFieldSpec("MVString", DataType.STRING, false, 
""));
 
-    IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, 
_v1IndexLoadingConfig, schema,
+    constructV1Segment();
+    IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(_v1TableConfig, schema),
         SEGMENT_OPERATIONS_THROTTLER);
     
assertEquals(indexSegment.getDataSource("SVString").getDictionary().get(0), "");
     
assertEquals(indexSegment.getDataSource("MVString").getDictionary().get(0), "");
     indexSegment.destroy();
 
-    indexSegment = ImmutableSegmentLoader.load(_indexDir, 
_v3IndexLoadingConfig, schema, SEGMENT_OPERATIONS_THROTTLER);
+    constructV1Segment();
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(_v3TableConfig, schema),
+        SEGMENT_OPERATIONS_THROTTLER);
     
assertEquals(indexSegment.getDataSource("SVString").getDictionary().get(0), "");
     
assertEquals(indexSegment.getDataSource("MVString").getDictionary().get(0), "");
     indexSegment.destroy();
@@ -252,13 +257,21 @@ public class LoaderTest {
   @Test
   public void testDefaultBytesColumn()
       throws Exception {
-    Schema schema = constructV1Segment();
+    Schema schema = createSchema();
     String newColumnName = "byteMetric";
     String defaultValue = "0000ac0000";
-
-    FieldSpec byteMetric = new MetricFieldSpec(newColumnName, 
FieldSpec.DataType.BYTES, defaultValue);
+    FieldSpec byteMetric = new MetricFieldSpec(newColumnName, DataType.BYTES, 
defaultValue);
     schema.addField(byteMetric);
-    IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, 
_v3IndexLoadingConfig, schema,
+
+    constructV1Segment();
+    IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(_v1TableConfig, schema),
+        SEGMENT_OPERATIONS_THROTTLER);
+    assertEquals(BytesUtils.toHexString((byte[]) 
indexSegment.getDataSource(newColumnName).getDictionary().get(0)),
+        defaultValue);
+    indexSegment.destroy();
+
+    constructV1Segment();
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(_v3TableConfig, schema),
         SEGMENT_OPERATIONS_THROTTLER);
     assertEquals(BytesUtils.toHexString((byte[]) 
indexSegment.getDataSource(newColumnName).getDictionary().get(0)),
         defaultValue);
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index 0dc0f2b5336..53b2564a68b 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -281,14 +281,16 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
   }
 
   private TableConfig createTableConfig() {
-    TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName("daysSinceEpoch")
-            .setNoDictionaryColumns(new ArrayList<>(_noDictionaryColumns))
-            .setInvertedIndexColumns(new ArrayList<>(_invertedIndexColumns))
-            .setCreateInvertedIndexDuringSegmentGeneration(true)
-            .setRangeIndexColumns(new ArrayList<>(_rangeIndexColumns))
-            .setFieldConfigList(new 
ArrayList<>(_fieldConfigMap.values())).setNullHandlingEnabled(true)
-            .setIngestionConfig(_ingestionConfig).build();
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        .setTimeColumnName("daysSinceEpoch")
+        .setNoDictionaryColumns(new ArrayList<>(_noDictionaryColumns))
+        .setInvertedIndexColumns(new ArrayList<>(_invertedIndexColumns))
+        .setCreateInvertedIndexDuringSegmentGeneration(true)
+        .setRangeIndexColumns(new ArrayList<>(_rangeIndexColumns))
+        .setFieldConfigList(new ArrayList<>(_fieldConfigMap.values()))
+        .setNullHandlingEnabled(true)
+        .setIngestionConfig(_ingestionConfig)
+        .build();
     IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
     if (_columnMinMaxValueGeneratorMode != null) {
       
indexingConfig.setColumnMinMaxValueGeneratorMode(_columnMinMaxValueGeneratorMode.name());
@@ -315,8 +317,7 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
   private void runPreProcessor(Schema schema)
       throws Exception {
     try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
-        SegmentPreProcessor processor = new 
SegmentPreProcessor(segmentDirectory, createIndexLoadingConfig(schema),
-            schema)) {
+        SegmentPreProcessor processor = new 
SegmentPreProcessor(segmentDirectory, createIndexLoadingConfig(schema))) {
       processor.process(SEGMENT_OPERATIONS_THROTTLER);
     }
   }
@@ -464,9 +465,8 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
     buildV3Segment();
     validateIndex(StandardIndexes.range(), EXISTING_INT_COL_RAW, 42242, 16, 
false, false, false, 0, true, 0,
         ChunkCompressionType.LZ4, false, DataType.INT, 100000);
-    long oldRangeIndexSize =
-        new 
SegmentMetadataImpl(INDEX_DIR).getColumnMetadataFor(EXISTING_INT_COL_RAW)
-            .getIndexSizeFor(StandardIndexes.range());
+    long oldRangeIndexSize = new 
SegmentMetadataImpl(INDEX_DIR).getColumnMetadataFor(EXISTING_INT_COL_RAW)
+        .getIndexSizeFor(StandardIndexes.range());
     // At this point, the segment has range index. Now the reload path should 
create a dictionary and rewrite the
     // range index.
     _noDictionaryColumns.remove(EXISTING_INT_COL_RAW);
@@ -474,9 +474,8 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
         DataType.INT, 100000);
     validateIndex(StandardIndexes.range(), EXISTING_INT_COL_RAW, 42242, 16, 
false, true, false, 0, true, 0, null, false,
         DataType.INT, 100000);
-    long newRangeIndexSize =
-        new 
SegmentMetadataImpl(INDEX_DIR).getColumnMetadataFor(EXISTING_INT_COL_RAW)
-            .getIndexSizeFor(StandardIndexes.range());
+    long newRangeIndexSize = new 
SegmentMetadataImpl(INDEX_DIR).getColumnMetadataFor(EXISTING_INT_COL_RAW)
+        .getIndexSizeFor(StandardIndexes.range());
     assertNotEquals(oldRangeIndexSize, newRangeIndexSize);
   }
 
@@ -1505,7 +1504,7 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
       throws Exception {
     try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
         SegmentPreProcessor processor = new 
SegmentPreProcessor(segmentDirectory,
-            createIndexLoadingConfig(_newColumnsSchemaWithH3Json), 
_newColumnsSchemaWithH3Json)) {
+            createIndexLoadingConfig(_newColumnsSchemaWithH3Json))) {
       assertTrue(processor.needProcess());
       processor.process(SEGMENT_OPERATIONS_THROTTLER);
     }
@@ -1516,7 +1515,7 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
       throws Exception {
     try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
         SegmentPreProcessor processor = new 
SegmentPreProcessor(segmentDirectory,
-            createIndexLoadingConfig(_newColumnsSchemaWithH3Json), 
_newColumnsSchemaWithH3Json)) {
+            createIndexLoadingConfig(_newColumnsSchemaWithH3Json))) {
       assertFalse(processor.needProcess());
     }
   }
@@ -1529,7 +1528,8 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
     long[] longValues = {1588316400000L, 1588489200000L, 1588662000000L, 
1588834800000L, 1589007600000L};
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
     Schema schema = new 
Schema.SchemaBuilder().addSingleValueDimension("stringCol", 
FieldSpec.DataType.STRING)
-        .addMetric("longCol", FieldSpec.DataType.LONG).build();
+        .addMetric("longCol", FieldSpec.DataType.LONG)
+        .build();
 
     // build good segment, no needPreprocess
     buildTestSegment(tableConfig, schema, stringValuesValid, longValues);
@@ -1537,7 +1537,7 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
     
indexingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.ALL.name());
     try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
         SegmentPreProcessor processor = new 
SegmentPreProcessor(segmentDirectory,
-            new IndexLoadingConfig(tableConfig, schema), schema)) {
+            new IndexLoadingConfig(tableConfig, schema))) {
       assertFalse(processor.needProcess());
     }
 
@@ -1546,13 +1546,13 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
     
indexingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.NONE.name());
     try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
         SegmentPreProcessor processor = new 
SegmentPreProcessor(segmentDirectory,
-            new IndexLoadingConfig(tableConfig, schema), schema)) {
+            new IndexLoadingConfig(tableConfig, schema))) {
       assertFalse(processor.needProcess());
     }
     
indexingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.ALL.name());
     try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
         SegmentPreProcessor processor = new 
SegmentPreProcessor(segmentDirectory,
-            new IndexLoadingConfig(tableConfig, schema), schema)) {
+            new IndexLoadingConfig(tableConfig, schema))) {
       assertFalse(processor.needProcess());
     }
 
@@ -1561,13 +1561,13 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
     
indexingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.NONE.name());
     try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
         SegmentPreProcessor processor = new 
SegmentPreProcessor(segmentDirectory,
-            new IndexLoadingConfig(tableConfig, schema), schema)) {
+            new IndexLoadingConfig(tableConfig, schema))) {
       assertFalse(processor.needProcess());
     }
     
indexingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.ALL.name());
     try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
         SegmentPreProcessor processor = new 
SegmentPreProcessor(segmentDirectory,
-            new IndexLoadingConfig(tableConfig, schema), schema)) {
+            new IndexLoadingConfig(tableConfig, schema))) {
       assertTrue(processor.needProcess());
     }
   }
@@ -1580,7 +1580,8 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
     long[] longValues = {1588316400000L, 1588489200000L, 1588662000000L, 
1588834800000L, 1589007600000L};
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
     Schema schema = new 
Schema.SchemaBuilder().addSingleValueDimension("stringCol", 
FieldSpec.DataType.STRING)
-        .addMetric("longCol", FieldSpec.DataType.LONG).build();
+        .addMetric("longCol", FieldSpec.DataType.LONG)
+        .build();
 
     // build good segment, no needPreprocess
     buildTestSegment(tableConfig, schema, stringValuesValid, longValues);
@@ -1588,7 +1589,7 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
     
indexingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.ALL.name());
     try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
         SegmentPreProcessor processor = new 
SegmentPreProcessor(segmentDirectory,
-            new IndexLoadingConfig(tableConfig, schema), schema)) {
+            new IndexLoadingConfig(tableConfig, schema))) {
       assertFalse(processor.needProcess());
     }
   }
@@ -1601,13 +1602,14 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
     long[] longValues = {2, 1, 2, 3, 4, 5, 3, 2};
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
     Schema schema = new 
Schema.SchemaBuilder().addSingleValueDimension("stringCol", 
FieldSpec.DataType.STRING)
-        .addMetric("longCol", DataType.LONG).build();
+        .addMetric("longCol", DataType.LONG)
+        .build();
 
     // Build good segment, no need for preprocess
     buildTestSegment(tableConfig, schema, stringValues, longValues);
     try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
         SegmentPreProcessor processor = new 
SegmentPreProcessor(segmentDirectory,
-            new IndexLoadingConfig(tableConfig, schema), schema)) {
+            new IndexLoadingConfig(tableConfig, schema))) {
       assertFalse(processor.needProcess());
     }
 
@@ -1616,7 +1618,7 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
     indexingConfig.setNoDictionaryColumns(List.of("longCol"));
     try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
         SegmentPreProcessor processor = new 
SegmentPreProcessor(segmentDirectory,
-            new IndexLoadingConfig(tableConfig, schema), schema)) {
+            new IndexLoadingConfig(tableConfig, schema))) {
       assertTrue(processor.needProcess());
       processor.process(SEGMENT_OPERATIONS_THROTTLER);
     }
@@ -1625,7 +1627,7 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
     indexingConfig.setNoDictionaryColumns(null);
     try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
         SegmentPreProcessor processor = new 
SegmentPreProcessor(segmentDirectory,
-            new IndexLoadingConfig(tableConfig, schema), schema)) {
+            new IndexLoadingConfig(tableConfig, schema))) {
       assertTrue(processor.needProcess());
       processor.process(SEGMENT_OPERATIONS_THROTTLER);
     }
@@ -1638,7 +1640,7 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
     indexingConfig.setStarTreeIndexConfigs(List.of(starTreeIndexConfig));
     try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
         SegmentPreProcessor processor = new 
SegmentPreProcessor(segmentDirectory,
-            new IndexLoadingConfig(tableConfig, schema), schema)) {
+            new IndexLoadingConfig(tableConfig, schema))) {
       assertTrue(processor.needProcess());
       processor.process(SEGMENT_OPERATIONS_THROTTLER);
     }
@@ -1647,7 +1649,7 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
     indexingConfig.setStarTreeIndexConfigs(null);
     try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
         SegmentPreProcessor processor = new 
SegmentPreProcessor(segmentDirectory,
-            new IndexLoadingConfig(tableConfig, schema), schema)) {
+            new IndexLoadingConfig(tableConfig, schema))) {
       assertTrue(processor.needProcess());
       processor.process(SEGMENT_OPERATIONS_THROTTLER);
     }
@@ -1657,7 +1659,7 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
     indexingConfig.setStarTreeIndexConfigs(List.of(starTreeIndexConfig));
     try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
         SegmentPreProcessor processor = new 
SegmentPreProcessor(segmentDirectory,
-            new IndexLoadingConfig(tableConfig, schema), schema)) {
+            new IndexLoadingConfig(tableConfig, schema))) {
       assertTrue(processor.needProcess());
       processor.process(SEGMENT_OPERATIONS_THROTTLER);
     }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandlerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandlerTest.java
index dc008a487df..78a64eef7f4 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandlerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandlerTest.java
@@ -128,7 +128,7 @@ public class DefaultColumnHandlerTest {
   private void testComputeDefaultColumnActionMap(Map<String, 
DefaultColumnAction> expected) {
     BaseDefaultColumnHandler defaultColumnHandler =
         new V3DefaultColumnHandler(INDEX_DIR, 
_segmentDirectory.getSegmentMetadata(),
-            new IndexLoadingConfig(TABLE_CONFIG, _schema), _schema, _writer);
+            new IndexLoadingConfig(TABLE_CONFIG, _schema), _writer);
     assertEquals(defaultColumnHandler.computeDefaultColumnActionMap(), 
expected);
   }
 }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/IndexType.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/IndexType.java
index 3f671e12da7..d07f1255048 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/IndexType.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/IndexType.java
@@ -115,8 +115,9 @@ public interface IndexType<C extends IndexConfig, IR 
extends IndexReader, IC ext
    */
   List<String> getFileExtensions(@Nullable ColumnMetadata columnMetadata);
 
+  // TODO: Consider passing in IndexLoadingConfig
   IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
-      @Nullable Schema schema, @Nullable TableConfig tableConfig);
+      Schema schema, TableConfig tableConfig);
 
   /**
    * This method is used to perform in place conversion of provided {@link 
TableConfig} to newer format
diff --git 
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/IndexServiceTest.java
 
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/IndexServiceTest.java
index 94dfd3a9102..9c6b2b3bd6a 100644
--- 
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/IndexServiceTest.java
+++ 
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/IndexServiceTest.java
@@ -31,7 +31,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertSame;
 
 
 public class IndexServiceTest {
@@ -122,7 +122,7 @@ public class IndexServiceTest {
 
     @Override
     public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory,
-        Map<String, FieldIndexConfigs> configsByCol, @Nullable Schema schema, 
@Nullable TableConfig tableConfig) {
+        Map<String, FieldIndexConfigs> configsByCol, Schema schema, 
TableConfig tableConfig) {
       throw new UnsupportedOperationException(IndexType.class.getName() + " 
should not be created");
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to