This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 43ee0f3ada Pinot Server : Refresh message for Table Schema  (#15956)
43ee0f3ada is described below

commit 43ee0f3ada340a1d864f4b5a6dc9916980f3ef2e
Author: Praveen <[email protected]>
AuthorDate: Tue Jun 3 16:08:07 2025 -0700

    Pinot Server : Refresh message for Table Schema  (#15956)
---
 .../messages/TableConfigSchemaRefreshMessage.java  | 55 ++++++++++++++++++++++
 .../apache/pinot/common/metrics/ServerMeter.java   |  2 +
 .../helix/core/PinotHelixResourceManager.java      | 46 ++++++++++++++++--
 .../helix/SegmentMessageHandlerFactory.java        | 30 ++++++++++++
 4 files changed, 128 insertions(+), 5 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/TableConfigSchemaRefreshMessage.java b/pinot-common/src/main/java/org/apache/pinot/common/messages/TableConfigSchemaRefreshMessage.java
new file mode 100644
index 0000000000..a63ac63466
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/messages/TableConfigSchemaRefreshMessage.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import java.util.UUID;
+import org.apache.helix.model.Message;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+
+// TODO: To evaluate if this message should be send on any table-config updates as well
+public class TableConfigSchemaRefreshMessage extends Message {
+  public static final String REFRESH_TABLE_CONFIG_AND_SCHEMA = "REFRESH_TABLE_CONFIG_AND_SCHEMA";
+  private static final String TABLE_NAME_KEY = "tableName";
+
+  /**
+   * Constructor for the sender.
+   */
+  public TableConfigSchemaRefreshMessage(String tableNameWithType) {
+    super(Message.MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+    setMsgSubType(REFRESH_TABLE_CONFIG_AND_SCHEMA);
+    // Give it infinite time to process the message, as long as session is alive
+    setExecutionTimeout(-1);
+    ZNRecord znRecord = getRecord();
+    znRecord.setSimpleField(TABLE_NAME_KEY, tableNameWithType);
+  }
+
+  /**
+   * Constructor for the receiver.
+   */
+  public TableConfigSchemaRefreshMessage(Message message) {
+    super(message.getRecord());
+    if (!message.getMsgSubType().equals(REFRESH_TABLE_CONFIG_AND_SCHEMA)) {
+      throw new IllegalArgumentException("Invalid message subtype:" + message.getMsgSubType());
+    }
+  }
+
+  public String getTableNameWithType() {
+    return getRecord().getSimpleField(TABLE_NAME_KEY);
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 5ba4ddb8e9..1c56532b55 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -140,6 +140,8 @@ public enum ServerMeter implements AbstractMetrics.Meter {
 
   DIRECT_MEMORY_OOM("directMemoryOOMCount", true),
 
+  TABLE_CONFIG_AND_SCHEMA_REFRESH_FAILURES("tables", true, "Number of failures to refresh table config and schema"),
+
   // Multi-stage
   /**
    * Number of times the max number of rows in the hash table has been reached.
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 93ece7c42e..d7bcf53bac 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -111,6 +111,7 @@ import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
 import org.apache.pinot.common.messages.SegmentRefreshMessage;
 import org.apache.pinot.common.messages.SegmentReloadMessage;
 import org.apache.pinot.common.messages.TableConfigRefreshMessage;
+import org.apache.pinot.common.messages.TableConfigSchemaRefreshMessage;
 import org.apache.pinot.common.messages.TableDeletionMessage;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
@@ -1585,13 +1586,27 @@ public class PinotHelixResourceManager {
     }
 
     updateSchema(schema, oldSchema, forceTableSchemaUpdate);
-
-    if (reload) {
-      LOGGER.info("Reloading tables with name: {}", schemaName);
+    try {
       List<String> tableNamesWithType = getExistingTableNamesWithType(schemaName, null);
-      for (String tableNameWithType : tableNamesWithType) {
-        reloadAllSegments(tableNameWithType, false, null);
+      if (reload) {
+        LOGGER.info("Reloading tables with name: {}", schemaName);
+        for (String tableNameWithType : tableNamesWithType) {
+          reloadAllSegments(tableNameWithType, false, null);
+        }
+      } else {
+        // Send schema refresh message to all tables that use this schema
+        for (String tableNameWithType : tableNamesWithType) {
+          LOGGER.info("Sending updated schema message for table: {}", tableNameWithType);
+          sendTableConfigSchemaRefreshMessage(tableNameWithType, getServerInstancesForTable(tableNameWithType,
+              TableNameBuilder.getTableTypeFromTableName(tableNameWithType)));
+        }
       }
+    } catch (TableNotFoundException e) {
+      if (reload) {
+        throw e;
+      }
+      // We don't throw exception if no tables found for schema when reload is false. Since this could be valid case
+      LOGGER.warn("No tables found for schema (refresh only): {}", schemaName, e);
     }
   }
 
@@ -3286,6 +3301,27 @@ public class PinotHelixResourceManager {
     }
   }
 
+  private void sendTableConfigSchemaRefreshMessage(String tableNameWithType, List<String> instances) {
+    TableConfigSchemaRefreshMessage refreshMessage = new TableConfigSchemaRefreshMessage(tableNameWithType);
+    for (String instance : instances) {
+      // Send refresh message to servers
+      Criteria recipientCriteria = new Criteria();
+      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+      recipientCriteria.setInstanceName(instance);
+      recipientCriteria.setSessionSpecific(true);
+      ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
+      // Send message with no callback and infinite timeout on the recipient
+      int numMessagesSent = messagingService.send(recipientCriteria, refreshMessage, null, -1);
+      if (numMessagesSent > 0) {
+        LOGGER.info("Sent {} schema refresh messages to servers for table: {} for instance: {}", numMessagesSent,
+            tableNameWithType, instance);
+      } else {
+        LOGGER.warn("No schema refresh message sent to servers for table: {} for instance: {}", tableNameWithType,
+            instance);
+      }
+    }
+  }
+
   /**
    * Update the instance config given the broker instance id
    */
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
index b78cf095bf..3429ec0c3c 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.messages.ForceCommitMessage;
 import org.apache.pinot.common.messages.IngestionMetricsRemoveMessage;
 import org.apache.pinot.common.messages.SegmentRefreshMessage;
 import org.apache.pinot.common.messages.SegmentReloadMessage;
+import org.apache.pinot.common.messages.TableConfigSchemaRefreshMessage;
 import org.apache.pinot.common.messages.TableDeletionMessage;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
@@ -74,6 +75,8 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
         return new ForceCommitMessageHandler(new ForceCommitMessage(message), _metrics, context);
       case IngestionMetricsRemoveMessage.INGESTION_METRICS_REMOVE_MSG_SUB_TYPE:
         return new IngestionMetricsRemoveMessageHandler(new IngestionMetricsRemoveMessage(message), _metrics, context);
+      case TableConfigSchemaRefreshMessage.REFRESH_TABLE_CONFIG_AND_SCHEMA:
+        return new TableSchemaRefreshMessageHandler(new TableConfigSchemaRefreshMessage(message), _metrics, context);
       default:
         LOGGER.warn("Unsupported user defined message sub type: {} for segment: {}", msgSubType,
             message.getPartitionName());
@@ -243,6 +246,33 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
     }
   }
 
+  private class TableSchemaRefreshMessageHandler extends DefaultMessageHandler {
+    TableSchemaRefreshMessageHandler(TableConfigSchemaRefreshMessage message, ServerMetrics metrics,
+                                     NotificationContext context) {
+      super(message, metrics, context);
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() {
+      _logger.info("Handling table schema refresh message for table: {}", _tableNameWithType);
+      try {
+        TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(_tableNameWithType);
+        if (tableDataManager != null) {
+          // Update the table config and schema by fetching from ZK
+          tableDataManager.fetchIndexLoadingConfig();
+        } else {
+          _logger.warn("No data manager found for table: {}", _tableNameWithType);
+        }
+      } catch (Exception e) {
+        _metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.TABLE_CONFIG_AND_SCHEMA_REFRESH_FAILURES, 1);
+        Utils.rethrowException(e);
+      }
+      HelixTaskResult helixTaskResult = new HelixTaskResult();
+      helixTaskResult.setSuccess(true);
+      return helixTaskResult;
+    }
+  }
+
   private static class DefaultMessageHandler extends MessageHandler {
     final String _segmentName;
     final String _tableNameWithType;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to