chia7712 commented on a change in pull request #9967:
URL: https://github.com/apache/kafka/pull/9967#discussion_r564703919
##########
File path: core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
##########
@@ -21,38 +21,220 @@
 import java.io._
 import java.nio.file.{Files, NoSuchFileException}
 import java.util.Properties
 
+import kafka.common.InconsistentBrokerMetadataException
+import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole, ProcessRole}
+import kafka.server.RawMetaProperties._
 import kafka.utils._
+import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.utils.Utils
 
-case class BrokerMetadata(brokerId: Int,
-                          clusterId: Option[String]) {
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+object RawMetaProperties {
+  val ClusterIdKey = "cluster.id"
+  val BrokerIdKey = "broker.id"
+  val ControllerIdKey = "controller.id"
+  val VersionKey = "version"
+}
+
+case class RawMetaProperties(props: Properties = new Properties()) {
+
+  def clusterId: Option[String] = {
+    Option(props.getProperty(ClusterIdKey))
+  }
+
+  def clusterId_=(id: String): Unit = {
+    props.setProperty(ClusterIdKey, id)
+  }
+
+  def brokerId: Option[Int] = {
+    intValue(BrokerIdKey)
+  }
+
+  def brokerId_=(id: Int): Unit = {
+    props.setProperty(BrokerIdKey, id.toString)
+  }
+
+  def controllerId: Option[Int] = {
+    intValue(ControllerIdKey)
+  }
+
+  def controllerId_=(id: Int): Unit = {
+    props.setProperty(ControllerIdKey, id.toString)
+  }
+
+  def version: Int = {
+    intValue(VersionKey).getOrElse(0)
+  }
+
+  def version_=(ver: Int): Unit = {
+    props.setProperty(VersionKey, ver.toString)
+  }
+
+  def requireVersion(expectedVersion: Int): Unit = {
+    if (version != expectedVersion) {
+      throw new RuntimeException(s"Expected version $expectedVersion, but got " +
+        s"version $version")
+    }
+  }
+
+  private def intValue(key: String): Option[Int] = {
+    try {
+      Option(props.getProperty(key)).map(Integer.parseInt)
+    } catch {
+      case e: Throwable => throw new RuntimeException(s"Failed to parse $key property " +
+        s"as an int: ${e.getMessage}")
+    }
+  }
+
+  override def toString: String = {
+    "RawMetaProperties(" + props.keySet().asScala.toList.asInstanceOf[List[String]].sorted.map {
+      key => key + "=" + props.get(key)
+    }.mkString(", ") + ")"
+  }
+}
+
+object MetaProperties {
+  def parse(
+    properties: RawMetaProperties,
+    processRoles: Set[ProcessRole]
+  ): MetaProperties = {
+    properties.requireVersion(expectedVersion = 1)
+    val clusterId = requireClusterId(properties)
+
+    if (processRoles.contains(BrokerRole)) {
+      require(BrokerIdKey, properties.brokerId)
+    }
+
+    if (processRoles.contains(ControllerRole)) {
+      require(ControllerIdKey, properties.controllerId)
+    }
+
+    new MetaProperties(clusterId, properties.brokerId, properties.controllerId)
+  }
+
+  def require[T](key: String, value: Option[T]): T = {
+    value.getOrElse(throw new RuntimeException(s"Failed to find required property $key."))
+  }
+
+  def requireClusterId(properties: RawMetaProperties): Uuid = {
+    val value = require(ClusterIdKey, properties.clusterId)
+    try {
+      Uuid.fromString(value)
+    } catch {
+      case e: Throwable => throw new RuntimeException(s"Failed to parse $ClusterIdKey property " +
+        s"as a UUID: ${e.getMessage}")
+    }
+  }
+}
+
+case class ZkMetaProperties(
+  clusterId: String,
+  brokerId: Int
+) {
+  def toProperties: Properties = {
+    val properties = new RawMetaProperties()
+    properties.version = 0
+    properties.clusterId = clusterId
+    properties.brokerId = brokerId
+    properties.props
+  }
+
+  override def toString: String = {
+    s"LegacyMetaProperties(brokerId=$brokerId, clusterId=$clusterId)"
+  }
+}
+
+case class MetaProperties(
+  clusterId: Uuid,
+  brokerId: Option[Int] = None,
+  controllerId: Option[Int] = None
+) {
+  def toProperties: Properties = {
+    val properties = new RawMetaProperties()
+    properties.version = 1
+    properties.clusterId = clusterId.toString
+    brokerId.foreach(properties.brokerId = _)
+    controllerId.foreach(properties.controllerId = _)
+    properties.props
+  }
 
   override def toString: String = {
-    s"BrokerMetadata(brokerId=$brokerId, clusterId=${clusterId.map(_.toString).getOrElse("None")})"
+    s"MetaProperties(clusterId=$clusterId" +
+      s", brokerId=${brokerId.getOrElse("none")}" +
+      s", controllerId=${controllerId.getOrElse("none")}" +
+      ")"
+  }
+}
+
+object BrokerMetadataCheckpoint extends Logging {
+  def getBrokerMetadataAndOfflineDirs(
+    logDirs: collection.Seq[String],
+    ignoreMissing: Boolean
+  ): (RawMetaProperties, collection.Seq[String]) = {
+    require(logDirs.nonEmpty, "Must have at least one log dir to read meta.properties")
+
+    val brokerMetadataMap = mutable.HashMap[String, Properties]()
+    val offlineDirs = mutable.ArrayBuffer.empty[String]
+
+    for (logDir <- logDirs) {
+      val brokerCheckpointFile = new File(logDir + File.separator + "meta.properties")

Review comment:
   How about using ```new File(logDir, "meta.properties")```?
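   For reference, `java.io.File`'s two-argument constructor inserts the platform separator itself, so the two spellings resolve to the same path. A minimal sketch (the `logDir` value is illustrative, not from the PR):

```scala
import java.io.File

val logDir = "/tmp/kafka-logs" // illustrative path only

// Manual concatenation, as in the PR...
val viaConcat = new File(logDir + File.separator + "meta.properties")
// ...versus the two-argument constructor the reviewer suggests.
val viaConstructor = new File(logDir, "meta.properties")

assert(viaConcat.getPath == viaConstructor.getPath)
```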
##########
File path: core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
##########
@@ -21,38 +21,220 @@
+    for (logDir <- logDirs) {
+      val brokerCheckpointFile = new File(logDir + File.separator + "meta.properties")
+      val brokerCheckpoint = new BrokerMetadataCheckpoint(brokerCheckpointFile)
+
+      try {
+        brokerCheckpoint.read() match {
+          case Some(properties) => brokerMetadataMap += logDir -> properties
+          case None => if (ignoreMissing) {
+            logDir -> new Properties()

Review comment:
   unnecessary body? `logDir -> new Properties()` only constructs a tuple and discards it.
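   Some context on why this branch is a no-op: in Scala, `a -> b` merely constructs a `Tuple2`, so the expression builds a pair that nothing consumes. A self-contained sketch; the final `+=` line is only an assumption about the intended behavior, not the PR's actual fix:

```scala
import java.util.Properties
import scala.collection.mutable

val brokerMetadataMap = mutable.HashMap[String, Properties]()
val logDir = "/tmp/kafka-logs" // illustrative value

// As written in the PR: constructs a (String, Properties) pair
// and immediately discards it, so the map is untouched.
logDir -> new Properties()
assert(brokerMetadataMap.isEmpty)

// Hypothetical fix, assuming the intent was to record an empty
// Properties for a directory whose meta.properties is missing:
brokerMetadataMap += logDir -> new Properties()
assert(brokerMetadataMap.contains(logDir))
```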
##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1495,6 +1517,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     distinctRoles
   }
 
+  def metadataLogDir: String = {
+    Option(getString(KafkaConfig.MetadataLogDirProp)) match {
+      case Some(dir) => dir
+      case None => logDirs.head

Review comment:
   Does it cause trouble if we change the order of the directories in "log.dirs" when restarting the server?
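   The concern is that the fallback is positional: with `metadata.log.dir` unset, the metadata log follows whichever directory happens to be listed first. A standalone sketch of that hazard, using a hypothetical helper and paths (not the PR's code):

```scala
// Hypothetical reduction of the fallback logic shown in the hunk.
def resolveMetadataLogDir(metadataLogDirProp: Option[String], logDirs: Seq[String]): String =
  metadataLogDirProp.getOrElse(logDirs.head)

// Same directories, listed in a different order across two restarts.
val firstBoot  = resolveMetadataLogDir(None, Seq("/disk1/logs", "/disk2/logs"))
val secondBoot = resolveMetadataLogDir(None, Seq("/disk2/logs", "/disk1/logs"))

// The metadata log is now expected in a different directory.
assert(firstBoot == "/disk1/logs")
assert(secondBoot == "/disk2/logs")
```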
##########
File path: core/src/main/scala/kafka/server/KafkaRaftServer.scala
##########
@@ -100,4 +104,36 @@
   sealed trait ProcessRole
   case object BrokerRole extends ProcessRole
   case object ControllerRole extends ProcessRole
+
+  def loadMetaProperties(config: KafkaConfig): (MetaProperties, Seq[String]) = {
+    val logDirs = config.logDirs ++ Seq(config.metadataLogDir)

Review comment:
   How about ```config.logDirs :+ config.metadataLogDir```? `:+` appends the single element without building an intermediate `Seq`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org