ableegoldman commented on code in PR #15887:
URL: https://github.com/apache/kafka/pull/15887#discussion_r1594892242


##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.List;
+
+/**
+ * Configuration for the KafkaStreams Task Assignor.
+ */
+public class AssignmentConfigs {
+
+    private long acceptableRecoveryLag;
+    private int maxWarmupReplicas;
+    private int nonOverlapCost;
+    private int numStandbyReplicas;
+    private long probingRebalanceIntervalMs;
+    private List<String> rackAwareAssignmentTags;
+    private int trafficCost;

Review Comment:
   These should all be final (with getter APIs but no setters in this class)
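
   For illustration, here is a minimal sketch of what an immutable version could look like. The class name `ImmutableAssignmentConfigs` and the single all-args constructor are assumptions made for this example, not something the PR has to adopt; the field and getter names are taken from the diff above.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   // Hypothetical sketch: every field is final, set once in the constructor,
   // and exposed through a getter. There are no setters.
   final class ImmutableAssignmentConfigs {
       private final long acceptableRecoveryLag;
       private final int maxWarmupReplicas;
       private final int nonOverlapCost;
       private final int numStandbyReplicas;
       private final long probingRebalanceIntervalMs;
       private final List<String> rackAwareAssignmentTags;
       private final int trafficCost;

       ImmutableAssignmentConfigs(final long acceptableRecoveryLag,
                                  final int maxWarmupReplicas,
                                  final int nonOverlapCost,
                                  final int numStandbyReplicas,
                                  final long probingRebalanceIntervalMs,
                                  final List<String> rackAwareAssignmentTags,
                                  final int trafficCost) {
           this.acceptableRecoveryLag = acceptableRecoveryLag;
           this.maxWarmupReplicas = maxWarmupReplicas;
           this.nonOverlapCost = nonOverlapCost;
           this.numStandbyReplicas = numStandbyReplicas;
           this.probingRebalanceIntervalMs = probingRebalanceIntervalMs;
           // Defensive copy, so later changes to the caller's list cannot leak in.
           this.rackAwareAssignmentTags =
               Collections.unmodifiableList(new ArrayList<>(rackAwareAssignmentTags));
           this.trafficCost = trafficCost;
       }

       public long acceptableRecoveryLag() { return acceptableRecoveryLag; }
       public int maxWarmupReplicas() { return maxWarmupReplicas; }
       public int nonOverlapCost() { return nonOverlapCost; }
       public int numStandbyReplicas() { return numStandbyReplicas; }
       public long probingRebalanceIntervalMs() { return probingRebalanceIntervalMs; }
       public List<String> rackAwareAssignmentTags() { return rackAwareAssignmentTags; }
       public int trafficCost() { return trafficCost; }
   }
   ```

   The list is copied and wrapped because a `final` reference alone would still let callers mutate the underlying list.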



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.List;
+
+/**
+ * Configuration for the KafkaStreams Task Assignor.

Review Comment:
   ```suggestion
    * Assignment related configs for the Kafka Streams {@link TaskAssignor}.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.List;
+
+/**
+ * Configuration for the KafkaStreams Task Assignor.
+ */
+public class AssignmentConfigs {
+
+    private long acceptableRecoveryLag;
+    private int maxWarmupReplicas;
+    private int nonOverlapCost;
+    private int numStandbyReplicas;
+    private long probingRebalanceIntervalMs;
+    private List<String> rackAwareAssignmentTags;
+    private int trafficCost;
+
+    /**
+     * Returns the acceptable lag in the task recovery process in milliseconds.
+     *
+     * @return the number of milliseconds of acceptable recovery lag.
+     */
+    public long acceptableRecoveryLag() {
+        return acceptableRecoveryLag;
+    }
+
+    /**
+     * Sets the acceptable lag in the task recovery process.
+     *
+     * @param acceptableRecoveryLag the acceptable recovery lag to set, in milliseconds.
+     */
+    public void setAcceptableRecoveryLag(final long acceptableRecoveryLag) {
+        this.acceptableRecoveryLag = acceptableRecoveryLag;
+    }
+
+    /**
+     * Returns the maximum number of warmup replicas allowed.
+     *
+     * @return the maximum number of warmup replicas.
+     */
+    public int maxWarmupReplicas() {
+        return maxWarmupReplicas;
+    }
+
+    /**
+     * Sets the maximum number of warmup replicas allowed.
+     *
+     * @param maxWarmupReplicas the maximum number of warmup replicas to set.
+     */
+    public void setMaxWarmupReplicas(final int maxWarmupReplicas) {
+        this.maxWarmupReplicas = maxWarmupReplicas;
+    }
+
+    /**
+     * Returns the number of standby replicas.
+     *
+     * @return the number of standby replicas.
+     */
+    public int numStandbyReplicas() {
+        return numStandbyReplicas;
+    }
+
+    /**
+     * Sets the number of standby replicas.
+     *
+     * @param numStandbyReplicas the number of standby replicas to set.
+     */
+    public void setNumStandbyReplicas(final int numStandbyReplicas) {
+        this.numStandbyReplicas = numStandbyReplicas;
+    }
+
+    /**
+     * Returns the interval between probing rebalances in milliseconds.
+     *
+     * @return the interval between probing rebalances.

Review Comment:
   ```suggestion
        * The probing rebalance interval in milliseconds as configured via 
        * {@link StreamsConfig#PROBING_REBALANCE_INTERVAL_MS_CONFIG}
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.List;
+
+/**
+ * Configuration for the KafkaStreams Task Assignor.
+ */
+public class AssignmentConfigs {
+
+    private long acceptableRecoveryLag;
+    private int maxWarmupReplicas;
+    private int nonOverlapCost;
+    private int numStandbyReplicas;
+    private long probingRebalanceIntervalMs;
+    private List<String> rackAwareAssignmentTags;
+    private int trafficCost;
+
+    /**
+     * Returns the acceptable lag in the task recovery process in milliseconds.
+     *
+     * @return the number of milliseconds of acceptable recovery lag.
+     */
+    public long acceptableRecoveryLag() {
+        return acceptableRecoveryLag;
+    }
+
+    /**
+     * Sets the acceptable lag in the task recovery process.
+     *
+     * @param acceptableRecoveryLag the acceptable recovery lag to set, in milliseconds.
+     */
+    public void setAcceptableRecoveryLag(final long acceptableRecoveryLag) {

Review Comment:
   As mentioned in my comment above, let's take out all the setters in this class



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.List;
+
+/**
+ * Configuration for the KafkaStreams Task Assignor.
+ */
+public class AssignmentConfigs {
+
+    private long acceptableRecoveryLag;
+    private int maxWarmupReplicas;
+    private int nonOverlapCost;
+    private int numStandbyReplicas;
+    private long probingRebalanceIntervalMs;
+    private List<String> rackAwareAssignmentTags;
+    private int trafficCost;
+
+    /**
+     * Returns the acceptable lag in the task recovery process in milliseconds.
+     *
+     * @return the number of milliseconds of acceptable recovery lag.
+     */
+    public long acceptableRecoveryLag() {
+        return acceptableRecoveryLag;
+    }
+
+    /**
+     * Sets the acceptable lag in the task recovery process.
+     *
+     * @param acceptableRecoveryLag the acceptable recovery lag to set, in milliseconds.
+     */
+    public void setAcceptableRecoveryLag(final long acceptableRecoveryLag) {
+        this.acceptableRecoveryLag = acceptableRecoveryLag;
+    }
+
+    /**
+     * Returns the maximum number of warmup replicas allowed.
+     *
+     * @return the maximum number of warmup replicas.
+     */
+    public int maxWarmupReplicas() {
+        return maxWarmupReplicas;
+    }
+
+    /**
+     * Sets the maximum number of warmup replicas allowed.
+     *
+     * @param maxWarmupReplicas the maximum number of warmup replicas to set.
+     */
+    public void setMaxWarmupReplicas(final int maxWarmupReplicas) {
+        this.maxWarmupReplicas = maxWarmupReplicas;
+    }
+
+    /**
+     * Returns the number of standby replicas.
+     *
+     * @return the number of standby replicas.

Review Comment:
   ```suggestion
        * The number of standby replicas as configured via 
        * {@link StreamsConfig#NUM_STANDBY_REPLICAS_CONFIG}
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.List;
+
+/**
+ * Configuration for the KafkaStreams Task Assignor.
+ */
+public class AssignmentConfigs {
+
+    private long acceptableRecoveryLag;
+    private int maxWarmupReplicas;
+    private int nonOverlapCost;
+    private int numStandbyReplicas;
+    private long probingRebalanceIntervalMs;
+    private List<String> rackAwareAssignmentTags;
+    private int trafficCost;
+
+    /**
+     * Returns the acceptable lag in the task recovery process in milliseconds.

Review Comment:
   I'd just link to the relevant StreamsConfig, where applicable, instead of trying to rewrite the docs for these:
   
   ```suggestion
        * The configured acceptable recovery lag according to 
        * {@link StreamsConfig#ACCEPTABLE_RECOVERY_LAG_CONFIG}
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.List;
+
+/**
+ * Configuration for the KafkaStreams Task Assignor.
+ */
+public class AssignmentConfigs {
+
+    private long acceptableRecoveryLag;
+    private int maxWarmupReplicas;
+    private int nonOverlapCost;
+    private int numStandbyReplicas;
+    private long probingRebalanceIntervalMs;
+    private List<String> rackAwareAssignmentTags;
+    private int trafficCost;
+
+    /**
+     * Returns the acceptable lag in the task recovery process in milliseconds.
+     *
+     * @return the number of milliseconds of acceptable recovery lag.
+     */
+    public long acceptableRecoveryLag() {
+        return acceptableRecoveryLag;
+    }
+
+    /**
+     * Sets the acceptable lag in the task recovery process.
+     *
+     * @param acceptableRecoveryLag the acceptable recovery lag to set, in milliseconds.
+     */
+    public void setAcceptableRecoveryLag(final long acceptableRecoveryLag) {
+        this.acceptableRecoveryLag = acceptableRecoveryLag;
+    }
+
+    /**
+     * Returns the maximum number of warmup replicas allowed.
+     *
+     * @return the maximum number of warmup replicas.

Review Comment:
   ```suggestion
        * The maximum warmup replicas as configured via
        * {@link StreamsConfig#MAX_WARMUP_REPLICAS_CONFIG}
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.List;
+
+/**
+ * Configuration for the KafkaStreams Task Assignor.
+ */
+public class AssignmentConfigs {
+
+    private long acceptableRecoveryLag;
+    private int maxWarmupReplicas;
+    private int nonOverlapCost;
+    private int numStandbyReplicas;
+    private long probingRebalanceIntervalMs;
+    private List<String> rackAwareAssignmentTags;
+    private int trafficCost;
+
+    /**
+     * Returns the acceptable lag in the task recovery process in milliseconds.
+     *
+     * @return the number of milliseconds of acceptable recovery lag.

Review Comment:
   The unit of this config is "number of records", not ms. But we can just defer to the config docs and keep it simple here:
   
   ```suggestion
        * @return the configured acceptable recovery lag.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.List;
+
+/**
+ * Configuration for the KafkaStreams Task Assignor.
+ */
+public class AssignmentConfigs {
+
+    private long acceptableRecoveryLag;
+    private int maxWarmupReplicas;
+    private int nonOverlapCost;
+    private int numStandbyReplicas;
+    private long probingRebalanceIntervalMs;
+    private List<String> rackAwareAssignmentTags;
+    private int trafficCost;
+
+    /**
+     * Returns the acceptable lag in the task recovery process in milliseconds.
+     *
+     * @return the number of milliseconds of acceptable recovery lag.
+     */
+    public long acceptableRecoveryLag() {
+        return acceptableRecoveryLag;
+    }
+
+    /**
+     * Sets the acceptable lag in the task recovery process.
+     *
+     * @param acceptableRecoveryLag the acceptable recovery lag to set, in milliseconds.
+     */
+    public void setAcceptableRecoveryLag(final long acceptableRecoveryLag) {
+        this.acceptableRecoveryLag = acceptableRecoveryLag;
+    }
+
+    /**
+     * Returns the maximum number of warmup replicas allowed.
+     *
+     * @return the maximum number of warmup replicas.
+     */
+    public int maxWarmupReplicas() {
+        return maxWarmupReplicas;
+    }
+
+    /**
+     * Sets the maximum number of warmup replicas allowed.
+     *
+     * @param maxWarmupReplicas the maximum number of warmup replicas to set.
+     */
+    public void setMaxWarmupReplicas(final int maxWarmupReplicas) {
+        this.maxWarmupReplicas = maxWarmupReplicas;
+    }
+
+    /**
+     * Returns the number of standby replicas.
+     *
+     * @return the number of standby replicas.
+     */
+    public int numStandbyReplicas() {
+        return numStandbyReplicas;
+    }
+
+    /**
+     * Sets the number of standby replicas.
+     *
+     * @param numStandbyReplicas the number of standby replicas to set.
+     */
+    public void setNumStandbyReplicas(final int numStandbyReplicas) {
+        this.numStandbyReplicas = numStandbyReplicas;
+    }
+
+    /**
+     * Returns the interval between probing rebalances in milliseconds.
+     *
+     * @return the interval between probing rebalances.

Review Comment:
   ```suggestion
        * The probing rebalance interval in milliseconds as configured via 
        * {@link StreamsConfig#PROBING_REBALANCE_INTERVAL_MS_CONFIG}
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.time.Instant;
+import java.util.Set;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * A simple interface for the assignor to return the desired placement of active and standby tasks on
+ * KafkaStreams clients.
+ */
+public interface KafkaStreamsAssignment {
+    /**
+     *
+     * @return the {@code ProcessID} associated with this {@code KafkaStreamsAssignment}
+     */
+    ProcessID processId();
+
+    /**
+     *
+     * @return a set of assigned tasks that are part of this {@code KafkaStreamsAssignment}
+     */
+    Set<AssignedTask> assignment();
+
+    /**
+     * @return the actual deadline in objective time, after which the followup rebalance will be attempted.
+     * Equivalent to {@code 'now + followupRebalanceDelay'}
+     */
+    Instant followupRebalanceDeadline();
+
+    /**
+     * The container class for a task's id and type.
+     */
+    class AssignedTask {
+        private final TaskId id;
+        private final Type taskType;
+
+        public AssignedTask(final TaskId id, final Type taskType) {
+            this.id = id;
+            this.taskType = taskType;
+        }
+
+        /**
+         * AssignedTasks can be either ACTIVE or STANDBY.
+         */

Review Comment:
   Same here: remove this. The code is pretty self-explanatory, and if we ever added a new task type there's a good chance we could forget to update the comment.
   
   This isn't strictly speaking a style guideline, but it's a general philosophy/rule of thumb for comments/docs in Kafka/Kafka Streams. Since so many different people are updating the code over such a long period of time, docs easily end up out of date and become misleading if not outright incorrect. That doesn't mean you shouldn't include docs, but it's good to consider whether they're adding enough to justify any risk of going out of date.



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.time.Instant;
+import java.util.Set;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * A simple interface for the assignor to return the desired placement of active and standby tasks on
+ * KafkaStreams clients.
+ */
+public interface KafkaStreamsAssignment {
+    /**
+     *
+     * @return the {@code ProcessID} associated with this {@code KafkaStreamsAssignment}
+     */
+    ProcessID processId();
+
+    /**
+     *
+     * @return a set of assigned tasks that are part of this {@code KafkaStreamsAssignment}
+     */
+    Set<AssignedTask> assignment();
+
+    /**
+     * @return the actual deadline in objective time, after which the followup rebalance will be attempted.

Review Comment:
   ```suggestion
        * @return the followup rebalance deadline in epoch time, after which this KafkaStreams client will trigger a new rebalance
   ```
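
   To make the "deadline vs. delay" distinction concrete, here is a small sketch, assuming the deadline is derived as `now + followupRebalanceDelay` as the original javadoc describes. The class `FollowupDeadlineSketch` and both helper methods are hypothetical names for this example only and are not part of the proposed API.

   ```java
   import java.time.Duration;
   import java.time.Instant;

   // Hypothetical helpers illustrating how an absolute deadline relates to a delay.
   final class FollowupDeadlineSketch {

       // Converts a relative delay into an absolute point in time.
       static Instant deadlineFrom(final Instant now, final Duration followupRebalanceDelay) {
           return now.plus(followupRebalanceDelay);
       }

       // True once the current time has reached or passed the deadline.
       static boolean deadlineReached(final Instant now, final Instant deadline) {
           return !now.isBefore(deadline);
       }

       public static void main(final String[] args) {
           final Instant now = Instant.now();
           final Instant deadline = deadlineFrom(now, Duration.ofMinutes(10));
           System.out.println("deadline (epoch ms): " + deadline.toEpochMilli());
           System.out.println("reached yet: " + deadlineReached(now, deadline));
       }
   }
   ```

   Returning an `Instant` rather than a delay means the value stays meaningful regardless of when the caller reads it.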



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.List;
+
+/**
+ * Configuration for the KafkaStreams Task Assignor.
+ */
+public class AssignmentConfigs {
+
+    private long acceptableRecoveryLag;
+    private int maxWarmupReplicas;
+    private int nonOverlapCost;
+    private int numStandbyReplicas;
+    private long probingRebalanceIntervalMs;
+    private List<String> rackAwareAssignmentTags;
+    private int trafficCost;
+
+    /**
+     * Returns the acceptable lag in the task recovery process in milliseconds.
+     *
+     * @return the number of milliseconds of acceptable recovery lag.
+     */
+    public long acceptableRecoveryLag() {
+        return acceptableRecoveryLag;
+    }
+
+    /**
+     * Sets the acceptable lag in the task recovery process.
+     *
+     * @param acceptableRecoveryLag the acceptable recovery lag to set, in milliseconds.
+     */
+    public void setAcceptableRecoveryLag(final long acceptableRecoveryLag) {
+        this.acceptableRecoveryLag = acceptableRecoveryLag;
+    }
+
+    /**
+     * Returns the maximum number of warmup replicas allowed.
+     *
+     * @return the maximum number of warmup replicas.
+     */
+    public int maxWarmupReplicas() {
+        return maxWarmupReplicas;
+    }
+
+    /**
+     * Sets the maximum number of warmup replicas allowed.
+     *
+     * @param maxWarmupReplicas the maximum number of warmup replicas to set.
+     */
+    public void setMaxWarmupReplicas(final int maxWarmupReplicas) {
+        this.maxWarmupReplicas = maxWarmupReplicas;
+    }
+
+    /**
+     * Returns the number of standby replicas.
+     *
+     * @return the number of standby replicas.
+     */
+    public int numStandbyReplicas() {
+        return numStandbyReplicas;
+    }
+
+    /**
+     * Sets the number of standby replicas.
+     *
+     * @param numStandbyReplicas the number of standby replicas to set.
+     */
+    public void setNumStandbyReplicas(final int numStandbyReplicas) {
+        this.numStandbyReplicas = numStandbyReplicas;
+    }
+
+    /**
+     * Returns the interval between probing rebalances in milliseconds.
+     *
+     * @return the interval between probing rebalances.
+     */
+    public long probingRebalanceIntervalMs() {
+        return probingRebalanceIntervalMs;
+    }
+
+    /**
+     * Sets the interval between probing rebalances.
+     *
+     * @param probingRebalanceIntervalMs the interval between probing rebalances to set, in milliseconds.
+     */
+    public void setProbingRebalanceIntervalMs(final long probingRebalanceIntervalMs) {
+        this.probingRebalanceIntervalMs = probingRebalanceIntervalMs;
+    }
+
+    /**
+     * Returns the list of rack-aware assignment tags.
+     *
+     * @return the list of rack-aware assignment tags.

Review Comment:
   ```suggestion
        * The rack-aware assignment tags as configured via 
        * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TAGS_CONFIG}
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.Map;
+import java.util.SortedSet;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * A set of utilities to help implement task assignment.

Review Comment:
   ```suggestion
    * A set of utilities to help implement task assignment via the {@link TaskAssignor}
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentException.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+/**
+ * The TaskAssignmentException should be thrown when an error occurs during the assignment process
+ * that cannot be resolved automatically by the assignor alone.
+ */
+public class TaskAssignmentException extends RuntimeException {

Review Comment:
   I think TaskAssignmentException already exists and is part of the public API, so we don't need to add it. All the exceptions for Streams are under `org.apache.kafka.streams.errors`
   
   (btw: if you ever actually do need to introduce a new exception type, it should probably extend StreamsException. Kafka Streams exceptions all extend StreamsException, things in core or the clients will extend KafkaException or one of its subtypes, it's a whole thing)
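
   As a rough sketch of that convention: the `StreamsException` class below is only a local stand-in so the example compiles without Kafka on the classpath (the real one lives in `org.apache.kafka.streams.errors`), and `ExampleAssignmentException` is a made-up name for illustration.

   ```java
   // Local stand-in for org.apache.kafka.streams.errors.StreamsException.
   // In real code you would import the Kafka class instead of declaring this.
   class StreamsException extends RuntimeException {
       StreamsException(final String message) {
           super(message);
       }

       StreamsException(final String message, final Throwable cause) {
           super(message, cause);
       }
   }

   // Hypothetical new exception type: it extends StreamsException rather than
   // RuntimeException directly, so callers catching StreamsException see it too.
   class ExampleAssignmentException extends StreamsException {
       ExampleAssignmentException(final String message) {
           super(message);
       }

       ExampleAssignmentException(final String message, final Throwable cause) {
           super(message, cause);
       }
   }
   ```

   With this shape, an existing `catch (StreamsException e)` block picks up the new type without any changes.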



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignor.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.Collection;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupAssignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+
+/**
+ * A TaskAssignor is responsible for creating a TaskAssignment from a given
+ * {@code ApplicationState}.
+ * The implementation may also override the {@code onAssignmentComputed} callback for insight into
+ * the result of the assignment result.

Review Comment:
   I do think it's worth expanding the javadocs here, potentially even giving 
an inline example like we do in some of the major classes in streams and the 
clients, but it probably makes sense to wait until we're at the end in case 
anything changes along the way, since, as so many of my comments bring up, we 
often forget to update javadocs lol



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsState.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedSet;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.HostInfo;
+
+/**
+ * A read-only metadata class representing the current state of each KafkaStreams client with at least one StreamThread participating in this rebalance
+ */
+public interface KafkaStreamsState {
+    /**
+     * @return the processId of the application instance running on this 
KafkaStreams client
+     */
+    ProcessID processId();
+
+    /**
+     * Returns the number of processing threads available to work on tasks for 
this KafkaStreams client,
+     * which represents its overall capacity for work relative to other 
KafkaStreams clients.
+     *
+     * @return the number of processing threads on this KafkaStreams client
+     */
+    int numProcessingThreads();
+
+    /**
+     * @return the set of consumer client ids for this KafkaStreams client
+     */
+    SortedSet<String> consumerClientIds();
+
+    /**
+     * @return the set of all active tasks owned by consumers on this 
KafkaStreams client since the previous rebalance
+     */
+    SortedSet<TaskId> previousActiveTasks();
+
+    /**
+     * @return the set of all standby tasks owned by consumers on this 
KafkaStreams client since the previous rebalance
+     */
+    SortedSet<TaskId> previousStandbyTasks();
+
+    /**
+     * Returns the total lag across all logged stores in the task. Equal to 
the end offset sum if this client
+     * did not have any state for this task on disk.
+     *
+     * @return end offset sum - offset sum
+     *                    Task.LATEST_OFFSET if this was previously an active 
running task on this client

Review Comment:
   ditto for the `#prevTasksByLag` and `#statefulTasksToLagSums` methods below



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.List;
+
+/**
+ * Configuration for the KafkaStreams Task Assignor.
+ */
+public class AssignmentConfigs {
+
+    private long acceptableRecoveryLag;
+    private int maxWarmupReplicas;
+    private int nonOverlapCost;
+    private int numStandbyReplicas;
+    private long probingRebalanceIntervalMs;
+    private List<String> rackAwareAssignmentTags;
+    private int trafficCost;
+
+    /**
+     * Returns the acceptable lag in the task recovery process in milliseconds.
+     *
+     * @return the number of milliseconds of acceptable recovery lag.
+     */
+    public long acceptableRecoveryLag() {
+        return acceptableRecoveryLag;
+    }
+
+    /**
+     * Sets the acceptable lag in the task recovery process.
+     *
+     * @param acceptableRecoveryLag the acceptable recovery lag to set, in 
milliseconds.
+     */
+    public void setAcceptableRecoveryLag(final long acceptableRecoveryLag) {
+        this.acceptableRecoveryLag = acceptableRecoveryLag;
+    }
+
+    /**
+     * Returns the maximum number of warmup replicas allowed.
+     *
+     * @return the maximum number of warmup replicas.
+     */
+    public int maxWarmupReplicas() {
+        return maxWarmupReplicas;
+    }
+
+    /**
+     * Sets the maximum number of warmup replicas allowed.
+     *
+     * @param maxWarmupReplicas the maximum number of warmup replicas to set.
+     */
+    public void setMaxWarmupReplicas(final int maxWarmupReplicas) {
+        this.maxWarmupReplicas = maxWarmupReplicas;
+    }
+
+    /**
+     * Returns the number of standby replicas.
+     *
+     * @return the number of standby replicas.
+     */
+    public int numStandbyReplicas() {
+        return numStandbyReplicas;
+    }
+
+    /**
+     * Sets the number of standby replicas.
+     *
+     * @param numStandbyReplicas the number of standby replicas to set.
+     */
+    public void setNumStandbyReplicas(final int numStandbyReplicas) {
+        this.numStandbyReplicas = numStandbyReplicas;
+    }
+
+    /**
+     * Returns the interval between probing rebalances in milliseconds.
+     *
+     * @return the interval between probing rebalances.
+     */
+    public long probingRebalanceIntervalMs() {
+        return probingRebalanceIntervalMs;
+    }
+
+    /**
+     * Sets the interval between probing rebalances.
+     *
+     * @param probingRebalanceIntervalMs the interval between probing 
rebalances to set, in milliseconds.
+     */
+    public void setProbingRebalanceIntervalMs(final long 
probingRebalanceIntervalMs) {
+        this.probingRebalanceIntervalMs = probingRebalanceIntervalMs;
+    }
+
+    /**
+     * Returns the list of rack-aware assignment tags.
+     *
+     * @return the list of rack-aware assignment tags.
+     */
+    public List<String> rackAwareAssignmentTags() {
+        return rackAwareAssignmentTags;
+    }
+
+    /**
+     * Sets the list of rack-aware assignment tags.
+     *
+     * @param rackAwareAssignmentTags the list of rack-aware assignment tags 
to set.
+     */
+    public void setRackAwareAssignmentTags(final List<String> 
rackAwareAssignmentTags) {
+        this.rackAwareAssignmentTags = rackAwareAssignmentTags;
+    }
+
+    /**
+     * Returns the traffic cost.
+     *
+     * @return the traffic cost.
+     */
+    public int trafficCost() {
+        return trafficCost;
+    }
+
+    /**
+     * Sets the traffic cost.
+     *
+     * @param trafficCost the traffic cost to set.
+     */
+    public void setTrafficCost(final int trafficCost) {
+        this.trafficCost = trafficCost;
+    }
+
+    /**
+     * Returns the non-overlap cost.
+     *
+     * @return the non-overlap cost.

Review Comment:
   ```suggestion
        * The rack-aware assignment non-overlap cost as configured via 
        * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG}
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.List;
+
+/**
+ * Configuration for the KafkaStreams Task Assignor.
+ */
+public class AssignmentConfigs {
+
+    private long acceptableRecoveryLag;
+    private int maxWarmupReplicas;
+    private int nonOverlapCost;
+    private int numStandbyReplicas;
+    private long probingRebalanceIntervalMs;
+    private List<String> rackAwareAssignmentTags;
+    private int trafficCost;
+
+    /**
+     * Returns the acceptable lag in the task recovery process in milliseconds.
+     *
+     * @return the number of milliseconds of acceptable recovery lag.
+     */
+    public long acceptableRecoveryLag() {
+        return acceptableRecoveryLag;
+    }
+
+    /**
+     * Sets the acceptable lag in the task recovery process.
+     *
+     * @param acceptableRecoveryLag the acceptable recovery lag to set, in 
milliseconds.
+     */
+    public void setAcceptableRecoveryLag(final long acceptableRecoveryLag) {
+        this.acceptableRecoveryLag = acceptableRecoveryLag;
+    }
+
+    /**
+     * Returns the maximum number of warmup replicas allowed.
+     *
+     * @return the maximum number of warmup replicas.
+     */
+    public int maxWarmupReplicas() {
+        return maxWarmupReplicas;
+    }
+
+    /**
+     * Sets the maximum number of warmup replicas allowed.
+     *
+     * @param maxWarmupReplicas the maximum number of warmup replicas to set.
+     */
+    public void setMaxWarmupReplicas(final int maxWarmupReplicas) {
+        this.maxWarmupReplicas = maxWarmupReplicas;
+    }
+
+    /**
+     * Returns the number of standby replicas.
+     *
+     * @return the number of standby replicas.
+     */
+    public int numStandbyReplicas() {
+        return numStandbyReplicas;
+    }
+
+    /**
+     * Sets the number of standby replicas.
+     *
+     * @param numStandbyReplicas the number of standby replicas to set.
+     */
+    public void setNumStandbyReplicas(final int numStandbyReplicas) {
+        this.numStandbyReplicas = numStandbyReplicas;
+    }
+
+    /**
+     * Returns the interval between probing rebalances in milliseconds.
+     *
+     * @return the interval between probing rebalances.
+     */
+    public long probingRebalanceIntervalMs() {
+        return probingRebalanceIntervalMs;
+    }
+
+    /**
+     * Sets the interval between probing rebalances.
+     *
+     * @param probingRebalanceIntervalMs the interval between probing 
rebalances to set, in milliseconds.
+     */
+    public void setProbingRebalanceIntervalMs(final long 
probingRebalanceIntervalMs) {
+        this.probingRebalanceIntervalMs = probingRebalanceIntervalMs;
+    }
+
+    /**
+     * Returns the list of rack-aware assignment tags.
+     *
+     * @return the list of rack-aware assignment tags.
+     */
+    public List<String> rackAwareAssignmentTags() {
+        return rackAwareAssignmentTags;
+    }
+
+    /**
+     * Sets the list of rack-aware assignment tags.
+     *
+     * @param rackAwareAssignmentTags the list of rack-aware assignment tags 
to set.
+     */
+    public void setRackAwareAssignmentTags(final List<String> 
rackAwareAssignmentTags) {
+        this.rackAwareAssignmentTags = rackAwareAssignmentTags;
+    }
+
+    /**
+     * Returns the traffic cost.
+     *
+     * @return the traffic cost.

Review Comment:
   ```suggestion
        * The rack-aware assignment traffic cost as configured via 
        * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG}
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.time.Instant;
+import java.util.Set;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * A simple interface for the assignor to return the desired placement of 
active and standby tasks on
+ * KafkaStreams clients.
+ */
+public interface KafkaStreamsAssignment {
+    /**
+     *
+     * @return the {@code ProcessID} associated with this {@code 
KafkaStreamsAssignment}
+     */
+    ProcessID processId();
+
+    /**
+     *
+     * @return a set of assigned tasks that are part of this {@code 
KafkaStreamsAssignment}
+     */
+    Set<AssignedTask> assignment();
+
+    /**
+     * @return the actual deadline in objective time, after which the followup 
rebalance will be attempted.
+     * Equivalent to {@code 'now + followupRebalanceDelay'}
+     */
+    Instant followupRebalanceDeadline();
+
+    /**
+     * The container class for a task's id and type.
+     */

Review Comment:
   Remove this: it doesn't add much, and it's one of those things that we'll almost 
definitely forget to update if we ever change this class



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessID.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import org.apache.kafka.common.protocol.types.Field.UUID;
+
+/** A simple wrapper around UUID that abstracts a Process ID */
+public class ProcessID {

Review Comment:
   nit: rename to `ProcessId` (lowercase 'd') to be more consistent with naming 
in Streams



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.List;
+
+/**
+ * Configuration for the KafkaStreams Task Assignor.
+ */
+public class AssignmentConfigs {
+
+    private long acceptableRecoveryLag;
+    private int maxWarmupReplicas;
+    private int nonOverlapCost;
+    private int numStandbyReplicas;
+    private long probingRebalanceIntervalMs;
+    private List<String> rackAwareAssignmentTags;
+    private int trafficCost;
+
+    /**
+     * Returns the acceptable lag in the task recovery process in milliseconds.
+     *
+     * @return the number of milliseconds of acceptable recovery lag.

Review Comment:
   Or actually, alternatively just get rid of all the `@return` lines in the 
getter javadocs here, since they pretty much just repeat what we say in the 
first paragraph of the javadocs
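As a rough illustration of what getter javadocs without the redundant `@return` lines could look like, here is a trimmed-down sketch. The class name `AssignmentConfigsSketch`, the constructor, and the final fields are assumptions made for the example, and only two of the configs are shown.

```java
class AssignmentConfigsSketch {

    // Assumption for this sketch: values are set once in the constructor.
    private final long acceptableRecoveryLag;
    private final int maxWarmupReplicas;

    AssignmentConfigsSketch(final long acceptableRecoveryLag, final int maxWarmupReplicas) {
        this.acceptableRecoveryLag = acceptableRecoveryLag;
        this.maxWarmupReplicas = maxWarmupReplicas;
    }

    /**
     * The configured acceptable recovery lag, see {@code acceptable.recovery.lag}.
     */
    long acceptableRecoveryLag() {
        return acceptableRecoveryLag;
    }

    /**
     * The maximum number of warmup replicas, see {@code max.warmup.replicas}.
     */
    int maxWarmupReplicas() {
        return maxWarmupReplicas;
    }

    public static void main(final String[] args) {
        final AssignmentConfigsSketch configs = new AssignmentConfigsSketch(10_000L, 2);
        System.out.println(configs.acceptableRecoveryLag());
        System.out.println(configs.maxWarmupReplicas());
    }
}
```

Each javadoc is a single sentence, so there is only one place to update if a config's meaning changes.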



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsState.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedSet;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.HostInfo;
+
+/**
+ * A read-only metadata class representing the current state of each KafkaStreams client with at least one StreamThread participating in this rebalance
+ */
+public interface KafkaStreamsState {
+    /**
+     * @return the processId of the application instance running on this 
KafkaStreams client
+     */
+    ProcessID processId();
+
+    /**
+     * Returns the number of processing threads available to work on tasks for 
this KafkaStreams client,
+     * which represents its overall capacity for work relative to other 
KafkaStreams clients.
+     *
+     * @return the number of processing threads on this KafkaStreams client
+     */
+    int numProcessingThreads();
+
+    /**
+     * @return the set of consumer client ids for this KafkaStreams client
+     */
+    SortedSet<String> consumerClientIds();
+
+    /**
+     * @return the set of all active tasks owned by consumers on this 
KafkaStreams client since the previous rebalance
+     */
+    SortedSet<TaskId> previousActiveTasks();
+
+    /**
+     * @return the set of all standby tasks owned by consumers on this 
KafkaStreams client since the previous rebalance
+     */
+    SortedSet<TaskId> previousStandbyTasks();
+
+    /**
+     * Returns the total lag across all logged stores in the task. Equal to 
the end offset sum if this client
+     * did not have any state for this task on disk.
+     *
+     * @return end offset sum - offset sum
+     *                    Task.LATEST_OFFSET if this was previously an active 
running task on this client

Review Comment:
   I realize I forgot this in the KIP, but we should throw an exception if this 
method is called and the user did not request that task lags be computed. Not sure 
what exception type makes the most sense here, maybe IllegalArgumentException 
or UnsupportedOperationException?
   
   Whichever you choose, just add a `@throws` statement to the javadocs here
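One way the guard and its `@throws` documentation might look, as a minimal sketch only. Everything here is hypothetical: the class name `LagGuardSketch`, the method name `lagFor`, the use of `String` in place of `TaskId`, and the choice of UnsupportedOperationException over IllegalArgumentException for the "lags not requested" case. The extra check for an unknown task is also an assumption of the sketch.

```java
import java.util.Map;

class LagGuardSketch {

    // null means the user did not request that task lags be computed
    // (an assumption of this sketch, not the PR's actual representation).
    private final Map<String, Long> taskLags;

    LagGuardSketch(final Map<String, Long> taskLags) {
        this.taskLags = taskLags;
    }

    /**
     * Returns the total lag across all logged stores in the task.
     *
     * @throws UnsupportedOperationException if task lags were not requested to be computed
     * @throws IllegalArgumentException if no lag is known for the given task
     */
    long lagFor(final String taskId) {
        if (taskLags == null) {
            throw new UnsupportedOperationException(
                "Task lags were not requested, so lagFor cannot be called");
        }
        final Long lag = taskLags.get(taskId);
        if (lag == null) {
            throw new IllegalArgumentException("No lag available for task " + taskId);
        }
        return lag;
    }

    public static void main(final String[] args) {
        final LagGuardSketch withLags = new LagGuardSketch(Map.of("0_1", 42L));
        System.out.println(withLags.lagFor("0_1"));

        final LagGuardSketch withoutLags = new LagGuardSketch(null);
        try {
            withoutLags.lagFor("0_1");
        } catch (final UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast here surfaces a missing lag request as a clear error instead of a misleading lag value.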


