Hi, I decided to fix KAFKA-1667.

Currently I have an initial patch, which seems to work. I would like
to know, whether overall code is ok. Also there are few TODOs in the
code

1. I haven't added documentation to the properties, as ConfigDef
suggests. Should I?
2. I'm not sure what Importance should be assigned to properties. It
is NORMAL for all properties. Where can I find some info on this?
3. Not totally sure, that validations are correct. Tried to figure
that out from the code, still might miss something.

Finally is this mailing list is the right place to ask such questions
or should I submit patch to Jira ticket and get a review there even if
I'm not sure about its quality?

Thanks for the help.

The patch itself:

diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index c4cea2c..347e252 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -19,6 +19,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;

 /**
  * This class is used for specifying the set of expected
configurations, their type, their defaults, their
@@ -49,6 +50,14 @@ public class ConfigDef {
     private final Map<String, ConfigKey> configKeys = new
HashMap<String, ConfigKey>();

     /**
+     * Returns unmodifiable set of properties names defined in this
{@linkplain ConfigDef}
+     * @return new unmodifiable {@link Set} instance containing the keys
+     */
+    public Set<String> names() {
+        return Collections.unmodifiableSet(configKeys.keySet());
+    }
+
+    /**
      * Define a new configuration
      * @param name The name of the config parameter
      * @param type The type of the config
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala
b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 0b2735e..285c033 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -256,7 +256,7 @@ object TopicCommand {
                          .ofType(classOf[String])
     val nl = System.getProperty("line.separator")
     val configOpt = parser.accepts("config", "A topic configuration
override for the topic being created or altered."  +
-                                                         "The
following is a list of valid configurations: " + nl +
LogConfig.ConfigNames.map("\t" + _).mkString(nl) + nl +
+                                                         "The
following is a list of valid configurations: " + nl +
LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
                                                          "See the
Kafka documentation for full details on the topic configs.")
                           .withRequiredArg
                           .describedAs("name=value")
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala
b/core/src/main/scala/kafka/log/LogConfig.scala
index e48922a..3e0a986 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -21,7 +21,7 @@ import java.util.Properties
 import org.apache.kafka.common.utils.Utils

 import scala.collection._
-import kafka.common._
+import org.apache.kafka.common.config.ConfigDef

 object Defaults {
   val SegmentSize = 1024 * 1024
@@ -106,6 +106,10 @@ case class LogConfig(val segmentSize: Int =
Defaults.SegmentSize,
 }

 object LogConfig {
+
+  val Delete = "delete"
+  val Compact = "compact"
+
   val SegmentBytesProp = "segment.bytes"
   val SegmentMsProp = "segment.ms"
   val SegmentJitterMsProp = "segment.jitter.ms"
@@ -123,46 +127,61 @@ object LogConfig {
   val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
   val MinInSyncReplicasProp = "min.insync.replicas"

-  val ConfigNames = Set(SegmentBytesProp,
-                        SegmentMsProp,
-                        SegmentJitterMsProp,
-                        SegmentIndexBytesProp,
-                        FlushMessagesProp,
-                        FlushMsProp,
-                        RetentionBytesProp,
-                        RententionMsProp,
-                        MaxMessageBytesProp,
-                        IndexIntervalBytesProp,
-                        FileDeleteDelayMsProp,
-                        DeleteRetentionMsProp,
-                        MinCleanableDirtyRatioProp,
-                        CleanupPolicyProp,
-                        UncleanLeaderElectionEnableProp,
-                        MinInSyncReplicasProp)
+  private val configDef = {
+    import ConfigDef.Range._
+    import ConfigDef.ValidString._
+    import ConfigDef.Type._
+    import ConfigDef.Importance._
+    import java.util.Arrays.asList
+
+    // TODO clarify importance
+    // TODO clarify validations
+    // TODO define documentation
+    new ConfigDef()
+    .define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(0),
MEDIUM, "")
+    .define(SegmentMsProp, LONG, Defaults.SegmentMs, atLeast(0), MEDIUM, "")
+    .define(SegmentJitterMsProp, LONG, Defaults.SegmentJitterMs,
atLeast(0), MEDIUM, "")
+    .define(SegmentIndexBytesProp, INT, Defaults.MaxIndexSize,
atLeast(0), MEDIUM, "")
+    .define(FlushMessagesProp, LONG, Defaults.FlushInterval,
atLeast(0), MEDIUM, "")
+    .define(FlushMsProp, LONG, Defaults.FlushMs, atLeast(0), MEDIUM, "")
+    .define(RetentionBytesProp, LONG, Defaults.RetentionSize,
atLeast(0), MEDIUM, "")
+    .define(RententionMsProp, LONG, Defaults.RetentionMs, atLeast(0),
MEDIUM, "")
+    .define(MaxMessageBytesProp, INT, Defaults.MaxMessageSize,
atLeast(0), MEDIUM, "")
+    .define(IndexIntervalBytesProp, INT, Defaults.IndexInterval,
atLeast(0), MEDIUM, "")
+    .define(DeleteRetentionMsProp, LONG, Defaults.DeleteRetentionMs,
atLeast(0), MEDIUM, "")
+    .define(FileDeleteDelayMsProp, LONG, Defaults.FileDeleteDelayMs,
atLeast(0), MEDIUM, "")
+    .define(MinCleanableDirtyRatioProp, DOUBLE,
Defaults.MinCleanableDirtyRatio, atLeast(0), MEDIUM, "")
+    .define(CleanupPolicyProp, STRING, if (Defaults.Compact) Delete
else Compact, in(asList(Compact, Delete)), MEDIUM, "")
+    .define(UncleanLeaderElectionEnableProp, BOOLEAN,
Defaults.UncleanLeaderElectionEnable, MEDIUM, "")
+    .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas,
atLeast(1), MEDIUM, "")
+  }
+
+  def configNames() = {
+    import JavaConversions._
+    configDef.names().toList.sorted
+  }

   /**
    * Parse the given properties instance into a LogConfig object
    */
   def fromProps(props: Properties): LogConfig = {
-    new LogConfig(segmentSize = props.getProperty(SegmentBytesProp,
Defaults.SegmentSize.toString).toInt,
-                  segmentMs = props.getProperty(SegmentMsProp,
Defaults.SegmentMs.toString).toLong,
-                  segmentJitterMs =
props.getProperty(SegmentJitterMsProp,
Defaults.SegmentJitterMs.toString).toLong,
-                  maxIndexSize =
props.getProperty(SegmentIndexBytesProp,
Defaults.MaxIndexSize.toString).toInt,
-                  flushInterval =
props.getProperty(FlushMessagesProp,
Defaults.FlushInterval.toString).toLong,
-                  flushMs = props.getProperty(FlushMsProp,
Defaults.FlushMs.toString).toLong,
-                  retentionSize =
props.getProperty(RetentionBytesProp,
Defaults.RetentionSize.toString).toLong,
-                  retentionMs = props.getProperty(RententionMsProp,
Defaults.RetentionMs.toString).toLong,
-                  maxMessageSize =
props.getProperty(MaxMessageBytesProp,
Defaults.MaxMessageSize.toString).toInt,
-                  indexInterval =
props.getProperty(IndexIntervalBytesProp,
Defaults.IndexInterval.toString).toInt,
-                  fileDeleteDelayMs =
props.getProperty(FileDeleteDelayMsProp,
Defaults.FileDeleteDelayMs.toString).toInt,
-                  deleteRetentionMs =
props.getProperty(DeleteRetentionMsProp,
Defaults.DeleteRetentionMs.toString).toLong,
-                  minCleanableRatio =
props.getProperty(MinCleanableDirtyRatioProp,
-                    Defaults.MinCleanableDirtyRatio.toString).toDouble,
-                  compact = props.getProperty(CleanupPolicyProp,
if(Defaults.Compact) "compact" else "delete")
-                    .trim.toLowerCase != "delete",
-                  uncleanLeaderElectionEnable =
props.getProperty(UncleanLeaderElectionEnableProp,
-                    Defaults.UncleanLeaderElectionEnable.toString).toBoolean,
-                  minInSyncReplicas =
props.getProperty(MinInSyncReplicasProp,Defaults.MinInSyncReplicas.toString).toInt)
+    val parsed = configDef.parse(props)
+    new LogConfig(segmentSize = parsed.get(SegmentBytesProp).asInstanceOf[Int],
+                  segmentMs = parsed.get(SegmentMsProp).asInstanceOf[Long],
+                  segmentJitterMs =
parsed.get(SegmentJitterMsProp).asInstanceOf[Long],
+                  maxIndexSize =
parsed.get(SegmentIndexBytesProp).asInstanceOf[Int],
+                  flushInterval =
parsed.get(FlushMessagesProp).asInstanceOf[Long],
+                  flushMs = parsed.get(FlushMsProp).asInstanceOf[Long],
+                  retentionSize =
parsed.get(RetentionBytesProp).asInstanceOf[Long],
+                  retentionMs =
parsed.get(RententionMsProp).asInstanceOf[Long],
+                  maxMessageSize =
parsed.get(MaxMessageBytesProp).asInstanceOf[Int],
+                  indexInterval =
parsed.get(IndexIntervalBytesProp).asInstanceOf[Int],
+                  fileDeleteDelayMs =
parsed.get(FileDeleteDelayMsProp).asInstanceOf[Int],
+                  deleteRetentionMs =
parsed.get(DeleteRetentionMsProp).asInstanceOf[Long],
+                  minCleanableRatio =
parsed.get(MinCleanableDirtyRatioProp).asInstanceOf[Double],
+                  compact =
parsed.get(CleanupPolicyProp).asInstanceOf[String].toLowerCase !=
Delete,
+                  uncleanLeaderElectionEnable =
parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean],
+                  minInSyncReplicas =
parsed.get(MinInSyncReplicasProp).asInstanceOf[Int])
   }

   /**
@@ -179,30 +198,17 @@ object LogConfig {
    */
   def validateNames(props: Properties) {
     import JavaConversions._
+    val names = configDef.names()
     for(name <- props.keys)
-      require(LogConfig.ConfigNames.contains(name), "Unknown
configuration \"%s\".".format(name))
+      require(names.contains(name), "Unknown configuration
\"%s\".".format(name))
   }

   /**
-   * Check that the given properties contain only valid log config
names, and that all values can be parsed.
+   * Check that the given properties contain only valid log config
names and that all values can be parsed and are valid
    */
   def validate(props: Properties) {
     validateNames(props)
-    validateMinInSyncReplicas(props)
-    LogConfig.fromProps(LogConfig().toProps, props) // check that we
can parse the values
-  }
-
-  /**
-   * Check that MinInSyncReplicas is reasonable
-   * Unfortunately, we can't validate its smaller than number of replicas
-   * since we don't have this information here
-   */
-  private def validateMinInSyncReplicas(props: Properties) {
-    val minIsr = props.getProperty(MinInSyncReplicasProp)
-    if (minIsr != null && minIsr.toInt < 1) {
-      throw new InvalidConfigException("Wrong value " + minIsr + " of
min.insync.replicas in topic configuration; " +
-        " Valid values are at least 1")
-    }
+    configDef.parse(props)
   }

-}
\ No newline at end of file
+}

Reply via email to