[ https://issues.apache.org/jira/browse/KAFKA-6805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16516054#comment-16516054 ]
ASF GitHub Bot commented on KAFKA-6805: --------------------------------------- rajinisivaram closed pull request #4898: KAFKA-6805: Enable broker configs to be stored in ZK before broker start URL: https://github.com/apache/kafka/pull/4898 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index c1b384fd1fa..6ac0a019dbc 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -24,13 +24,14 @@ import joptsimple._ import kafka.common.Config import kafka.common.InvalidConfigException import kafka.log.LogConfig -import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig} -import kafka.utils.{CommandLineUtils, Exit} +import kafka.server.{ConfigEntityName, ConfigType, Defaults, DynamicBrokerConfig, DynamicConfig, KafkaConfig} +import kafka.utils.{CommandLineUtils, Exit, PasswordEncoder} import kafka.utils.Implicits._ import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin.{AlterConfigsOptions, ConfigEntry, DescribeConfigsOptions, AdminClient => JAdminClient, Config => JConfig} import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism} import org.apache.kafka.common.utils.{Sanitizer, Time, Utils} @@ -56,11 +57,14 @@ import scala.collection.JavaConverters._ object ConfigCommand extends Config { val DefaultScramIterations = 4096 - // Dynamic broker configs can only be updated using the new AdminClient since they may require - // password encryption currently implemented only in the broker. For consistency with older versions, - // quota-related broker configs can still be updated using ZooKeeper. ConfigCommand will be migrated - // fully to the new AdminClient later (KIP-248). - val BrokerConfigsUpdatableUsingZooKeeper = Set(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, + // Dynamic broker configs can only be updated using the new AdminClient once brokers have started + // so that configs may be fully validated. Prior to starting brokers, updates may be performed using + // ZooKeeper for bootstrapping. This allows all password configs to be stored encrypted in ZK, + // avoiding clear passwords in server.properties. For consistency with older versions, quota-related + // broker configs can still be updated using ZooKeeper at any time. ConfigCommand will be migrated + // to the new AdminClient later for these configs (KIP-248). + val BrokerConfigsUpdatableUsingZooKeeperWhileBrokerRunning = Set( + DynamicConfig.Broker.LeaderReplicationThrottledRateProp, DynamicConfig.Broker.FollowerReplicationThrottledRateProp, DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp) @@ -114,9 +118,25 @@ object ConfigCommand extends Config { if (entityType == ConfigType.User) preProcessScramCredentials(configsToBeAdded) - if (entityType == ConfigType.Broker) { - require(configsToBeAdded.asScala.keySet.forall(BrokerConfigsUpdatableUsingZooKeeper.contains), - s"--bootstrap-server option must be specified to update broker configs $configsToBeAdded") + else if (entityType == ConfigType.Broker) { + // Replication quota configs may be updated using ZK at any time. Other dynamic broker configs + // may be updated using ZooKeeper only if the corresponding broker is not running. Dynamic broker + // configs at cluster-default level may be configured using ZK only if there are no brokers running. + val dynamicBrokerConfigs = configsToBeAdded.asScala.keySet.filterNot(BrokerConfigsUpdatableUsingZooKeeperWhileBrokerRunning.contains) + if (dynamicBrokerConfigs.nonEmpty) { + val perBrokerConfig = entityName != ConfigEntityName.Default + val errorMessage = s"--bootstrap-server option must be specified to update broker configs $dynamicBrokerConfigs." + val info = "Broker configuraton updates using ZooKeeper are supported for bootstrapping before brokers" + + " are started to enable encrypted password configs to be stored in ZooKeeper." + if (perBrokerConfig) { + adminZkClient.parseBroker(entityName).foreach { brokerId => + require(zkClient.getBroker(brokerId).isEmpty, s"$errorMessage when broker $entityName is running. $info") + } + } else { + require(zkClient.getAllBrokersInCluster.isEmpty, s"$errorMessage for default cluster if any broker is running. $info") + } + preProcessBrokerConfigs(configsToBeAdded, perBrokerConfig) + } } // compile the final set of configs @@ -156,6 +176,49 @@ object ConfigCommand extends Config { } } + private[admin] def createPasswordEncoder(encoderConfigs: Map[String, String]): PasswordEncoder = { + encoderConfigs.get(KafkaConfig.PasswordEncoderSecretProp) + val encoderSecret = encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderSecretProp, + throw new IllegalArgumentException("Password encoder secret not specified")) + new PasswordEncoder(new Password(encoderSecret), + None, + encoderConfigs.get(KafkaConfig.PasswordEncoderCipherAlgorithmProp).getOrElse(Defaults.PasswordEncoderCipherAlgorithm), + encoderConfigs.get(KafkaConfig.PasswordEncoderKeyLengthProp).map(_.toInt).getOrElse(Defaults.PasswordEncoderKeyLength), + encoderConfigs.get(KafkaConfig.PasswordEncoderIterationsProp).map(_.toInt).getOrElse(Defaults.PasswordEncoderIterations)) + } + + /** + * Pre-process broker configs provided to convert them to persistent format. + * Password configs are encrypted using the secret `KafkaConfig.PasswordEncoderSecretProp`. + * The secret is removed from `configsToBeAdded` and will not be persisted in ZooKeeper. + */ + private def preProcessBrokerConfigs(configsToBeAdded: Properties, perBrokerConfig: Boolean) { + val passwordEncoderConfigs = new Properties + passwordEncoderConfigs ++= configsToBeAdded.asScala.filterKeys(_.startsWith("password.encoder.")) + if (!passwordEncoderConfigs.isEmpty) { + info(s"Password encoder configs ${passwordEncoderConfigs.keySet} will be used for encrypting" + + " passwords, but will not be stored in ZooKeeper.") + passwordEncoderConfigs.asScala.keySet.foreach(configsToBeAdded.remove) + } + + DynamicBrokerConfig.validateConfigs(configsToBeAdded, perBrokerConfig) + val passwordConfigs = configsToBeAdded.asScala.keySet.filter(DynamicBrokerConfig.isPasswordConfig) + if (passwordConfigs.nonEmpty) { + require(passwordEncoderConfigs.containsKey(KafkaConfig.PasswordEncoderSecretProp), + s"${KafkaConfig.PasswordEncoderSecretProp} must be specified to update $passwordConfigs." + + " Other password encoder configs like cipher algorithm and iterations may also be specified" + + " to override the default encoding parameters. Password encoder configs will not be persisted" + + " in ZooKeeper." + ) + + val passwordEncoder = createPasswordEncoder(passwordEncoderConfigs.asScala) + passwordConfigs.foreach { configName => + val encodedValue = passwordEncoder.encode(new Password(configsToBeAdded.getProperty(configName))) + configsToBeAdded.setProperty(configName, encodedValue) + } + } + } + private def describeConfig(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient) { val configEntity = parseEntity(opts) val describeAllUsers = configEntity.root.entityType == ConfigType.User && !configEntity.root.sanitizedName.isDefined && !configEntity.child.isDefined @@ -358,7 +421,12 @@ object ConfigCommand extends Config { parseQuotaEntity(opts) else { // Exactly one entity type and at-most one entity name expected for other entities - val name = if (opts.options.has(opts.entityName)) Some(opts.options.valueOf(opts.entityName)) else None + val name = if (opts.options.has(opts.entityName)) + Some(opts.options.valueOf(opts.entityName)) + else if (entityTypes.head == ConfigType.Broker && opts.options.has(opts.entityDefault)) + Some(ConfigEntityName.Default) + else + None ConfigEntity(Entity(entityTypes.head, name), None) } } diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 4225cdb1a40..72772fa6fcb 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -115,6 +115,43 @@ object DynamicBrokerConfig { } } + def validateConfigs(props: Properties, perBrokerConfig: Boolean): Unit = { + def checkInvalidProps(invalidPropNames: Set[String], errorMessage: String): Unit = { + if (invalidPropNames.nonEmpty) + throw new ConfigException(s"$errorMessage: $invalidPropNames") + } + checkInvalidProps(nonDynamicConfigs(props), "Cannot update these configs dynamically") + checkInvalidProps(securityConfigsWithoutListenerPrefix(props), + "These security configs can be dynamically updated only per-listener using the listener prefix") + validateConfigTypes(props) + if (!perBrokerConfig) { + checkInvalidProps(perBrokerConfigs(props), + "Cannot update these configs at default cluster level, broker id must be specified") + } + } + + private def perBrokerConfigs(props: Properties): Set[String] = { + val configNames = props.asScala.keySet + configNames.intersect(PerBrokerConfigs) ++ configNames.filter(ListenerConfigRegex.findFirstIn(_).nonEmpty) + } + + private def nonDynamicConfigs(props: Properties): Set[String] = { + props.asScala.keySet.intersect(DynamicConfig.Broker.nonDynamicProps) + } + + private def securityConfigsWithoutListenerPrefix(props: Properties): Set[String] = { + DynamicSecurityConfigs.filter(props.containsKey) + } + + private def validateConfigTypes(props: Properties): Unit = { + val baseProps = new Properties + props.asScala.foreach { + case (ListenerConfigRegex(baseName), v) => baseProps.put(baseName, v) + case (k, v) => baseProps.put(k, v) + } + DynamicConfig.Broker.validate(baseProps) + } + private[server] def addDynamicConfigs(configDef: ConfigDef): Unit = { KafkaConfig.configKeys.filterKeys(AllDynamicConfigs.contains).values.foreach { config => configDef.define(config.name, config.`type`, config.defaultValue, config.validator, @@ -298,57 +335,26 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging decoded.foreach { value => props.put(configName, passwordEncoder.encode(new Password(value))) } } } - adminZkClient.changeBrokerConfig(Seq(kafkaConfig.brokerId), props) + adminZkClient.changeBrokerConfig(Some(kafkaConfig.brokerId), props) } } props } private[server] def validate(props: Properties, perBrokerConfig: Boolean): Unit = CoreUtils.inReadLock(lock) { - def checkInvalidProps(invalidPropNames: Set[String], errorMessage: String): Unit = { - if (invalidPropNames.nonEmpty) - throw new ConfigException(s"$errorMessage: $invalidPropNames") - } - checkInvalidProps(nonDynamicConfigs(props), "Cannot update these configs dynamically") - checkInvalidProps(securityConfigsWithoutListenerPrefix(props), - "These security configs can be dynamically updated only per-listener using the listener prefix") - validateConfigTypes(props) + validateConfigs(props, perBrokerConfig) val newProps = mutable.Map[String, String]() newProps ++= staticBrokerConfigs if (perBrokerConfig) { overrideProps(newProps, dynamicDefaultConfigs) overrideProps(newProps, props.asScala) } else { - checkInvalidProps(perBrokerConfigs(props), - "Cannot update these configs at default cluster level, broker id must be specified") overrideProps(newProps, props.asScala) overrideProps(newProps, dynamicBrokerConfigs) } processReconfiguration(newProps, validateOnly = true) } - private def perBrokerConfigs(props: Properties): Set[String] = { - val configNames = props.asScala.keySet - configNames.intersect(PerBrokerConfigs) ++ configNames.filter(ListenerConfigRegex.findFirstIn(_).nonEmpty) - } - - private def nonDynamicConfigs(props: Properties): Set[String] = { - props.asScala.keySet.intersect(DynamicConfig.Broker.nonDynamicProps) - } - - private def securityConfigsWithoutListenerPrefix(props: Properties): Set[String] = { - DynamicSecurityConfigs.filter(props.containsKey) - } - - private def validateConfigTypes(props: Properties): Unit = { - val baseProps = new Properties - props.asScala.foreach { - case (ListenerConfigRegex(baseName), v) => baseProps.put(baseName, v) - case (k, v) => baseProps.put(k, v) - } - DynamicConfig.Broker.validate(baseProps) - } - private def removeInvalidConfigs(props: Properties, perBrokerConfig: Boolean): Unit = { try { validateConfigTypes(props) diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala index 2f8da360c9b..8a6b3ee212d 100644 --- a/core/src/main/scala/kafka/zk/AdminZkClient.scala +++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala @@ -265,6 +265,18 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging { } } + def parseBroker(broker: String): Option[Int] = { + broker match { + case ConfigEntityName.Default => None + case _ => + try Some(broker.toInt) + catch { + case _: NumberFormatException => + throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value") + } + } + } + /** * Change the configs for a given entityType and entityName * @param entityType @@ -273,19 +285,11 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging { */ def changeConfigs(entityType: String, entityName: String, configs: Properties): Unit = { - def parseBroker(broker: String): Int = { - try broker.toInt - catch { - case _: NumberFormatException => - throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value") - } - } - entityType match { case ConfigType.Topic => changeTopicConfig(entityName, configs) case ConfigType.Client => changeClientIdConfig(entityName, configs) case ConfigType.User => changeUserOrUserClientIdConfig(entityName, configs) - case ConfigType.Broker => changeBrokerConfig(Seq(parseBroker(entityName)), configs) + case ConfigType.Broker => changeBrokerConfig(parseBroker(entityName), configs) case _ => throw new IllegalArgumentException(s"$entityType is not a known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, ${ConfigType.Broker}") } } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 69ca31703ef..52ad2b9fe8e 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -126,6 +126,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureExternal)) val kafkaConfig = KafkaConfig.fromProps(props) + configureDynamicKeystoreInZooKeeper(kafkaConfig, sslProperties1) servers += TestUtils.createServer(kafkaConfig) } @@ -778,21 +779,12 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString) val propsEncodedWithOldSecret = props.clone().asInstanceOf[Properties] val config = server.config - val secret = config.passwordEncoderSecret.getOrElse(throw new IllegalStateException("Password encoder secret not configured")) val oldSecret = "old-dynamic-config-secret" config.dynamicConfig.staticBrokerConfigs.put(KafkaConfig.PasswordEncoderOldSecretProp, oldSecret) val passwordConfigs = props.asScala.filterKeys(DynamicBrokerConfig.isPasswordConfig) assertTrue("Password configs not found", passwordConfigs.nonEmpty) - val passwordDecoder = new PasswordEncoder(secret, - config.passwordEncoderKeyFactoryAlgorithm, - config.passwordEncoderCipherAlgorithm, - config.passwordEncoderKeyLength, - config.passwordEncoderIterations) - val passwordEncoder = new PasswordEncoder(new Password(oldSecret), - config.passwordEncoderKeyFactoryAlgorithm, - config.passwordEncoderCipherAlgorithm, - config.passwordEncoderKeyLength, - config.passwordEncoderIterations) + val passwordDecoder = createPasswordEncoder(config, config.passwordEncoderSecret) + val passwordEncoder = createPasswordEncoder(config, Some(new Password(oldSecret))) passwordConfigs.foreach { case (name, value) => val decoded = passwordDecoder.decode(value).value propsEncodedWithOldSecret.put(name, passwordEncoder.encode(new Password(decoded))) @@ -1161,12 +1153,39 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet private def listenerPrefix(name: String): String = new ListenerName(name).configPrefix - private def configureDynamicKeystoreInZooKeeper(kafkaConfig: KafkaConfig, brokers: Seq[Int], sslProperties: Properties): Unit = { + private def configureDynamicKeystoreInZooKeeper(kafkaConfig: KafkaConfig, sslProperties: Properties): Unit = { + val externalListenerPrefix = listenerPrefix(SecureExternal) val sslStoreProps = new Properties - sslStoreProps ++= securityProps(sslProperties, KEYSTORE_PROPS, listenerPrefix(SecureExternal)) - val persistentProps = kafkaConfig.dynamicConfig.toPersistentProps(sslStoreProps, perBrokerConfig = true) + sslStoreProps ++= securityProps(sslProperties, KEYSTORE_PROPS, externalListenerPrefix) + sslStoreProps.put(KafkaConfig.PasswordEncoderSecretProp, kafkaConfig.passwordEncoderSecret.map(_.value).orNull) zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path) - adminZkClient.changeBrokerConfig(brokers, persistentProps) + + val args = Array("--zookeeper", kafkaConfig.zkConnect, + "--alter", "--add-config", sslStoreProps.asScala.map { case (k, v) => s"$k=$v" }.mkString(","), + "--entity-type", "brokers", + "--entity-name", kafkaConfig.brokerId.toString) + ConfigCommand.main(args) + + val passwordEncoder = createPasswordEncoder(kafkaConfig, kafkaConfig.passwordEncoderSecret) + val brokerProps = adminZkClient.fetchEntityConfig("brokers", kafkaConfig.brokerId.toString) + assertEquals(4, brokerProps.size) + assertEquals(sslProperties.get(SSL_KEYSTORE_TYPE_CONFIG), + brokerProps.getProperty(s"$externalListenerPrefix$SSL_KEYSTORE_TYPE_CONFIG")) + assertEquals(sslProperties.get(SSL_KEYSTORE_LOCATION_CONFIG), + brokerProps.getProperty(s"$externalListenerPrefix$SSL_KEYSTORE_LOCATION_CONFIG")) + assertEquals(sslProperties.get(SSL_KEYSTORE_PASSWORD_CONFIG), + passwordEncoder.decode(brokerProps.getProperty(s"$externalListenerPrefix$SSL_KEYSTORE_PASSWORD_CONFIG"))) + assertEquals(sslProperties.get(SSL_KEY_PASSWORD_CONFIG), + passwordEncoder.decode(brokerProps.getProperty(s"$externalListenerPrefix$SSL_KEY_PASSWORD_CONFIG"))) + } + + private def createPasswordEncoder(config: KafkaConfig, secret: Option[Password]): PasswordEncoder = { + val encoderSecret = secret.getOrElse(throw new IllegalStateException("Password encoder secret not configured")) + new PasswordEncoder(encoderSecret, + config.passwordEncoderKeyFactoryAlgorithm, + config.passwordEncoderCipherAlgorithm, + config.passwordEncoderKeyLength, + config.passwordEncoderIterations) } private def waitForConfig(propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = { diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala index a24800f3cea..2644dcce6bf 100644 --- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -20,21 +20,25 @@ import java.util import java.util.Properties import kafka.admin.ConfigCommand.ConfigCommandOptions +import kafka.api.ApiVersion +import kafka.cluster.{Broker, EndPoint} import kafka.common.InvalidConfigException -import kafka.server.ConfigEntityName +import kafka.server.{ConfigEntityName, KafkaConfig} import kafka.utils.{Exit, Logging} -import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness} +import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient, ZooKeeperTestHarness} import org.apache.kafka.clients.admin._ -import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.config.{ConfigException, ConfigResource} import org.apache.kafka.common.internals.KafkaFutureImpl import org.apache.kafka.common.Node +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils import org.apache.kafka.common.utils.Sanitizer import org.easymock.EasyMock import org.junit.Assert._ import org.junit.Test -import scala.collection.mutable +import scala.collection.{Seq, mutable} import scala.collection.JavaConverters._ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { @@ -51,7 +55,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { "--entity-name", "1", "--entity-type", "brokers", "--alter", - "--add-config", "message.max.size=100000")) + "--add-config", "security.inter.broker.protocol=PLAINTEXT")) } @Test @@ -306,14 +310,99 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient)) } - @Test (expected = classOf[IllegalArgumentException]) - def shouldNotUpdateDynamicBrokerConfigUsingZooKeeper(): Unit = { - val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, - "--entity-name", "1", - "--entity-type", "brokers", - "--alter", - "--add-config", "message.max.size=100000")) - ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient)) + @Test + def testDynamicBrokerConfigUpdateUsingZooKeeper(): Unit = { + val brokerId = "1" + val adminZkClient = new AdminZkClient(zkClient) + val alterOpts = Array("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter") + + def entityOpt(brokerId: Option[String]): Array[String] = { + brokerId.map(id => Array("--entity-name", id)).getOrElse(Array("--entity-default")) + } + + def alterConfig(configs: Map[String, String], brokerId: Option[String], + encoderConfigs: Map[String, String] = Map.empty): Unit = { + val configStr = (configs ++ encoderConfigs).map { case (k, v) => s"$k=$v" }.mkString(",") + val addOpts = new ConfigCommandOptions(alterOpts ++ entityOpt(brokerId) ++ Array("--add-config", configStr)) + ConfigCommand.alterConfig(zkClient, addOpts, adminZkClient) + } + + def verifyConfig(configs: Map[String, String], brokerId: Option[String]): Unit = { + val entityConfigs = zkClient.getEntityConfigs("brokers", brokerId.getOrElse(ConfigEntityName.Default)) + assertEquals(configs, entityConfigs.asScala) + } + + def alterAndVerifyConfig(configs: Map[String, String], brokerId: Option[String]): Unit = { + alterConfig(configs, brokerId) + verifyConfig(configs, brokerId) + } + + def deleteAndVerifyConfig(configNames: Set[String], brokerId: Option[String]): Unit = { + val deleteOpts = new ConfigCommandOptions(alterOpts ++ entityOpt(brokerId) ++ + Array("--delete-config", configNames.mkString(","))) + ConfigCommand.alterConfig(zkClient, deleteOpts, adminZkClient) + verifyConfig(Map.empty, brokerId) + } + + // Add config + alterAndVerifyConfig(Map("message.max.size" -> "110000"), Some(brokerId)) + alterAndVerifyConfig(Map("message.max.size" -> "120000"), None) + + // Change config + alterAndVerifyConfig(Map("message.max.size" -> "130000"), Some(brokerId)) + alterAndVerifyConfig(Map("message.max.size" -> "140000"), None) + + // Delete config + deleteAndVerifyConfig(Set("message.max.size"), Some(brokerId)) + deleteAndVerifyConfig(Set("message.max.size"), None) + + // Listener configs: should work only with listener name + alterAndVerifyConfig(Map("listener.name.external.ssl.keystore.location" -> "/tmp/test.jks"), Some(brokerId)) + intercept[ConfigException](alterConfig(Map("ssl.keystore.location" -> "/tmp/test.jks"), Some(brokerId))) + + // Per-broker config configured at default cluster-level should fail + intercept[ConfigException](alterConfig(Map("listener.name.external.ssl.keystore.location" -> "/tmp/test.jks"), None)) + deleteAndVerifyConfig(Set("listener.name.external.ssl.keystore.location"), Some(brokerId)) + + // Password config update without encoder secret should fail + intercept[IllegalArgumentException](alterConfig(Map("listener.name.external.ssl.keystore.password" -> "secret"), Some(brokerId))) + + // Password config update with encoder secret should succeed and encoded password must be stored in ZK + val configs = Map("listener.name.external.ssl.keystore.password" -> "secret", "log.cleaner.threads" -> "2") + val encoderConfigs = Map(KafkaConfig.PasswordEncoderSecretProp -> "encoder-secret") + alterConfig(configs, Some(brokerId), encoderConfigs) + val brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId) + assertFalse("Encoder secret stored in ZooKeeper", brokerConfigs.contains(KafkaConfig.PasswordEncoderSecretProp)) + assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")) // not encoded + val encodedPassword = brokerConfigs.getProperty("listener.name.external.ssl.keystore.password") + val passwordEncoder = ConfigCommand.createPasswordEncoder(encoderConfigs) + assertEquals("secret", passwordEncoder.decode(encodedPassword).value) + assertEquals(configs.size, brokerConfigs.size) + + // Password config update with overrides for encoder parameters + val configs2 = Map("listener.name.internal.ssl.keystore.password" -> "secret2") + val encoderConfigs2 = Map(KafkaConfig.PasswordEncoderSecretProp -> "encoder-secret", + KafkaConfig.PasswordEncoderCipherAlgorithmProp -> "DES/CBC/PKCS5Padding", + KafkaConfig.PasswordEncoderIterationsProp -> "1024", + KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp -> "PBKDF2WithHmacSHA1", + KafkaConfig.PasswordEncoderKeyLengthProp -> "64") + alterConfig(configs2, Some(brokerId), encoderConfigs2) + val brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId) + val encodedPassword2 = brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password") + assertEquals("secret2", ConfigCommand.createPasswordEncoder(encoderConfigs).decode(encodedPassword2).value) + assertEquals("secret2", ConfigCommand.createPasswordEncoder(encoderConfigs2).decode(encodedPassword2).value) + + + // Password config update at default cluster-level should fail + intercept[ConfigException](alterConfig(configs, None, encoderConfigs)) + + // Dynamic config updates using ZK should fail if broker is running. + registerBrokerInZk(brokerId.toInt) + intercept[IllegalArgumentException](alterConfig(Map("message.max.size" -> "210000"), Some(brokerId))) + intercept[IllegalArgumentException](alterConfig(Map("message.max.size" -> "220000"), None)) + + // Dynamic config updates using ZK should for a different broker that is not running should succeed + alterAndVerifyConfig(Map("message.max.size" -> "230000"), Some("2")) } @Test (expected = classOf[IllegalArgumentException]) @@ -322,7 +411,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { "--entity-name", "1", "--entity-type", "brokers", "--alter", - "--add-config", "a=")) + "--add-config", "a==")) ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient)) } @@ -593,6 +682,14 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { Seq("<default>/clients/client-3", sanitizedPrincipal + "/clients/client-2")) } + private def registerBrokerInZk(id: Int): Unit = { + zkClient.createTopLevelPaths() + val securityProtocol = SecurityProtocol.PLAINTEXT + val endpoint = new EndPoint("localhost", 9092, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol) + val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), ApiVersion.latestVersion, jmxPort = 9192) + zkClient.registerBrokerInZk(brokerInfo) + } + class DummyAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) { override def changeBrokerConfig(brokerIds: Seq[Int], configs: Properties): Unit = {} override def fetchEntityConfig(entityType: String, entityName: String): Properties = {new Properties} diff --git a/docs/configuration.html b/docs/configuration.html index 8c86534fb15..90c990bdaad 100644 --- a/docs/configuration.html +++ b/docs/configuration.html @@ -90,6 +90,23 @@ <h5>Updating Password Configs Dynamically</h5> using <code>kafka-configs.sh</code> even if the password config is not being altered. This constraint will be removed in a future release.</p> + <h5>Updating Password Configs in ZooKeeper Before Starting Brokers</h5> + + From Kafka 2.0.0 onwards, <code>kafka-configs.sh</code> enables dynamic broker configs to be updated using ZooKeeper before + starting brokers for bootstrapping. This enables all password configs to be stored in encrypted form, avoiding the need for + clear passwords in <code>server.properties</code>. The broker config <code>password.encoder.secret</code> must also be specified + if any password configs are included in the alter command. Additional encryption parameters may also be specified. Password + encoder configs will not be persisted in ZooKeeper. For example, to store SSL key password for listener <code>INTERNAL</code> + on broker 0: + + <pre class="brush: bash;"> + > bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type brokers --entity-name 0 --alter --add-config + 'listener.name.internal.ssl.key.password=key-password,password.encoder.secret=secret,password.encoder.iterations=8192' + </pre> + + The configuration <code>listener.name.internal.ssl.key.password</code> will be persisted in ZooKeeper in encrypted + form using the provided encoder configs. The encoder secret and iterations are not persisted in ZooKeeper. + <h5>Updating SSL Keystore of an Existing Listener</h5> Brokers may be configured with SSL keystores with short validity periods to reduce the risk of compromised certificates. Keystores may be updated dynamically without restarting the broker. The config name must be prefixed with the listener prefix diff --git a/docs/upgrade.html b/docs/upgrade.html index 89c90d19d26..13498364b62 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -127,6 +127,8 @@ <h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2 <p>KIP-283 also adds new topic and broker configurations <code>message.downconversion.enable</code> and <code>log.message.downconversion.enable</code> respectively to control whether down-conversion is enabled. When disabled, broker does not perform any down-conversion and instead sends an <code>UNSUPPORTED_VERSION</code> error to the client.</p></li> + <li>Dynamic broker configuration options can be stored in ZooKeeper using kafka-configs.sh before brokers are started. + This option can be used to avoid storing clear passwords in server.properties as all password configs may be stored encrypted in ZooKeeper.</li> </ul> <h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5> ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Allow dynamic broker configs to be configured in ZooKeeper before starting > broker > --------------------------------------------------------------------------------- > > Key: KAFKA-6805 > URL: https://issues.apache.org/jira/browse/KAFKA-6805 > Project: Kafka > Issue Type: Task > Components: tools > Reporter: Rajini Sivaram > Assignee: Rajini Sivaram > Priority: Major > Fix For: 2.1.0 > > > At the moment, dynamic broker configs like SSL keystore and password can be > configured using ConfigCommand only after a broker is started (using the new > AdminClient). To start a broker, these configs have to be defined in > server.properties. We want to restrict updates using ZooKeeper once broker > starts up, but we should allow updates using ZK prior to starting brokers. > This is particularly useful for password configs which are stored encrypted > in ZK, making it difficult to set manually before starting brokers. > ConfigCommand is being updated to talk to AdminClient under KIP-248, but we > will need to maintain the tool using ZK to enable credentials to be created > in ZK before starting brokers. So the functionality to set broker configs can > sit alongside that. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)