Jackie-Jiang commented on code in PR #15782:
URL: https://github.com/apache/pinot/pull/15782#discussion_r2249472965


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/RealtimeOffsetAutoResetHandler.java:
##########
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.periodictask;
+
+import java.util.Collection;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.spi.stream.StreamConfig;
+
+
+public abstract class RealtimeOffsetAutoResetHandler {
+  protected PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+  protected PinotHelixResourceManager _pinotHelixResourceManager;
+
+  public RealtimeOffsetAutoResetHandler(PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,

Review Comment:
   Suggest making this an interface and adding an `init()` method to initialize it
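
A minimal sketch of what the interface-plus-`init()` shape could look like. All names here (`OffsetAutoResetHandlerSketch`, `SegmentManagerStub`, `ResourceManagerStub`, `RecordingHandler`, `HandlerLoader`) are hypothetical stand-ins so the example compiles on its own; they are not the PR's classes. The point is that the loader can use a no-arg constructor and then hand dependencies over via `init()`, instead of requiring a specific two-argument constructor through reflection.

```java
// Hypothetical stand-ins for PinotLLCRealtimeSegmentManager and PinotHelixResourceManager.
final class SegmentManagerStub { }
final class ResourceManagerStub { }

// Interface form of the handler: no state, dependencies arrive through init().
interface OffsetAutoResetHandlerSketch {
  void init(SegmentManagerStub segmentManager, ResourceManagerStub resourceManager);

  boolean triggerBackfillJob(String tableNameWithType, String topicName, int partitionId,
      long fromOffset, long toOffset);
}

// Illustrative implementation that only checks it was initialized and the range is non-empty.
final class RecordingHandler implements OffsetAutoResetHandlerSketch {
  private SegmentManagerStub _segmentManager;
  private ResourceManagerStub _resourceManager;

  @Override
  public void init(SegmentManagerStub segmentManager, ResourceManagerStub resourceManager) {
    _segmentManager = segmentManager;
    _resourceManager = resourceManager;
  }

  boolean isInitialized() {
    return _segmentManager != null && _resourceManager != null;
  }

  @Override
  public boolean triggerBackfillJob(String tableNameWithType, String topicName, int partitionId,
      long fromOffset, long toOffset) {
    return isInitialized() && fromOffset < toOffset;
  }
}

final class HandlerLoader {
  private HandlerLoader() {
  }

  // Loads the class by name, instantiates it with the no-arg constructor, then calls init().
  static OffsetAutoResetHandlerSketch load(String className, SegmentManagerStub segmentManager,
      ResourceManagerStub resourceManager) {
    try {
      Class<?> clazz = Class.forName(className);
      if (!OffsetAutoResetHandlerSketch.class.isAssignableFrom(clazz)) {
        throw new IllegalArgumentException(className + " does not implement the handler interface");
      }
      OffsetAutoResetHandlerSketch handler =
          (OffsetAutoResetHandlerSketch) clazz.getDeclaredConstructor().newInstance();
      handler.init(segmentManager, resourceManager);
      return handler;
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot create handler " + className, e);
    }
  }
}
```

With this shape, a plugin only needs a no-arg constructor, and the manager keeps control over when dependencies are injected.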



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/RealtimeOffsetAutoResetKafkaHandler.java:
##########
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.periodictask;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class RealtimeOffsetAutoResetKafkaHandler extends 
RealtimeOffsetAutoResetHandler {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeOffsetAutoResetKafkaHandler.class);
+  private static final String STREAM_TYPE = "kafka";
+
+  public RealtimeOffsetAutoResetKafkaHandler(PinotLLCRealtimeSegmentManager 
llcRealtimeSegmentManager,
+      PinotHelixResourceManager pinotHelixResourceManager) {
+    super(llcRealtimeSegmentManager, pinotHelixResourceManager);
+  }
+
+  /**
+   * Trigger the job to backfill the skipped interval due to offset auto reset.
+   * It is expected to backfill the [fromOffset, toOffset) interval.
+   * @return true if successfully started the backfill job and its ingestion
+   */
+  @Override
+  public boolean triggerBackfillJob(
+      String tableNameWithType, StreamConfig streamConfig, String topicName, 
int partitionId, long fromOffset,
+      long toOffset) {
+    Map<String, String> newTopicStreamConfig = 
triggerDataReplicationAndGetTopicInfo(
+        tableNameWithType, streamConfig, topicName, partitionId, fromOffset, 
toOffset);
+    if (newTopicStreamConfig == null) {
+      return false;
+    }
+    try {
+      TableConfig currentTableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      addNewTopicToTableConfig(newTopicStreamConfig, currentTableConfig);

Review Comment:
   I don't follow this backfill logic. Is the idea to backfill the data from a different Kafka topic? Some comments explaining it would help



##########
pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java:
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.Constants;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import 
org.apache.pinot.controller.helix.core.periodictask.RealtimeOffsetAutoResetHandler;
+import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RealtimeOffsetAutoResetManager extends 
ControllerPeriodicTask<RealtimeOffsetAutoResetManager.Context> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeOffsetAutoResetManager.class);
+  private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private final Map<String, RealtimeOffsetAutoResetHandler> _tableToHandler;
+  private final Map<String, Set<String>> _tableTopicsUnderBackfill;
+  private final Map<String, Set<String>> _tableEphemeralTopics;
+
+  public RealtimeOffsetAutoResetManager(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, 
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
+      ControllerMetrics controllerMetrics) {
+    super("RealtimeOffsetAutoResetManager", 
config.getRealtimeOffsetAutoResetFrequencyInSeconds(),
+        config.getRealtimeOffsetAutoResetInitialDelaySeconds(), 
pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
+    _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _tableToHandler = new ConcurrentHashMap<>();
+    _tableTopicsUnderBackfill = new ConcurrentHashMap<>();
+    _tableEphemeralTopics = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  protected RealtimeOffsetAutoResetManager.Context preprocess(Properties 
periodicTaskProperties) {
+    RealtimeOffsetAutoResetManager.Context context = new 
RealtimeOffsetAutoResetManager.Context();
+    // Fill offset backfill job required info
+    // Examples of properties:
+    //   resetOffsetTopicName=topicName
+    //   resetOffsetTopicPartition=0
+    //   resetOffsetFrom=0
+    //   resetOffsetTo=1000
+    if 
(periodicTaskProperties.containsKey(context._backfillJobPropertyKeys.toArray()[0]))
 {
+      context._shouldTriggerBackfillJobs = true;
+      for (String key : context._backfillJobPropertyKeys) {
+        context._backfillJobProperties.put(key, 
periodicTaskProperties.getProperty(key));
+      }
+    }
+    return context;
+  }
+
+  @VisibleForTesting
+  protected RealtimeOffsetAutoResetHandler getTableHandler(String 
tableNameWithType) {
+    return _tableToHandler.get(tableNameWithType);
+  }
+
+  @Override
+  protected void processTable(String tableNameWithType, 
RealtimeOffsetAutoResetManager.Context context) {
+    if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+      return;
+    }
+    LOGGER.info("Processing offset auto reset backfill for table {}, with 
context {}", tableNameWithType, context);
+
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      LOGGER.error("Failed to find table config for table: {}, skipping auto 
reset periodic job", tableNameWithType);
+      return;
+    }
+    RealtimeOffsetAutoResetHandler handler = 
getOrConstructHandler(tableConfig);
+    if (handler == null) {
+      return;
+    }
+
+    if (context._shouldTriggerBackfillJobs) {
+      _tableTopicsUnderBackfill.putIfAbsent(tableNameWithType, 
ConcurrentHashMap.newKeySet());
+      String topicName = 
context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_NAME);
+      _tableTopicsUnderBackfill.get(tableNameWithType).add(topicName);
+
+      StreamConfig topicStreamConfig = 
IngestionConfigUtils.getStreamConfigs(tableConfig).stream()
+          .filter(config -> topicName.equals(config.getTopicName()))
+          .findFirst().orElseThrow(() -> new RuntimeException("No matching 
topic found"));
+      LOGGER.info("Trigger backfill jobs with StreamConfig {}, topicName {}, 
properties {}",
+          topicStreamConfig, topicName, context._backfillJobProperties);
+      try {
+        
_tableToHandler.get(tableNameWithType).triggerBackfillJob(tableNameWithType,
+            topicStreamConfig,
+            topicName,
+            
Integer.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_PARTITION)),
+            
Long.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_FROM)),
+            
Long.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_TO)));
+      } catch (NumberFormatException e) {
+        LOGGER.error("Invalid backfill job properties for table: {}, 
properties: {}, error: {}",
+            tableNameWithType, context._backfillJobProperties, e.getMessage(), 
e);
+      }
+    }
+
+    ensureBackfillJobsRunning(tableNameWithType);
+    ensureCompletedBackfillJobsCleanedUp(tableConfig);
+  }
+
+  /**
+   * Get the list of tables & topics being backfilled and ensure the backfill 
jobs are running.
+   */
+  private void ensureBackfillJobsRunning(String tableNameWithType) {
+    // Recover state from ephemeral multi-topics ingestion
+    // TODO: refactor or add other recover methods when other backfill 
approaches are ready
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    for (StreamConfig streamConfig : 
IngestionConfigUtils.getStreamConfigs(tableConfig)) {
+      if (streamConfig.isEphemeralBackfillTopic()) {
+        _tableEphemeralTopics.putIfAbsent(tableNameWithType, 
ConcurrentHashMap.newKeySet());
+        
_tableEphemeralTopics.get(tableNameWithType).add(streamConfig.getTopicName());
+      }
+    }
+    if (!_tableTopicsUnderBackfill.containsKey(tableNameWithType)
+        || _tableTopicsUnderBackfill.get(tableNameWithType).isEmpty()) {
+      return;
+    }
+    RealtimeOffsetAutoResetHandler handler = 
getOrConstructHandler(tableConfig);
+    if (handler == null) {
+      return;
+    }
+    handler.ensureBackfillJobsRunning(tableNameWithType, 
_tableTopicsUnderBackfill.get(tableNameWithType));
+  }
+
+  private void ensureCompletedBackfillJobsCleanedUp(TableConfig tableConfig) {
+    String tableNameWithType = tableConfig.getTableName();
+    if (!_tableEphemeralTopics.containsKey(tableNameWithType)) {
+      return;
+    }
+    LOGGER.info("Trying to clean up backfill jobs on {}", tableNameWithType);
+    RealtimeOffsetAutoResetHandler handler = 
getOrConstructHandler(tableConfig);
+    Collection<String> cleanedUpTopics = handler.cleanupCompletedBackfillJobs(
+        tableNameWithType, _tableEphemeralTopics.get(tableNameWithType));
+    if 
(cleanedUpTopics.containsAll(_tableEphemeralTopics.get(tableNameWithType))) {
+      _tableTopicsUnderBackfill.remove(tableNameWithType);
+      _tableEphemeralTopics.remove(tableNameWithType);
+      if (_tableToHandler.get(tableNameWithType) != null) {
+        _tableToHandler.get(tableNameWithType).close();
+        _tableToHandler.remove(tableNameWithType);
+      }
+    } else {
+      _tableEphemeralTopics.get(tableNameWithType).removeAll(cleanedUpTopics);
+    }
+    if (cleanedUpTopics.size() > 0) {
+      LOGGER.info("Cleaned up complete backfill topics {} for table {}", 
cleanedUpTopics, tableNameWithType);
+    }
+  }
+
+  @Override
+  protected void nonLeaderCleanup(List<String> tableNamesWithType) {
+    for (String tableNameWithType : tableNamesWithType) {
+      _tableTopicsUnderBackfill.remove(tableNameWithType);
+      _tableToHandler.remove(tableNameWithType);
+    }
+  }
+
+  private RealtimeOffsetAutoResetHandler getOrConstructHandler(TableConfig 
tableConfig) {
+    RealtimeOffsetAutoResetHandler handler = 
_tableToHandler.get(tableConfig.getTableName());
+    if (handler != null) {
+      return handler;
+    }
+    if (tableConfig.getIngestionConfig() == null
+        || tableConfig.getIngestionConfig().getStreamIngestionConfig() == 
null) {
+      LOGGER.debug("Table {} config is in the legacy mode, cannot do auto 
reset", tableConfig.getTableName());
+      return null;
+    }
+    String className = 
tableConfig.getIngestionConfig().getStreamIngestionConfig()
+        .getRealtimeOffsetAutoResetHandlerClass();
+    if (className == null) {
+      LOGGER.debug("RealtimeOffsetAutoResetHandlerClass is not specified for 
table {}", tableConfig.getTableName());
+      return null;
+    }
+    try {
+      Class<?> clazz = Class.forName(className);
+      if (!RealtimeOffsetAutoResetHandler.class.isAssignableFrom(clazz)) {
+        String exceptionMessage = "Custom analyzer must be a child of "
+            + RealtimeOffsetAutoResetHandler.class.getCanonicalName();
+        throw new ReflectiveOperationException(exceptionMessage);
+      }
+      handler = (RealtimeOffsetAutoResetHandler) clazz.getConstructor(
+          PinotLLCRealtimeSegmentManager.class, 
PinotHelixResourceManager.class).newInstance(
+          _llcRealtimeSegmentManager, _pinotHelixResourceManager);
+      _tableToHandler.put(tableConfig.getTableName(), handler);
+      return handler;
+    } catch (Exception e) {
+      LOGGER.error("Cannot create RealtimeOffsetAutoResetHandler", e);
+      return null;
+    }
+  }
+
+
+
+  public class Context {
+    public final List<String> _backfillJobPropertyKeys = List.of(
+        Constants.RESET_OFFSET_TOPIC_NAME, 
Constants.RESET_OFFSET_TOPIC_PARTITION,
+        Constants.RESET_OFFSET_FROM, Constants.RESET_OFFSET_TO);
+    private boolean _shouldTriggerBackfillJobs;
+    private Map<String, String> _backfillJobProperties = new HashMap<>();

Review Comment:
   (minor) This can be `final`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java:
##########
@@ -275,6 +279,7 @@ public static long getRandomInitialDelayInSeconds() {
     public static final int DEFAULT_RETENTION_MANAGER_FREQUENCY_IN_SECONDS = 6 
* 60 * 60; // 6 Hours.
     public static final int 
DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 24 * 60 * 60; 
// 24 Hours.
     public static final int 
DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+    public static final int DEFAULT_OFFSET_AUTO_RESET_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.

Review Comment:
   Given this is an opt-in feature, I'd suggest disabling the manager by default until it has been tested in production
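
One way to express "disabled by default" is sketched below. It assumes a convention where a non-positive frequency means the task is not scheduled; that convention, the property key, and the class name `AutoResetConfigSketch` are all hypothetical and are not taken from `ControllerConf`.

```java
import java.util.Properties;

final class AutoResetConfigSketch {
  // Hypothetical property key; the PR's actual key name is not shown in this hunk.
  static final String FREQUENCY_KEY = "controller.offsetAutoReset.frequencyInSeconds";
  // Assumption: a non-positive frequency means "do not schedule the task".
  static final int DISABLED_FREQUENCY_IN_SECONDS = -1;

  private final Properties _props;

  AutoResetConfigSketch(Properties props) {
    _props = props;
  }

  // Returns the configured frequency, or the disabled sentinel when the key is absent.
  int getFrequencyInSeconds() {
    String value = _props.getProperty(FREQUENCY_KEY);
    return value == null ? DISABLED_FREQUENCY_IN_SECONDS : Integer.parseInt(value.trim());
  }

  // The manager would only be registered when an operator has opted in explicitly.
  boolean isEnabled() {
    return getFrequencyInSeconds() > 0;
  }
}
```

Operators who want the feature then set the frequency explicitly, and clusters that never touch the key see no behavior change.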



##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java:
##########
@@ -187,6 +187,12 @@ public List<TopicMetadata> getTopics() {
     }
   }
 
+  @Override

Review Comment:
   This should also be added to `kafka30`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/RealtimeOffsetAutoResetKafkaHandler.java:
##########
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.periodictask;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class RealtimeOffsetAutoResetKafkaHandler extends RealtimeOffsetAutoResetHandler {

Review Comment:
   Is this PR half implemented? There seems to be no concrete implementation of `RealtimeOffsetAutoResetHandler`.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java:
##########
@@ -127,6 +127,10 @@ default Map<String, PartitionLagState> 
getCurrentPartitionLagState(
     return result;
   }
 
+  default StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long timestampMillis) {

Review Comment:
   Annotate it with `@Nullable`
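
A small sketch of why the annotation matters. It assumes the default implementation returns `null` when a stream type does not support timestamp lookup (the method body is not shown in this hunk). The `@Nullable` declared here is a local stand-in for `javax.annotation.Nullable` so the example has no dependencies, and offsets are simplified to `Long`; `OffsetLookupSketch` and `OffsetLookupCaller` are hypothetical names.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Local stand-in for javax.annotation.Nullable so the sketch compiles without extra dependencies.
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.PARAMETER, ElementType.FIELD})
@interface Nullable {
}

interface OffsetLookupSketch {
  // Hypothetical simplification: offsets are plain Longs rather than StreamPartitionMsgOffset.
  // Assumed default: providers that cannot look up by timestamp return null.
  @Nullable
  default Long getOffsetAtTimestamp(int partitionId, long timestampMillis) {
    return null;
  }
}

final class OffsetLookupCaller {
  private OffsetLookupCaller() {
  }

  // The annotation signals that callers must handle the "not supported" case explicitly.
  static long resolveOrFallback(OffsetLookupSketch provider, int partitionId, long timestampMillis,
      long fallbackOffset) {
    Long offset = provider.getOffsetAtTimestamp(partitionId, timestampMillis);
    return offset != null ? offset : fallbackOffset;
  }
}
```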



##########
pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java:
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.Constants;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import 
org.apache.pinot.controller.helix.core.periodictask.RealtimeOffsetAutoResetHandler;
+import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RealtimeOffsetAutoResetManager extends 
ControllerPeriodicTask<RealtimeOffsetAutoResetManager.Context> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeOffsetAutoResetManager.class);
+  private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private final Map<String, RealtimeOffsetAutoResetHandler> _tableToHandler;
+  private final Map<String, Set<String>> _tableTopicsUnderBackfill;
+  private final Map<String, Set<String>> _tableEphemeralTopics;
+
+  public RealtimeOffsetAutoResetManager(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, 
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
+      ControllerMetrics controllerMetrics) {
+    super("RealtimeOffsetAutoResetManager", 
config.getRealtimeOffsetAutoResetFrequencyInSeconds(),
+        config.getRealtimeOffsetAutoResetInitialDelaySeconds(), 
pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
+    _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _tableToHandler = new ConcurrentHashMap<>();
+    _tableTopicsUnderBackfill = new ConcurrentHashMap<>();
+    _tableEphemeralTopics = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  protected RealtimeOffsetAutoResetManager.Context preprocess(Properties 
periodicTaskProperties) {
+    RealtimeOffsetAutoResetManager.Context context = new 
RealtimeOffsetAutoResetManager.Context();
+    // Fill offset backfill job required info
+    // Examples of properties:
+    //   resetOffsetTopicName=topicName
+    //   resetOffsetTopicPartition=0
+    //   resetOffsetFrom=0
+    //   resetOffsetTo=1000
+    if 
(periodicTaskProperties.containsKey(context._backfillJobPropertyKeys.toArray()[0]))
 {
+      context._shouldTriggerBackfillJobs = true;
+      for (String key : context._backfillJobPropertyKeys) {
+        context._backfillJobProperties.put(key, 
periodicTaskProperties.getProperty(key));
+      }
+    }
+    return context;
+  }
+
+  @VisibleForTesting
+  protected RealtimeOffsetAutoResetHandler getTableHandler(String 
tableNameWithType) {
+    return _tableToHandler.get(tableNameWithType);
+  }
+
+  @Override
+  protected void processTable(String tableNameWithType, 
RealtimeOffsetAutoResetManager.Context context) {
+    if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+      return;
+    }
+    LOGGER.info("Processing offset auto reset backfill for table {}, with 
context {}", tableNameWithType, context);
+
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      LOGGER.error("Failed to find table config for table: {}, skipping auto 
reset periodic job", tableNameWithType);
+      return;
+    }
+    RealtimeOffsetAutoResetHandler handler = 
getOrConstructHandler(tableConfig);
+    if (handler == null) {
+      return;
+    }
+
+    if (context._shouldTriggerBackfillJobs) {
+      _tableTopicsUnderBackfill.putIfAbsent(tableNameWithType, 
ConcurrentHashMap.newKeySet());
+      String topicName = 
context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_NAME);
+      _tableTopicsUnderBackfill.get(tableNameWithType).add(topicName);
+
+      StreamConfig topicStreamConfig = 
IngestionConfigUtils.getStreamConfigs(tableConfig).stream()
+          .filter(config -> topicName.equals(config.getTopicName()))
+          .findFirst().orElseThrow(() -> new RuntimeException("No matching 
topic found"));
+      LOGGER.info("Trigger backfill jobs with StreamConfig {}, topicName {}, 
properties {}",
+          topicStreamConfig, topicName, context._backfillJobProperties);
+      try {
+        
_tableToHandler.get(tableNameWithType).triggerBackfillJob(tableNameWithType,
+            topicStreamConfig,
+            topicName,
+            Integer.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_PARTITION)),
+            Long.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_FROM)),
+            Long.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_TO)));

Review Comment:
   ```suggestion
               Integer.parseInt(context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_PARTITION)),
               Long.parseLong(context._backfillJobProperties.get(Constants.RESET_OFFSET_FROM)),
               Long.parseLong(context._backfillJobProperties.get(Constants.RESET_OFFSET_TO)));
   ```
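
For context on the suggestion: `Integer.parseInt` and `Long.parseLong` return primitives, whereas `Integer.valueOf(String)` and `Long.valueOf(String)` return boxed objects that would be unboxed again when passed to `int`/`long` parameters. A minimal sketch follows; the class name `BackfillPropertyParser` is hypothetical, and the property keys are the ones listed in the `preprocess` comment.

```java
import java.util.HashMap;
import java.util.Map;

final class BackfillPropertyParser {
  private BackfillPropertyParser() {
  }

  // parseInt yields a primitive int directly; no Integer object is created and then unboxed.
  static int parsePartition(Map<String, String> props, String key) {
    return Integer.parseInt(props.get(key));
  }

  // parseLong behaves the same way for long offsets.
  static long parseOffset(Map<String, String> props, String key) {
    return Long.parseLong(props.get(key));
  }
}
```

Both methods throw `NumberFormatException` for a missing (null) or malformed value, so the existing `catch (NumberFormatException e)` block still covers those cases.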



##########
pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java:
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.Constants;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import 
org.apache.pinot.controller.helix.core.periodictask.RealtimeOffsetAutoResetHandler;
+import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RealtimeOffsetAutoResetManager extends 
ControllerPeriodicTask<RealtimeOffsetAutoResetManager.Context> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeOffsetAutoResetManager.class);
+  private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private final Map<String, RealtimeOffsetAutoResetHandler> _tableToHandler;
+  private final Map<String, Set<String>> _tableTopicsUnderBackfill;
+  private final Map<String, Set<String>> _tableEphemeralTopics;
+
+  public RealtimeOffsetAutoResetManager(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, 
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
+      ControllerMetrics controllerMetrics) {
+    super("RealtimeOffsetAutoResetManager", 
config.getRealtimeOffsetAutoResetFrequencyInSeconds(),
+        config.getRealtimeOffsetAutoResetInitialDelaySeconds(), 
pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
+    _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _tableToHandler = new ConcurrentHashMap<>();
+    _tableTopicsUnderBackfill = new ConcurrentHashMap<>();
+    _tableEphemeralTopics = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  protected RealtimeOffsetAutoResetManager.Context preprocess(Properties 
periodicTaskProperties) {
+    RealtimeOffsetAutoResetManager.Context context = new 
RealtimeOffsetAutoResetManager.Context();
+    // Fill offset backfill job required info
+    // Examples of properties:
+    //   resetOffsetTopicName=topicName
+    //   resetOffsetTopicPartition=0
+    //   resetOffsetFrom=0
+    //   resetOffsetTo=1000
+    if 
(periodicTaskProperties.containsKey(context._backfillJobPropertyKeys.toArray()[0]))
 {
+      context._shouldTriggerBackfillJobs = true;
+      for (String key : context._backfillJobPropertyKeys) {
+        context._backfillJobProperties.put(key, 
periodicTaskProperties.getProperty(key));
+      }
+    }
+    return context;
+  }
+
+  @VisibleForTesting
+  protected RealtimeOffsetAutoResetHandler getTableHandler(String 
tableNameWithType) {
+    return _tableToHandler.get(tableNameWithType);
+  }
+
+  @Override
+  protected void processTable(String tableNameWithType, 
RealtimeOffsetAutoResetManager.Context context) {
+    if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+      return;
+    }
+    LOGGER.info("Processing offset auto reset backfill for table {}, with 
context {}", tableNameWithType, context);
+
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      LOGGER.error("Failed to find table config for table: {}, skipping auto 
reset periodic job", tableNameWithType);
+      return;
+    }
+    RealtimeOffsetAutoResetHandler handler = 
getOrConstructHandler(tableConfig);
+    if (handler == null) {
+      return;
+    }
+
+    if (context._shouldTriggerBackfillJobs) {
+      _tableTopicsUnderBackfill.putIfAbsent(tableNameWithType, 
ConcurrentHashMap.newKeySet());
+      String topicName = 
context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_NAME);
+      _tableTopicsUnderBackfill.get(tableNameWithType).add(topicName);
+
+      StreamConfig topicStreamConfig = 
IngestionConfigUtils.getStreamConfigs(tableConfig).stream()
+          .filter(config -> topicName.equals(config.getTopicName()))
+          .findFirst().orElseThrow(() -> new RuntimeException("No matching 
topic found"));
+      LOGGER.info("Trigger backfill jobs with StreamConfig {}, topicName {}, 
properties {}",
+          topicStreamConfig, topicName, context._backfillJobProperties);
+      try {
+        
_tableToHandler.get(tableNameWithType).triggerBackfillJob(tableNameWithType,

Review Comment:
   You should also check the return value of `triggerBackfillJob()`
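
   A minimal sketch of what checking the result could look like. The return
   type of `triggerBackfillJob()` is not visible in this hunk, so the `boolean`
   return, the `BackfillHandler` interface, and the `TriggerBackfillSketch`
   class are all assumptions for illustration. The `StreamConfig` argument is
   left out to keep the sketch self-contained.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for RealtimeOffsetAutoResetHandler; the boolean return type is an assumption.
interface BackfillHandler {
  boolean triggerBackfillJob(String tableNameWithType, String topicName, int partitionId, long fromOffset,
      long toOffset);
}

final class TriggerBackfillSketch {
  private final Set<String> _topicsUnderBackfill = ConcurrentHashMap.newKeySet();

  // Tracks the topic only when the handler reports that the job was actually triggered.
  boolean trigger(BackfillHandler handler, String table, String topic, int partition, long from, long to) {
    boolean triggered = handler.triggerBackfillJob(table, topic, partition, from, to);
    if (triggered) {
      _topicsUnderBackfill.add(topic);
    } else {
      System.err.println("Failed to trigger backfill job for table: " + table + ", topic: " + topic);
    }
    return triggered;
  }

  boolean isUnderBackfill(String topic) {
    return _topicsUnderBackfill.contains(topic);
  }

  public static void main(String[] args) {
    TriggerBackfillSketch sketch = new TriggerBackfillSketch();
    // A handler that always fails: the topic must not be recorded as under backfill.
    sketch.trigger((t, topic, p, f, to) -> false, "myTable_REALTIME", "topicA", 0, 0L, 1000L);
    System.out.println(sketch.isUnderBackfill("topicA"));
  }
}
```

   The point of the ordering is that the topic is added to the tracking set
   only after a successful trigger, so a failed call does not leave a topic
   that later runs would try to keep alive.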



##########
pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java:
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.Constants;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import 
org.apache.pinot.controller.helix.core.periodictask.RealtimeOffsetAutoResetHandler;
+import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RealtimeOffsetAutoResetManager extends 
ControllerPeriodicTask<RealtimeOffsetAutoResetManager.Context> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeOffsetAutoResetManager.class);
+  private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private final Map<String, RealtimeOffsetAutoResetHandler> _tableToHandler;
+  private final Map<String, Set<String>> _tableTopicsUnderBackfill;
+  private final Map<String, Set<String>> _tableEphemeralTopics;
+
+  public RealtimeOffsetAutoResetManager(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, 
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
+      ControllerMetrics controllerMetrics) {
+    super("RealtimeOffsetAutoResetManager", 
config.getRealtimeOffsetAutoResetFrequencyInSeconds(),
+        config.getRealtimeOffsetAutoResetInitialDelaySeconds(), 
pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
+    _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _tableToHandler = new ConcurrentHashMap<>();
+    _tableTopicsUnderBackfill = new ConcurrentHashMap<>();
+    _tableEphemeralTopics = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  protected RealtimeOffsetAutoResetManager.Context preprocess(Properties 
periodicTaskProperties) {
+    RealtimeOffsetAutoResetManager.Context context = new 
RealtimeOffsetAutoResetManager.Context();
+    // Fill offset backfill job required info
+    // Examples of properties:
+    //   resetOffsetTopicName=topicName
+    //   resetOffsetTopicPartition=0
+    //   resetOffsetFrom=0
+    //   resetOffsetTo=1000
+    if 
(periodicTaskProperties.containsKey(context._backfillJobPropertyKeys.toArray()[0]))
 {

Review Comment:
   Are you trying to do 
`periodicTaskProperties.containsKey(Constants.RESET_OFFSET_TOPIC_NAME)`?
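
   For reference, a small sketch of the explicit check. `toArray()[0]` returns
   whichever element comes first in iteration order; whether that is the topic
   name key depends on how `_backfillJobPropertyKeys` is declared, which is not
   visible in this hunk. The constant value below is an assumption taken from
   the `resetOffsetTopicName=topicName` example in the code comment, and
   `BackfillPropertyCheck` is a hypothetical class name.

```java
import java.util.Properties;

final class BackfillPropertyCheck {
  // Assumed value of Constants.RESET_OFFSET_TOPIC_NAME, taken from the example in the code comment.
  static final String RESET_OFFSET_TOPIC_NAME = "resetOffsetTopicName";

  // Triggers backfill only when the topic name property is explicitly present.
  static boolean shouldTriggerBackfill(Properties periodicTaskProperties) {
    return periodicTaskProperties.containsKey(RESET_OFFSET_TOPIC_NAME);
  }

  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.setProperty("resetOffsetFrom", "0");
    System.out.println(shouldTriggerBackfill(properties));
    properties.setProperty(RESET_OFFSET_TOPIC_NAME, "topicName");
    System.out.println(shouldTriggerBackfill(properties));
  }
}
```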



##########
pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java:
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.Constants;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import 
org.apache.pinot.controller.helix.core.periodictask.RealtimeOffsetAutoResetHandler;
+import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RealtimeOffsetAutoResetManager extends 
ControllerPeriodicTask<RealtimeOffsetAutoResetManager.Context> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeOffsetAutoResetManager.class);
+  private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private final Map<String, RealtimeOffsetAutoResetHandler> _tableToHandler;
+  private final Map<String, Set<String>> _tableTopicsUnderBackfill;
+  private final Map<String, Set<String>> _tableEphemeralTopics;
+
+  public RealtimeOffsetAutoResetManager(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, 
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
+      ControllerMetrics controllerMetrics) {
+    super("RealtimeOffsetAutoResetManager", 
config.getRealtimeOffsetAutoResetFrequencyInSeconds(),
+        config.getRealtimeOffsetAutoResetInitialDelaySeconds(), 
pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
+    _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _tableToHandler = new ConcurrentHashMap<>();
+    _tableTopicsUnderBackfill = new ConcurrentHashMap<>();
+    _tableEphemeralTopics = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  protected RealtimeOffsetAutoResetManager.Context preprocess(Properties 
periodicTaskProperties) {
+    RealtimeOffsetAutoResetManager.Context context = new 
RealtimeOffsetAutoResetManager.Context();
+    // Fill offset backfill job required info
+    // Examples of properties:
+    //   resetOffsetTopicName=topicName
+    //   resetOffsetTopicPartition=0
+    //   resetOffsetFrom=0
+    //   resetOffsetTo=1000
+    if 
(periodicTaskProperties.containsKey(context._backfillJobPropertyKeys.toArray()[0]))
 {
+      context._shouldTriggerBackfillJobs = true;
+      for (String key : context._backfillJobPropertyKeys) {
+        context._backfillJobProperties.put(key, 
periodicTaskProperties.getProperty(key));
+      }
+    }
+    return context;
+  }
+
+  @VisibleForTesting
+  protected RealtimeOffsetAutoResetHandler getTableHandler(String 
tableNameWithType) {
+    return _tableToHandler.get(tableNameWithType);
+  }
+
+  @Override
+  protected void processTable(String tableNameWithType, 
RealtimeOffsetAutoResetManager.Context context) {
+    if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+      return;
+    }
+    LOGGER.info("Processing offset auto reset backfill for table {}, with 
context {}", tableNameWithType, context);
+
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      LOGGER.error("Failed to find table config for table: {}, skipping auto 
reset periodic job", tableNameWithType);
+      return;
+    }
+    RealtimeOffsetAutoResetHandler handler = 
getOrConstructHandler(tableConfig);
+    if (handler == null) {
+      return;
+    }
+
+    if (context._shouldTriggerBackfillJobs) {
+      _tableTopicsUnderBackfill.putIfAbsent(tableNameWithType, 
ConcurrentHashMap.newKeySet());
+      String topicName = 
context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_NAME);
+      _tableTopicsUnderBackfill.get(tableNameWithType).add(topicName);
+
+      StreamConfig topicStreamConfig = 
IngestionConfigUtils.getStreamConfigs(tableConfig).stream()
+          .filter(config -> topicName.equals(config.getTopicName()))
+          .findFirst().orElseThrow(() -> new RuntimeException("No matching 
topic found"));
+      LOGGER.info("Trigger backfill jobs with StreamConfig {}, topicName {}, 
properties {}",
+          topicStreamConfig, topicName, context._backfillJobProperties);
+      try {
+        
_tableToHandler.get(tableNameWithType).triggerBackfillJob(tableNameWithType,
+            topicStreamConfig,
+            topicName,
+            
Integer.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_PARTITION)),
+            
Long.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_FROM)),
+            
Long.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_TO)));
+      } catch (NumberFormatException e) {
+        LOGGER.error("Invalid backfill job properties for table: {}, 
properties: {}, error: {}",
+            tableNameWithType, context._backfillJobProperties, e.getMessage(), 
e);
+      }
+    }
+
+    ensureBackfillJobsRunning(tableNameWithType);
+    ensureCompletedBackfillJobsCleanedUp(tableConfig);
+  }
+
+  /**
+   * Get the list of tables & topics being backfilled and ensure the backfill 
jobs are running.
+   */
+  private void ensureBackfillJobsRunning(String tableNameWithType) {
+    // Recover state from ephemeral multi-topics ingestion
+    // TODO: refactor or add other recover methods when other backfill 
approaches are ready
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    for (StreamConfig streamConfig : 
IngestionConfigUtils.getStreamConfigs(tableConfig)) {
+      if (streamConfig.isEphemeralBackfillTopic()) {
+        _tableEphemeralTopics.putIfAbsent(tableNameWithType, 
ConcurrentHashMap.newKeySet());
+        
_tableEphemeralTopics.get(tableNameWithType).add(streamConfig.getTopicName());
+      }
+    }
+    if (!_tableTopicsUnderBackfill.containsKey(tableNameWithType)
+        || _tableTopicsUnderBackfill.get(tableNameWithType).isEmpty()) {
+      return;
+    }
+    RealtimeOffsetAutoResetHandler handler = 
getOrConstructHandler(tableConfig);
+    if (handler == null) {
+      return;
+    }
+    handler.ensureBackfillJobsRunning(tableNameWithType, 
_tableTopicsUnderBackfill.get(tableNameWithType));
+  }
+
+  private void ensureCompletedBackfillJobsCleanedUp(TableConfig tableConfig) {
+    String tableNameWithType = tableConfig.getTableName();
+    if (!_tableEphemeralTopics.containsKey(tableNameWithType)) {
+      return;
+    }
+    LOGGER.info("Trying to clean up backfill jobs on {}", tableNameWithType);
+    RealtimeOffsetAutoResetHandler handler = 
getOrConstructHandler(tableConfig);
+    Collection<String> cleanedUpTopics = handler.cleanupCompletedBackfillJobs(
+        tableNameWithType, _tableEphemeralTopics.get(tableNameWithType));
+    if 
(cleanedUpTopics.containsAll(_tableEphemeralTopics.get(tableNameWithType))) {
+      _tableTopicsUnderBackfill.remove(tableNameWithType);
+      _tableEphemeralTopics.remove(tableNameWithType);
+      if (_tableToHandler.get(tableNameWithType) != null) {
+        _tableToHandler.get(tableNameWithType).close();
+        _tableToHandler.remove(tableNameWithType);
+      }
+    } else {
+      _tableEphemeralTopics.get(tableNameWithType).removeAll(cleanedUpTopics);
+    }
+    if (cleanedUpTopics.size() > 0) {
+      LOGGER.info("Cleaned up complete backfill topics {} for table {}", 
cleanedUpTopics, tableNameWithType);
+    }
+  }
+
+  @Override
+  protected void nonLeaderCleanup(List<String> tableNamesWithType) {
+    for (String tableNameWithType : tableNamesWithType) {
+      _tableTopicsUnderBackfill.remove(tableNameWithType);
+      _tableToHandler.remove(tableNameWithType);
+    }
+  }
+
+  private RealtimeOffsetAutoResetHandler getOrConstructHandler(TableConfig 
tableConfig) {
+    RealtimeOffsetAutoResetHandler handler = 
_tableToHandler.get(tableConfig.getTableName());
+    if (handler != null) {
+      return handler;
+    }
+    if (tableConfig.getIngestionConfig() == null
+        || tableConfig.getIngestionConfig().getStreamIngestionConfig() == 
null) {
+      LOGGER.debug("Table {} config is in the legacy mode, cannot do auto 
reset", tableConfig.getTableName());
+      return null;
+    }
+    String className = 
tableConfig.getIngestionConfig().getStreamIngestionConfig()
+        .getRealtimeOffsetAutoResetHandlerClass();
+    if (className == null) {
+      LOGGER.debug("RealtimeOffsetAutoResetHandlerClass is not specified for 
table {}", tableConfig.getTableName());
+      return null;
+    }
+    try {
+      Class<?> clazz = Class.forName(className);
+      if (!RealtimeOffsetAutoResetHandler.class.isAssignableFrom(clazz)) {
+        String exceptionMessage = "Custom analyzer must be a child of "
+            + RealtimeOffsetAutoResetHandler.class.getCanonicalName();
+        throw new ReflectiveOperationException(exceptionMessage);
+      }
+      handler = (RealtimeOffsetAutoResetHandler) clazz.getConstructor(
+          PinotLLCRealtimeSegmentManager.class, 
PinotHelixResourceManager.class).newInstance(
+          _llcRealtimeSegmentManager, _pinotHelixResourceManager);
+      _tableToHandler.put(tableConfig.getTableName(), handler);
+      return handler;
+    } catch (Exception e) {
+      LOGGER.error("Cannot create RealtimeOffsetAutoResetHandler", e);
+      return null;
+    }
+  }
+
+
+
+  public class Context {

Review Comment:
   Make it `static`, and remove the extra leading empty lines.
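
   A short sketch of the difference, assuming `Context` does not need access to
   the manager's instance fields. The field names mirror the ones used in
   `preprocess()`, but their types here are guesses, and
   `OffsetResetManagerSketch` is a hypothetical stand-in for the manager class.

```java
import java.util.HashMap;
import java.util.Map;

final class OffsetResetManagerSketch {
  // Static nested class: it carries no hidden reference to an enclosing manager instance.
  static class Context {
    boolean _shouldTriggerBackfillJobs;
    final Map<String, String> _backfillJobProperties = new HashMap<>();
  }

  static Context newContext() {
    // Works from a static method. A non-static inner class would need an
    // enclosing instance here, e.g. `manager.new Context()`.
    return new Context();
  }

  public static void main(String[] args) {
    Context context = newContext();
    context._backfillJobProperties.put("resetOffsetTopicName", "topicName");
    System.out.println(context._backfillJobProperties);
  }
}
```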



##########
pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java:
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.Constants;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import 
org.apache.pinot.controller.helix.core.periodictask.RealtimeOffsetAutoResetHandler;
+import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RealtimeOffsetAutoResetManager extends 
ControllerPeriodicTask<RealtimeOffsetAutoResetManager.Context> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeOffsetAutoResetManager.class);
+  private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private final Map<String, RealtimeOffsetAutoResetHandler> _tableToHandler;
+  private final Map<String, Set<String>> _tableTopicsUnderBackfill;
+  private final Map<String, Set<String>> _tableEphemeralTopics;
+
+  public RealtimeOffsetAutoResetManager(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, 
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
+      ControllerMetrics controllerMetrics) {
+    super("RealtimeOffsetAutoResetManager", 
config.getRealtimeOffsetAutoResetFrequencyInSeconds(),
+        config.getRealtimeOffsetAutoResetInitialDelaySeconds(), 
pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
+    _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _tableToHandler = new ConcurrentHashMap<>();
+    _tableTopicsUnderBackfill = new ConcurrentHashMap<>();
+    _tableEphemeralTopics = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  protected RealtimeOffsetAutoResetManager.Context preprocess(Properties 
periodicTaskProperties) {
+    RealtimeOffsetAutoResetManager.Context context = new 
RealtimeOffsetAutoResetManager.Context();
+    // Fill offset backfill job required info
+    // Examples of properties:
+    //   resetOffsetTopicName=topicName
+    //   resetOffsetTopicPartition=0
+    //   resetOffsetFrom=0
+    //   resetOffsetTo=1000
+    if 
(periodicTaskProperties.containsKey(context._backfillJobPropertyKeys.toArray()[0]))
 {
+      context._shouldTriggerBackfillJobs = true;
+      for (String key : context._backfillJobPropertyKeys) {
+        context._backfillJobProperties.put(key, 
periodicTaskProperties.getProperty(key));
+      }
+    }
+    return context;
+  }
+
+  @VisibleForTesting
+  protected RealtimeOffsetAutoResetHandler getTableHandler(String 
tableNameWithType) {
+    return _tableToHandler.get(tableNameWithType);
+  }
+
+  @Override
+  protected void processTable(String tableNameWithType, 
RealtimeOffsetAutoResetManager.Context context) {
+    if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+      return;
+    }
+    LOGGER.info("Processing offset auto reset backfill for table {}, with 
context {}", tableNameWithType, context);
+
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      LOGGER.error("Failed to find table config for table: {}, skipping auto 
reset periodic job", tableNameWithType);
+      return;
+    }
+    RealtimeOffsetAutoResetHandler handler = 
getOrConstructHandler(tableConfig);
+    if (handler == null) {
+      return;
+    }
+
+    if (context._shouldTriggerBackfillJobs) {
+      _tableTopicsUnderBackfill.putIfAbsent(tableNameWithType, 
ConcurrentHashMap.newKeySet());
+      String topicName = 
context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_NAME);
+      _tableTopicsUnderBackfill.get(tableNameWithType).add(topicName);
+
+      StreamConfig topicStreamConfig = 
IngestionConfigUtils.getStreamConfigs(tableConfig).stream()
+          .filter(config -> topicName.equals(config.getTopicName()))
+          .findFirst().orElseThrow(() -> new RuntimeException("No matching 
topic found"));
+      LOGGER.info("Trigger backfill jobs with StreamConfig {}, topicName {}, 
properties {}",
+          topicStreamConfig, topicName, context._backfillJobProperties);
+      try {
+        
_tableToHandler.get(tableNameWithType).triggerBackfillJob(tableNameWithType,
+            topicStreamConfig,
+            topicName,
+            
Integer.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_PARTITION)),
+            
Long.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_FROM)),
+            
Long.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_TO)));
+      } catch (NumberFormatException e) {
+        LOGGER.error("Invalid backfill job properties for table: {}, 
properties: {}, error: {}",
+            tableNameWithType, context._backfillJobProperties, e.getMessage(), 
e);
+      }
+    }
+
+    ensureBackfillJobsRunning(tableNameWithType);
+    ensureCompletedBackfillJobsCleanedUp(tableConfig);
+  }
+
+  /**
+   * Get the list of tables & topics being backfilled and ensure the backfill 
jobs are running.
+   */
+  private void ensureBackfillJobsRunning(String tableNameWithType) {
+    // Recover state from ephemeral multi-topics ingestion
+    // TODO: refactor or add other recover methods when other backfill 
approaches are ready
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    for (StreamConfig streamConfig : 
IngestionConfigUtils.getStreamConfigs(tableConfig)) {
+      if (streamConfig.isEphemeralBackfillTopic()) {
+        _tableEphemeralTopics.putIfAbsent(tableNameWithType, 
ConcurrentHashMap.newKeySet());
+        
_tableEphemeralTopics.get(tableNameWithType).add(streamConfig.getTopicName());
+      }
+    }
+    if (!_tableTopicsUnderBackfill.containsKey(tableNameWithType)
+        || _tableTopicsUnderBackfill.get(tableNameWithType).isEmpty()) {
+      return;
+    }
+    RealtimeOffsetAutoResetHandler handler = 
getOrConstructHandler(tableConfig);
+    if (handler == null) {
+      return;
+    }
+    handler.ensureBackfillJobsRunning(tableNameWithType, 
_tableTopicsUnderBackfill.get(tableNameWithType));
+  }
+
+  private void ensureCompletedBackfillJobsCleanedUp(TableConfig tableConfig) {
+    String tableNameWithType = tableConfig.getTableName();
+    if (!_tableEphemeralTopics.containsKey(tableNameWithType)) {
+      return;
+    }
+    LOGGER.info("Trying to clean up backfill jobs on {}", tableNameWithType);
+    RealtimeOffsetAutoResetHandler handler = 
getOrConstructHandler(tableConfig);
+    Collection<String> cleanedUpTopics = handler.cleanupCompletedBackfillJobs(
+        tableNameWithType, _tableEphemeralTopics.get(tableNameWithType));
+    if 
(cleanedUpTopics.containsAll(_tableEphemeralTopics.get(tableNameWithType))) {
+      _tableTopicsUnderBackfill.remove(tableNameWithType);
+      _tableEphemeralTopics.remove(tableNameWithType);
+      if (_tableToHandler.get(tableNameWithType) != null) {
+        _tableToHandler.get(tableNameWithType).close();
+        _tableToHandler.remove(tableNameWithType);
+      }
+    } else {
+      _tableEphemeralTopics.get(tableNameWithType).removeAll(cleanedUpTopics);
+    }
+    if (cleanedUpTopics.size() > 0) {
+      LOGGER.info("Cleaned up complete backfill topics {} for table {}", 
cleanedUpTopics, tableNameWithType);
+    }
+  }
+
+  @Override
+  protected void nonLeaderCleanup(List<String> tableNamesWithType) {
+    for (String tableNameWithType : tableNamesWithType) {
+      _tableTopicsUnderBackfill.remove(tableNameWithType);
+      _tableToHandler.remove(tableNameWithType);
+    }
+  }
+
+  private RealtimeOffsetAutoResetHandler getOrConstructHandler(TableConfig 
tableConfig) {
+    RealtimeOffsetAutoResetHandler handler = 
_tableToHandler.get(tableConfig.getTableName());

Review Comment:
   (minor) Cache `String tableNameWithType = tableConfig.getTableName();`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java:
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.Constants;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import 
org.apache.pinot.controller.helix.core.periodictask.RealtimeOffsetAutoResetHandler;
+import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RealtimeOffsetAutoResetManager extends 
ControllerPeriodicTask<RealtimeOffsetAutoResetManager.Context> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeOffsetAutoResetManager.class);
+  private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private final Map<String, RealtimeOffsetAutoResetHandler> _tableToHandler;
+  private final Map<String, Set<String>> _tableTopicsUnderBackfill;
+  private final Map<String, Set<String>> _tableEphemeralTopics;
+
+  public RealtimeOffsetAutoResetManager(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, 
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
+      ControllerMetrics controllerMetrics) {
+    super("RealtimeOffsetAutoResetManager", 
config.getRealtimeOffsetAutoResetFrequencyInSeconds(),
+        config.getRealtimeOffsetAutoResetInitialDelaySeconds(), 
pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
+    _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _tableToHandler = new ConcurrentHashMap<>();
+    _tableTopicsUnderBackfill = new ConcurrentHashMap<>();
+    _tableEphemeralTopics = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  protected RealtimeOffsetAutoResetManager.Context preprocess(Properties 
periodicTaskProperties) {
+    RealtimeOffsetAutoResetManager.Context context = new 
RealtimeOffsetAutoResetManager.Context();
+    // Fill offset backfill job required info
+    // Examples of properties:
+    //   resetOffsetTopicName=topicName
+    //   resetOffsetTopicPartition=0
+    //   resetOffsetFrom=0
+    //   resetOffsetTo=1000
+    if 
(periodicTaskProperties.containsKey(context._backfillJobPropertyKeys.toArray()[0]))
 {

Review Comment:
   I still don't follow the logic. For a periodic task, `periodicTaskProperties` 
applies to all tables. How do you use it to set up the topic info for a single 
partition?



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -38,25 +38,62 @@ public class LLCSegmentName implements 
Comparable<LLCSegmentName> {
   private final int _sequenceNumber;
   private final String _creationTime;
   private final String _segmentName;
+  private final String _topicName;
 
   public LLCSegmentName(String segmentName) {
     String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
-    Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name: 
%s", segmentName);
+    // Validate the segment name format should have 4 or 5 parts:
+    // e.g. tableName__partitionGroupId__sequenceNumber__creationTime
+    // or tableName__topicName__partitionGroupId__sequenceNumber__creationTime
+    Preconditions.checkArgument(
+        parts.length >= 4 && parts.length <= 5, "Invalid LLC segment name: 
%s", segmentName);
     _tableName = parts[0];
-    _partitionGroupId = Integer.parseInt(parts[1]);
-    _sequenceNumber = Integer.parseInt(parts[2]);
-    _creationTime = parts[3];
+    if (parts.length == 4) {
+      _topicName = "";
+      _partitionGroupId = Integer.parseInt(parts[1]);
+      _sequenceNumber = Integer.parseInt(parts[2]);
+      _creationTime = parts[3];
+    } else {
+      _topicName = parts[1];
+      _partitionGroupId = Integer.parseInt(parts[2]);
+      _sequenceNumber = Integer.parseInt(parts[3]);
+      _creationTime = parts[4];
+    }
     _segmentName = segmentName;
   }
 
   public LLCSegmentName(String tableName, int partitionGroupId, int 
sequenceNumber, long msSinceEpoch) {
+    this(tableName, "", partitionGroupId, sequenceNumber, msSinceEpoch);
+  }
+
+  public LLCSegmentName(

Review Comment:
   (CRITICAL) This could cause incompatibility across versions. The new 
controller will persist segment names that old brokers/servers cannot parse.
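
   A minimal sketch of the concern, assuming old brokers/servers still run the 
pre-change check that accepts exactly 4 parts. The class name, the method 
name, and the sample segment names are hypothetical, and plain `String.split` 
stands in for `StringUtils.splitByWholeSeparator`:

```java
import java.util.regex.Pattern;

public class LegacyParseSketch {
  private static final String SEPARATOR = "__";

  // Simplified stand-in for the pre-change constructor check:
  // only names with exactly 4 parts are accepted.
  public static boolean parseableByOldVersion(String segmentName) {
    String[] parts = segmentName.split(Pattern.quote(SEPARATOR));
    return parts.length == 4;
  }

  public static void main(String[] args) {
    // Hypothetical names in the old (4-part) and new (5-part) formats.
    String legacyName = "myTable__0__12__20250101T0000Z";
    String nameWithTopic = "myTable__myTopic__0__12__20250101T0000Z";
    System.out.println(legacyName + " -> " + parseableByOldVersion(legacyName));
    System.out.println(nameWithTopic + " -> " + parseableByOldVersion(nameWithTopic));
  }
}
```

   During a rolling upgrade, any component still on the old check would reject 
the 5-part name, so the new format probably needs to be gated until all 
components are upgraded.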



##########
pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java:
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.Constants;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.controller.helix.core.periodictask.RealtimeOffsetAutoResetHandler;
+import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RealtimeOffsetAutoResetManager extends ControllerPeriodicTask<RealtimeOffsetAutoResetManager.Context> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeOffsetAutoResetManager.class);
+  private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private final Map<String, RealtimeOffsetAutoResetHandler> _tableToHandler;
+  private final Map<String, Set<String>> _tableTopicsUnderBackfill;
+  private final Map<String, Set<String>> _tableEphemeralTopics;
+
+  public RealtimeOffsetAutoResetManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
+      ControllerMetrics controllerMetrics) {
+    super("RealtimeOffsetAutoResetManager", config.getRealtimeOffsetAutoResetFrequencyInSeconds(),
+        config.getRealtimeOffsetAutoResetInitialDelaySeconds(), pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
+    _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _tableToHandler = new ConcurrentHashMap<>();
+    _tableTopicsUnderBackfill = new ConcurrentHashMap<>();
+    _tableEphemeralTopics = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  protected RealtimeOffsetAutoResetManager.Context preprocess(Properties periodicTaskProperties) {
+    RealtimeOffsetAutoResetManager.Context context = new RealtimeOffsetAutoResetManager.Context();
+    // Fill offset backfill job required info
+    // Examples of properties:
+    //   resetOffsetTopicName=topicName
+    //   resetOffsetTopicPartition=0
+    //   resetOffsetFrom=0
+    //   resetOffsetTo=1000
+    if (periodicTaskProperties.containsKey(context._backfillJobPropertyKeys.toArray()[0])) {

Review Comment:
   I think you need to check that all the keys exist here, not just the first 
one; otherwise it will throw an NPE on line 121.
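
   One possible shape for that check, as a sketch only. The key names are 
taken from the example in the code comment above; the class name, the 
constant, and the helper method are hypothetical and not part of the PR:

```java
import java.util.List;
import java.util.Properties;

public class BackfillKeyCheckSketch {
  // Key names copied from the example in the quoted code comment.
  static final List<String> BACKFILL_KEYS = List.of(
      "resetOffsetTopicName", "resetOffsetTopicPartition", "resetOffsetFrom", "resetOffsetTo");

  // Returns true only when every required backfill key is present,
  // so later getProperty() calls on these keys cannot return null.
  public static boolean hasAllBackfillKeys(Properties props) {
    return BACKFILL_KEYS.stream().allMatch(props::containsKey);
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("resetOffsetTopicName", "topicName");
    // Only one of the four keys is set, so the check fails.
    System.out.println(hasAllBackfillKeys(props));
  }
}
```

   Guarding the block with a check like this would skip the backfill setup 
when only some of the properties are supplied, rather than failing partway 
through.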



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.