junrao commented on a change in pull request #10733:
URL: https://github.com/apache/kafka/pull/10733#discussion_r649337876



##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public final class RemoteLogManagerConfig {
+
+    /**
+     * Prefix used for properties to be passed to {@link RemoteStorageManager} 
implementation. Remote log subsystem collects all the properties having
+     * this prefix and passed to {@code RemoteStorageManager} using {@link 
RemoteStorageManager#configure(Map)}.

Review comment:
       Typo in the Javadoc: "passed" => "passes".

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public final class RemoteLogManagerConfig {
+
+    /**
+     * Prefix used for properties to be passed to {@link RemoteStorageManager} 
implementation. Remote log subsystem collects all the properties having
+     * this prefix and passed to {@code RemoteStorageManager} using {@link 
RemoteStorageManager#configure(Map)}.
+     */
+    public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP = 
"remote.log.storage.manager.impl.prefix";
+    public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC = 
"Prefix used for properties to be passed to RemoteStorageManager " +
+            "implementation. For example this value can be `rsm.s3.`.";
+
+    /**
+     * Prefix used for properties to be passed to {@link 
RemoteLogMetadataManager} implementation. Remote log subsystem collects all the 
properties having
+     * this prefix and passed to {@code RemoteLogMetadataManager} using {@link 
RemoteLogMetadataManager#configure(Map)}.
+     */
+    public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP 
= "remote.log.metadata.manager.impl.prefix";
+    public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC = 
"Prefix used for properties to be passed to RemoteLogMetadataManager " +
+            "implementation. For example this value can be `rlmm.s3.`.";
+
+    public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = 
"remote.log.storage.system.enable";
+    public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC = "Whether 
to enable tier storage functionality in a broker or not. Valid values " +
+            "are `true` or `false` and the default value is false. When it is 
true broker starts all the services required for tiered storage functionality.";
+    public static final boolean DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE = 
false;
+
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP = 
"remote.log.storage.manager.class.name";
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_NAME_DOC = "Fully 
qualified class name of `RemoteLogStorageManager` implementation.";
+
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP = 
"remote.log.storage.manager.class.path";
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC = "Class 
path of the `RemoteLogStorageManager` implementation." +
+            "If specified, the RemoteLogStorageManager implementation and its 
dependent libraries will be loaded by a dedicated" +
+            "classloader which searches this class path before the Kafka 
broker class path. The syntax of this parameter is same" +
+            "with the standard Java class path string.";
+
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP = 
"remote.log.metadata.manager.class.name";
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC = 
"Fully qualified class name of `RemoteLogMetadataManager` implementation.";
+    //todo add the default topic based RLMM class name.
+    public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME 
= "";
+
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP = 
"remote.log.metadata.manager.class.path";
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC = 
"Class path of the `RemoteLogMetadataManager` implementation." +
+            "If specified, the RemoteLogMetadataManager implementation and its 
dependent libraries will be loaded by a dedicated" +
+            "classloader which searches this class path before the Kafka 
broker class path. The syntax of this parameter is same" +
+            "with the standard Java class path string.";
+
+    public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP 
= "remote.log.metadata.manager.listener.name";
+    public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC = 
"Listener name of the local broker to which it should get connected if " +
+            "needed by RemoteLogMetadataManager implementation.";
+
+    public static final String 
REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP = 
"remote.log.index.file.cache.total.size.bytes";
+    public static final String 
REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_DOC = "The total size of the space 
allocated to store index files fetched " +
+            "from remote storage in the local storage.";
+    public static final long 
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES = 1024 * 1024 * 1024L;
+
+    public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP = 
"remote.log.manager.thread.pool.size";
+    public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Size 
of the thread pool used in scheduling tasks to copy " +
+            "segments, fetch remote log indexes and clean up remote log 
segments.";
+    public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 10;
+
+    public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = 
"remote.log.manager.task.interval.ms";
+    public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC = 
"Interval at which remote log manager runs the scheduled tasks like copy " +
+            "segments, fetch remote log indexes and clean up remote log 
segments.";
+    public static final long DEFAULT_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS = 30 
* 1000L;
+
+    public static final String REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP 
= "remote.log.manager.task.retry.backoff.ms";
+    public static final String REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_DOC = 
"It represents the wait time in milli seconds before the request is retried 
again.";

Review comment:
       It would be useful to document that this is the initial backoff time.

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public final class RemoteLogManagerConfig {
+
+    /**
+     * Prefix used for properties to be passed to {@link RemoteStorageManager} 
implementation. Remote log subsystem collects all the properties having
+     * this prefix and passed to {@code RemoteStorageManager} using {@link 
RemoteStorageManager#configure(Map)}.
+     */
+    public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP = 
"remote.log.storage.manager.impl.prefix";
+    public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC = 
"Prefix used for properties to be passed to RemoteStorageManager " +
+            "implementation. For example this value can be `rsm.s3.`.";
+
+    /**
+     * Prefix used for properties to be passed to {@link 
RemoteLogMetadataManager} implementation. Remote log subsystem collects all the 
properties having
+     * this prefix and passed to {@code RemoteLogMetadataManager} using {@link 
RemoteLogMetadataManager#configure(Map)}.
+     */
+    public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP 
= "remote.log.metadata.manager.impl.prefix";
+    public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC = 
"Prefix used for properties to be passed to RemoteLogMetadataManager " +
+            "implementation. For example this value can be `rlmm.s3.`.";
+
+    public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = 
"remote.log.storage.system.enable";
+    public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC = "Whether 
to enable tier storage functionality in a broker or not. Valid values " +
+            "are `true` or `false` and the default value is false. When it is 
true broker starts all the services required for tiered storage functionality.";
+    public static final boolean DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE = 
false;
+
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP = 
"remote.log.storage.manager.class.name";
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_NAME_DOC = "Fully 
qualified class name of `RemoteLogStorageManager` implementation.";
+
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP = 
"remote.log.storage.manager.class.path";
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC = "Class 
path of the `RemoteLogStorageManager` implementation." +
+            "If specified, the RemoteLogStorageManager implementation and its 
dependent libraries will be loaded by a dedicated" +
+            "classloader which searches this class path before the Kafka 
broker class path. The syntax of this parameter is same" +
+            "with the standard Java class path string.";
+
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP = 
"remote.log.metadata.manager.class.name";
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC = 
"Fully qualified class name of `RemoteLogMetadataManager` implementation.";
+    //todo add the default topic based RLMM class name.
+    public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME 
= "";
+
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP = 
"remote.log.metadata.manager.class.path";
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC = 
"Class path of the `RemoteLogMetadataManager` implementation." +
+            "If specified, the RemoteLogMetadataManager implementation and its 
dependent libraries will be loaded by a dedicated" +
+            "classloader which searches this class path before the Kafka 
broker class path. The syntax of this parameter is same" +
+            "with the standard Java class path string.";
+
+    public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP 
= "remote.log.metadata.manager.listener.name";
+    public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC = 
"Listener name of the local broker to which it should get connected if " +
+            "needed by RemoteLogMetadataManager implementation.";
+
+    public static final String 
REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP = 
"remote.log.index.file.cache.total.size.bytes";
+    public static final String 
REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_DOC = "The total size of the space 
allocated to store index files fetched " +
+            "from remote storage in the local storage.";
+    public static final long 
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES = 1024 * 1024 * 1024L;
+
+    public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP = 
"remote.log.manager.thread.pool.size";
+    public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Size 
of the thread pool used in scheduling tasks to copy " +
+            "segments, fetch remote log indexes and clean up remote log 
segments.";
+    public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 10;
+
+    public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = 
"remote.log.manager.task.interval.ms";
+    public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC = 
"Interval at which remote log manager runs the scheduled tasks like copy " +
+            "segments, fetch remote log indexes and clean up remote log 
segments.";
+    public static final long DEFAULT_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS = 30 
* 1000L;
+
+    public static final String REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP 
= "remote.log.manager.task.retry.backoff.ms";
+    public static final String REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_DOC = 
"It represents the wait time in milli seconds before the request is retried 
again.";
+    public static final long DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS 
= 30 * 1000L;

Review comment:
       Why is the default the same as max?

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public final class RemoteLogManagerConfig {
+
+    /**
+     * Prefix used for properties to be passed to {@link RemoteStorageManager} 
implementation. Remote log subsystem collects all the properties having
+     * this prefix and passed to {@code RemoteStorageManager} using {@link 
RemoteStorageManager#configure(Map)}.
+     */
+    public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP = 
"remote.log.storage.manager.impl.prefix";
+    public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC = 
"Prefix used for properties to be passed to RemoteStorageManager " +
+            "implementation. For example this value can be `rsm.s3.`.";
+
+    /**
+     * Prefix used for properties to be passed to {@link 
RemoteLogMetadataManager} implementation. Remote log subsystem collects all the 
properties having
+     * this prefix and passed to {@code RemoteLogMetadataManager} using {@link 
RemoteLogMetadataManager#configure(Map)}.
+     */
+    public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP 
= "remote.log.metadata.manager.impl.prefix";
+    public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC = 
"Prefix used for properties to be passed to RemoteLogMetadataManager " +
+            "implementation. For example this value can be `rlmm.s3.`.";
+
+    public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = 
"remote.log.storage.system.enable";
+    public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC = "Whether 
to enable tier storage functionality in a broker or not. Valid values " +
+            "are `true` or `false` and the default value is false. When it is 
true broker starts all the services required for tiered storage functionality.";
+    public static final boolean DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE = 
false;
+
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP = 
"remote.log.storage.manager.class.name";
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_NAME_DOC = "Fully 
qualified class name of `RemoteLogStorageManager` implementation.";
+
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP = 
"remote.log.storage.manager.class.path";
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC = "Class 
path of the `RemoteLogStorageManager` implementation." +
+            "If specified, the RemoteLogStorageManager implementation and its 
dependent libraries will be loaded by a dedicated" +
+            "classloader which searches this class path before the Kafka 
broker class path. The syntax of this parameter is same" +
+            "with the standard Java class path string.";
+
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP = 
"remote.log.metadata.manager.class.name";
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC = 
"Fully qualified class name of `RemoteLogMetadataManager` implementation.";
+    //todo add the default topic based RLMM class name.
+    public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME 
= "";
+
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP = 
"remote.log.metadata.manager.class.path";
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC = 
"Class path of the `RemoteLogMetadataManager` implementation." +
+            "If specified, the RemoteLogMetadataManager implementation and its 
dependent libraries will be loaded by a dedicated" +
+            "classloader which searches this class path before the Kafka 
broker class path. The syntax of this parameter is same" +
+            "with the standard Java class path string.";
+
+    public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP 
= "remote.log.metadata.manager.listener.name";
+    public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC = 
"Listener name of the local broker to which it should get connected if " +
+            "needed by RemoteLogMetadataManager implementation.";
+
+    public static final String 
REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP = 
"remote.log.index.file.cache.total.size.bytes";
+    public static final String 
REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_DOC = "The total size of the space 
allocated to store index files fetched " +
+            "from remote storage in the local storage.";
+    public static final long 
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES = 1024 * 1024 * 1024L;
+
+    public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP = 
"remote.log.manager.thread.pool.size";
+    public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Size 
of the thread pool used in scheduling tasks to copy " +
+            "segments, fetch remote log indexes and clean up remote log 
segments.";
+    public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 10;
+
+    public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = 
"remote.log.manager.task.interval.ms";
+    public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC = 
"Interval at which remote log manager runs the scheduled tasks like copy " +
+            "segments, fetch remote log indexes and clean up remote log 
segments.";

Review comment:
       "fetch remote log indexes" should be driven by the consumer, not by the 
interval configuration, right?

##########
File path: core/src/test/scala/unit/kafka/log/LogConfigTest.scala
##########
@@ -201,4 +206,72 @@ class LogConfigTest {
     })
   }
 
+  @Test
+  def testLocalLogRetentionDerivedProps(): Unit = {
+    val props = new Properties()
+    val retentionBytes = 1024
+    val retentionMs = 1000L
+    props.put(LogConfig.RetentionBytesProp, retentionBytes)
+    props.put(LogConfig.RetentionMsProp, retentionMs)
+    val logConfig = new LogConfig(props)
+
+    assertEquals(retentionMs, logConfig.localRetentionMs)
+    assertEquals(retentionBytes, logConfig.localRetentionBytes)
+  }
+
+  @Test
+  def testLocalLogRetentionDerivedDefaultProps(): Unit = {
+    val logConfig = new LogConfig( new Properties())
+
+    // Local retention defaults are derived from retention properties which 
can be default or custom.
+    assertEquals(Defaults.RetentionMs, logConfig.localRetentionMs)
+    assertEquals(Defaults.RetentionSize, logConfig.localRetentionBytes)
+  }
+
+  @Test
+  def testLocalLogRetentionProps(): Unit = {
+    val props = new Properties()
+    val localRetentionMs = 500
+    val localRetentionBytes = 1000
+    props.put(LogConfig.RetentionBytesProp, 2000)
+    props.put(LogConfig.RetentionMsProp, 1000)
+
+    props.put(LogConfig.LocalLogRetentionMsProp, localRetentionMs)
+    props.put(LogConfig.LocalLogRetentionBytesProp, localRetentionBytes)
+    val logConfig = new LogConfig(props)
+
+    assertEquals(localRetentionMs, logConfig.localRetentionMs)
+    assertEquals(localRetentionBytes, logConfig.localRetentionBytes)
+  }
+
+  @Test
+  def testInvalidLocalLogRetentionProps(): Unit = {
+    // Check for invalid localRetentionMs, < -2
+    doTestInvalidLocalLogRetentionProps(-3, 10, 2, 500L)
+
+    // Check for invalid localRetentionBytes < -2
+    doTestInvalidLocalLogRetentionProps(500L, -3, 2, 1000L)
+
+    // Check for invalid case of localRetentionMs > retentionMs
+    doTestInvalidLocalLogRetentionProps(2000L, 2, 100, 1000L)
+
+    // Check for invalid case of localRetentionBytes > retentionBytes
+    doTestInvalidLocalLogRetentionProps(500L, 200, 100, 1000L)
+
+    // Check for invalid case of localRetentionBytes (-1 viz unlimited) > 
retentionBytes,

Review comment:
       The code comment says localRetentionBytes, but it is localRetentionMs that is -1 in this case.

##########
File path: core/src/main/scala/kafka/log/LogConfig.scala
##########
@@ -301,6 +350,16 @@ object LogConfig {
         FollowerReplicationThrottledReplicasDoc, 
FollowerReplicationThrottledReplicasProp)
       .define(MessageDownConversionEnableProp, BOOLEAN, 
Defaults.MessageDownConversionEnable, LOW,
         MessageDownConversionEnableDoc, 
KafkaConfig.LogMessageDownConversionEnableProp)
+
+    // RemoteLogStorageEnableProp, LocalLogRetentionMsProp, 
LocalLogRetentionBytesProp do not have server default
+    // config names.

Review comment:
       Hmm, why is that? It's inconvenient to have to set these for each topic.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##########
@@ -827,6 +828,25 @@ class KafkaConfigTest {
         case RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
         case RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
 
+        // Remote Log Manager Configs
+        case RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_boolean")
+        case RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP => 
// ignore string
+        case RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP => 
// ignore string
+        case 
RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP => // ignore 
string
+        case 
RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP => // ignore 
string
+        case 
RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP => // 
ignore string
+        case 
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_MS_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+
+//        case TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2)
+//        case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2)

Review comment:
       Should we uncomment these?

##########
File path: 
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class RemoteLogManagerConfigTest {
+
+    private static class TestConfig extends AbstractConfig {
+        public TestConfig(Map<?, ?> originals) {
+            super(RemoteLogManagerConfig.CONFIG_DEF, originals, true);
+        }
+    }
+
+    @Test
+    public void testValidConfigs() {
+        String rsmPrefix = "__custom.rsm.";
+        String rlmmPrefix = "__custom.rlmm.";
+        Map<String, Object> rsmProps = Collections.singletonMap("rsm.prop", 
"val");
+        Map<String, Object> rlmmProps = Collections.singletonMap("rlmm.prop", 
"val");
+        RemoteLogManagerConfig expectedRemoteLogManagerConfig
+                = new RemoteLogManagerConfig(true, 
"dummy.remote.storage.class", "dummy.remote.storage.class.path",
+                                             
"dummy.remote.log.metadata.class", "dummy.remote.log.metadata.class.path",
+                                             "listener.name", 1024 * 1024L, 1, 
60000L, 100L, 60000L, 1000L, 10, 100, rsmPrefix,

Review comment:
       This line is too long; please wrap it.

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public final class RemoteLogManagerConfig {
+
+    /**
+     * Prefix used for properties to be passed to {@link RemoteStorageManager} 
implementation. Remote log subsystem collects all the properties having
+     * this prefix and passed to {@code RemoteStorageManager} using {@link 
RemoteStorageManager#configure(Map)}.
+     */
+    public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP = 
"remote.log.storage.manager.impl.prefix";
+    public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC = 
"Prefix used for properties to be passed to RemoteStorageManager " +
+            "implementation. For example this value can be `rsm.s3.`.";
+
+    /**
+     * Prefix used for properties to be passed to {@link 
RemoteLogMetadataManager} implementation. Remote log subsystem collects all the 
properties having
+     * this prefix and passed to {@code RemoteLogMetadataManager} using {@link 
RemoteLogMetadataManager#configure(Map)}.
+     */
+    public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP 
= "remote.log.metadata.manager.impl.prefix";
+    public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC = 
"Prefix used for properties to be passed to RemoteLogMetadataManager " +
+            "implementation. For example this value can be `rlmm.s3.`.";
+
+    public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = 
"remote.log.storage.system.enable";
+    public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC = "Whether 
to enable tier storage functionality in a broker or not. Valid values " +
+            "are `true` or `false` and the default value is false. When it is 
true broker starts all the services required for tiered storage functionality.";
+    public static final boolean DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE = 
false;
+
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP = 
"remote.log.storage.manager.class.name";
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_NAME_DOC = "Fully 
qualified class name of `RemoteLogStorageManager` implementation.";
+
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP = 
"remote.log.storage.manager.class.path";
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC = "Class 
path of the `RemoteLogStorageManager` implementation." +
+            "If specified, the RemoteLogStorageManager implementation and its 
dependent libraries will be loaded by a dedicated" +
+            "classloader which searches this class path before the Kafka 
broker class path. The syntax of this parameter is same" +
+            "with the standard Java class path string.";
+
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP = 
"remote.log.metadata.manager.class.name";
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC = 
"Fully qualified class name of `RemoteLogMetadataManager` implementation.";
+    //todo add the default topic based RLMM class name.
+    public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME 
= "";
+
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP = 
"remote.log.metadata.manager.class.path";
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC = 
"Class path of the `RemoteLogMetadataManager` implementation." +
+            "If specified, the RemoteLogMetadataManager implementation and its 
dependent libraries will be loaded by a dedicated" +
+            "classloader which searches this class path before the Kafka 
broker class path. The syntax of this parameter is same" +
+            "with the standard Java class path string.";
+
+    public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP 
= "remote.log.metadata.manager.listener.name";
+    public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC = 
"Listener name of the local broker to which it should get connected if " +
+            "needed by RemoteLogMetadataManager implementation.";
+
+    public static final String 
REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP = 
"remote.log.index.file.cache.total.size.bytes";
+    public static final String 
REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_DOC = "The total size of the space 
allocated to store index files fetched " +
+            "from remote storage in the local storage.";
+    public static final long 
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES = 1024 * 1024 * 1024L;
+
+    public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP = 
"remote.log.manager.thread.pool.size";
+    public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Size 
of the thread pool used in scheduling tasks to copy " +
+            "segments, fetch remote log indexes and clean up remote log 
segments.";
+    public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 10;
+
+    public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = 
"remote.log.manager.task.interval.ms";
+    public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC = 
"Interval at which remote log manager runs the scheduled tasks like copy " +
+            "segments, fetch remote log indexes and clean up remote log 
segments.";
+    public static final long DEFAULT_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS = 30 
* 1000L;
+
+    public static final String REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP 
= "remote.log.manager.task.retry.backoff.ms";
+    public static final String REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_DOC = 
"It represents the wait time in milli seconds before the request is retried 
again.";
+    public static final long DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS 
= 30 * 1000L;
+
+    public static final String 
REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP = 
"remote.log.manager.task.retry.backoff.max.ms";
+    public static final String 
REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_DOC = "The maximum amount of time 
in milliseconds to wait when the request " +
+            "is retried again. The retry duration will increase exponentially 
for each request failure up to this maximum wait interval.";
+    public static final long 
DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS = 30 * 1000L;
+
+    public static final String REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_MS_PROP = 
"remote.log.manager.task.retry.jitter";
+    public static final String REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_MS_DOC = 
"The maximum random jitter subtracted from the scheduled " +
+            " wait time to avoid thundering herds of requests.";
+    public static final long DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_MS = 
30 * 1000L;
+
+    public static final String REMOTE_LOG_READER_THREADS_PROP = 
"remote.log.reader.threads";
+    public static final String REMOTE_LOG_READER_THREADS_DOC = "Size of the 
thread pool that is allocated for handling remote log reads.";
+    public static final int DEFAULT_REMOTE_LOG_READER_THREADS = 10;
+
+    public static final String REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP = 
"remote.log.reader.max.pending.tasks";
+    public static final String REMOTE_LOG_READER_MAX_PENDING_TASKS_DOC = 
"Maximum remote log reader thread pool task queue size. If the task queue " +
+            "is full, fetch requests are served with an error.";
+    public static final int DEFAULT_REMOTE_LOG_READER_MAX_PENDING_TASKS = 100;
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef();
+
+    static {
+        
CONFIG_DEF.define(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
BOOLEAN,

Review comment:
       We could get rid of the `RemoteLogManagerConfig.` qualifier here, right? Ditto in other places.

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public final class RemoteLogManagerConfig {
+
+    /**
+     * Prefix used for properties to be passed to {@link RemoteStorageManager} 
implementation. Remote log subsystem collects all the properties having
+     * this prefix and passed to {@code RemoteStorageManager} using {@link 
RemoteStorageManager#configure(Map)}.
+     */
+    public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP = 
"remote.log.storage.manager.impl.prefix";
+    public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC = 
"Prefix used for properties to be passed to RemoteStorageManager " +
+            "implementation. For example this value can be `rsm.s3.`.";
+
+    /**
+     * Prefix used for properties to be passed to {@link 
RemoteLogMetadataManager} implementation. Remote log subsystem collects all the 
properties having
+     * this prefix and passed to {@code RemoteLogMetadataManager} using {@link 
RemoteLogMetadataManager#configure(Map)}.
+     */
+    public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP 
= "remote.log.metadata.manager.impl.prefix";
+    public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC = 
"Prefix used for properties to be passed to RemoteLogMetadataManager " +
+            "implementation. For example this value can be `rlmm.s3.`.";
+
+    public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = 
"remote.log.storage.system.enable";
+    public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC = "Whether 
to enable tier storage functionality in a broker or not. Valid values " +
+            "are `true` or `false` and the default value is false. When it is 
true broker starts all the services required for tiered storage functionality.";
+    public static final boolean DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE = 
false;
+
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP = 
"remote.log.storage.manager.class.name";
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_NAME_DOC = "Fully 
qualified class name of `RemoteLogStorageManager` implementation.";
+
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP = 
"remote.log.storage.manager.class.path";
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC = "Class 
path of the `RemoteLogStorageManager` implementation." +
+            "If specified, the RemoteLogStorageManager implementation and its 
dependent libraries will be loaded by a dedicated" +
+            "classloader which searches this class path before the Kafka 
broker class path. The syntax of this parameter is same" +
+            "with the standard Java class path string.";
+
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP = 
"remote.log.metadata.manager.class.name";
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC = 
"Fully qualified class name of `RemoteLogMetadataManager` implementation.";
+    //todo add the default topic based RLMM class name.
+    public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME 
= "";
+
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP = 
"remote.log.metadata.manager.class.path";
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC = 
"Class path of the `RemoteLogMetadataManager` implementation." +
+            "If specified, the RemoteLogMetadataManager implementation and its 
dependent libraries will be loaded by a dedicated" +
+            "classloader which searches this class path before the Kafka 
broker class path. The syntax of this parameter is same" +
+            "with the standard Java class path string.";
+
+    public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP 
= "remote.log.metadata.manager.listener.name";
+    public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC = 
"Listener name of the local broker to which it should get connected if " +
+            "needed by RemoteLogMetadataManager implementation.";
+
+    public static final String 
REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP = 
"remote.log.index.file.cache.total.size.bytes";
+    public static final String 
REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_DOC = "The total size of the space 
allocated to store index files fetched " +
+            "from remote storage in the local storage.";
+    public static final long 
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES = 1024 * 1024 * 1024L;
+
+    public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP = 
"remote.log.manager.thread.pool.size";
+    public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Size 
of the thread pool used in scheduling tasks to copy " +
+            "segments, fetch remote log indexes and clean up remote log 
segments.";
+    public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 10;
+
+    public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = 
"remote.log.manager.task.interval.ms";
+    public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC = 
"Interval at which remote log manager runs the scheduled tasks like copy " +
+            "segments, fetch remote log indexes and clean up remote log 
segments.";
+    public static final long DEFAULT_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS = 30 
* 1000L;
+
+    public static final String REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP 
= "remote.log.manager.task.retry.backoff.ms";
+    public static final String REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_DOC = 
"It represents the wait time in milli seconds before the request is retried 
again.";
+    public static final long DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS 
= 30 * 1000L;
+
+    public static final String 
REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP = 
"remote.log.manager.task.retry.backoff.max.ms";
+    public static final String 
REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_DOC = "The maximum amount of time 
in milliseconds to wait when the request " +
+            "is retried again. The retry duration will increase exponentially 
for each request failure up to this maximum wait interval.";
+    public static final long 
DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS = 30 * 1000L;
+
+    public static final String REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_MS_PROP = 
"remote.log.manager.task.retry.jitter";
+    public static final String REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_MS_DOC = 
"The maximum random jitter subtracted from the scheduled " +
+            " wait time to avoid thundering herds of requests.";
+    public static final long DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_MS = 
30 * 1000L;
+
+    public static final String REMOTE_LOG_READER_THREADS_PROP = 
"remote.log.reader.threads";

Review comment:
       What about the copying threads? Is that separate from this?

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public final class RemoteLogManagerConfig {
+
+    /**
+     * Prefix used for properties to be passed to {@link RemoteStorageManager} 
implementation. Remote log subsystem collects all the properties having
+     * this prefix and passed to {@code RemoteStorageManager} using {@link 
RemoteStorageManager#configure(Map)}.
+     */
+    public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP = 
"remote.log.storage.manager.impl.prefix";
+    public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC = 
"Prefix used for properties to be passed to RemoteStorageManager " +
+            "implementation. For example this value can be `rsm.s3.`.";
+
+    /**
+     * Prefix used for properties to be passed to {@link 
RemoteLogMetadataManager} implementation. Remote log subsystem collects all the 
properties having
+     * this prefix and passed to {@code RemoteLogMetadataManager} using {@link 
RemoteLogMetadataManager#configure(Map)}.
+     */
+    public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP 
= "remote.log.metadata.manager.impl.prefix";
+    public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC = 
"Prefix used for properties to be passed to RemoteLogMetadataManager " +
+            "implementation. For example this value can be `rlmm.s3.`.";
+
+    public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = 
"remote.log.storage.system.enable";
+    public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC = "Whether 
to enable tier storage functionality in a broker or not. Valid values " +
+            "are `true` or `false` and the default value is false. When it is 
true broker starts all the services required for tiered storage functionality.";
+    public static final boolean DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE = 
false;
+
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP = 
"remote.log.storage.manager.class.name";
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_NAME_DOC = "Fully 
qualified class name of `RemoteLogStorageManager` implementation.";
+
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP = 
"remote.log.storage.manager.class.path";
+    public static final String REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC = "Class 
path of the `RemoteLogStorageManager` implementation." +
+            "If specified, the RemoteLogStorageManager implementation and its 
dependent libraries will be loaded by a dedicated" +
+            "classloader which searches this class path before the Kafka 
broker class path. The syntax of this parameter is same" +
+            "with the standard Java class path string.";
+
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP = 
"remote.log.metadata.manager.class.name";
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC = 
"Fully qualified class name of `RemoteLogMetadataManager` implementation.";
+    //todo add the default topic based RLMM class name.
+    public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME 
= "";
+
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP = 
"remote.log.metadata.manager.class.path";
+    public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC = 
"Class path of the `RemoteLogMetadataManager` implementation." +
+            "If specified, the RemoteLogMetadataManager implementation and its 
dependent libraries will be loaded by a dedicated" +
+            "classloader which searches this class path before the Kafka 
broker class path. The syntax of this parameter is same" +
+            "with the standard Java class path string.";
+
+    public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP 
= "remote.log.metadata.manager.listener.name";
+    public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC = 
"Listener name of the local broker to which it should get connected if " +
+            "needed by RemoteLogMetadataManager implementation.";
+
+    public static final String 
REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP = 
"remote.log.index.file.cache.total.size.bytes";
+    public static final String 
REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_DOC = "The total size of the space 
allocated to store index files fetched " +
+            "from remote storage in the local storage.";
+    public static final long 
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES = 1024 * 1024 * 1024L;
+
+    public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP = 
"remote.log.manager.thread.pool.size";
+    public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Size 
of the thread pool used in scheduling tasks to copy " +
+            "segments, fetch remote log indexes and clean up remote log 
segments.";
+    public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 10;
+
+    public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = 
"remote.log.manager.task.interval.ms";
+    public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC = 
"Interval at which remote log manager runs the scheduled tasks like copy " +
+            "segments, fetch remote log indexes and clean up remote log 
segments.";
+    public static final long DEFAULT_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS = 30 
* 1000L;
+
+    public static final String REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP 
= "remote.log.manager.task.retry.backoff.ms";
+    public static final String REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_DOC = 
"It represents the wait time in milli seconds before the request is retried 
again.";
+    public static final long DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS 
= 30 * 1000L;
+
+    public static final String 
REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP = 
"remote.log.manager.task.retry.backoff.max.ms";
+    public static final String 
REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_DOC = "The maximum amount of time 
in milliseconds to wait when the request " +
+            "is retried again. The retry duration will increase exponentially 
for each request failure up to this maximum wait interval.";
+    public static final long 
DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS = 30 * 1000L;

Review comment:
       Why is the default jitter the same as the default max backoff?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to