ableegoldman commented on code in PR #15887: URL: https://github.com/apache/kafka/pull/15887#discussion_r1594892242
########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +import java.util.List; + +/** + * Configuration for the KafkaStreams Task Assignor. + */ +public class AssignmentConfigs { + + private long acceptableRecoveryLag; + private int maxWarmupReplicas; + private int nonOverlapCost; + private int numStandbyReplicas; + private long probingRebalanceIntervalMs; + private List<String> rackAwareAssignmentTags; + private int trafficCost; Review Comment: These should all be final (with getter APIs but no setters in this class) ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +import java.util.List; + +/** + * Configuration for the KafkaStreams Task Assignor. Review Comment: ```suggestion * Assignment related configs for the Kafka Streams {@link TaskAssignor}. ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +import java.util.List; + +/** + * Configuration for the KafkaStreams Task Assignor. 
+ */ +public class AssignmentConfigs { + + private long acceptableRecoveryLag; + private int maxWarmupReplicas; + private int nonOverlapCost; + private int numStandbyReplicas; + private long probingRebalanceIntervalMs; + private List<String> rackAwareAssignmentTags; + private int trafficCost; + + /** + * Returns the acceptable lag in the task recovery process in milliseconds. + * + * @return the number of milliseconds of acceptable recovery lag. + */ + public long acceptableRecoveryLag() { + return acceptableRecoveryLag; + } + + /** + * Sets the acceptable lag in the task recovery process. + * + * @param acceptableRecoveryLag the acceptable recovery lag to set, in milliseconds. + */ + public void setAcceptableRecoveryLag(final long acceptableRecoveryLag) { + this.acceptableRecoveryLag = acceptableRecoveryLag; + } + + /** + * Returns the maximum number of warmup replicas allowed. + * + * @return the maximum number of warmup replicas. + */ + public int maxWarmupReplicas() { + return maxWarmupReplicas; + } + + /** + * Sets the maximum number of warmup replicas allowed. + * + * @param maxWarmupReplicas the maximum number of warmup replicas to set. + */ + public void setMaxWarmupReplicas(final int maxWarmupReplicas) { + this.maxWarmupReplicas = maxWarmupReplicas; + } + + /** + * Returns the number of standby replicas. + * + * @return the number of standby replicas. + */ + public int numStandbyReplicas() { + return numStandbyReplicas; + } + + /** + * Sets the number of standby replicas. + * + * @param numStandbyReplicas the number of standby replicas to set. + */ + public void setNumStandbyReplicas(final int numStandbyReplicas) { + this.numStandbyReplicas = numStandbyReplicas; + } + + /** + * Returns the interval between probing rebalances in milliseconds. + * + * @return the interval between probing rebalances. 
Review Comment: ```suggestion * The probing rebalance interval in milliseconds as configured via * {@link StreamsConfig#PROBING_REBALANCE_INTERVAL_MS_CONFIG} ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +import java.util.List; + +/** + * Configuration for the KafkaStreams Task Assignor. + */ +public class AssignmentConfigs { + + private long acceptableRecoveryLag; + private int maxWarmupReplicas; + private int nonOverlapCost; + private int numStandbyReplicas; + private long probingRebalanceIntervalMs; + private List<String> rackAwareAssignmentTags; + private int trafficCost; + + /** + * Returns the acceptable lag in the task recovery process in milliseconds. + * + * @return the number of milliseconds of acceptable recovery lag. + */ + public long acceptableRecoveryLag() { + return acceptableRecoveryLag; + } + + /** + * Sets the acceptable lag in the task recovery process. + * + * @param acceptableRecoveryLag the acceptable recovery lag to set, in milliseconds.
+ */ + public void setAcceptableRecoveryLag(final long acceptableRecoveryLag) { Review Comment: As mentioned in my comment above, let's take out all the setters in this class ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +import java.util.List; + +/** + * Configuration for the KafkaStreams Task Assignor. + */ +public class AssignmentConfigs { + + private long acceptableRecoveryLag; + private int maxWarmupReplicas; + private int nonOverlapCost; + private int numStandbyReplicas; + private long probingRebalanceIntervalMs; + private List<String> rackAwareAssignmentTags; + private int trafficCost; + + /** + * Returns the acceptable lag in the task recovery process in milliseconds. + * + * @return the number of milliseconds of acceptable recovery lag. + */ + public long acceptableRecoveryLag() { + return acceptableRecoveryLag; + } + + /** + * Sets the acceptable lag in the task recovery process. + * + * @param acceptableRecoveryLag the acceptable recovery lag to set, in milliseconds. 
+ */ + public void setAcceptableRecoveryLag(final long acceptableRecoveryLag) { + this.acceptableRecoveryLag = acceptableRecoveryLag; + } + + /** + * Returns the maximum number of warmup replicas allowed. + * + * @return the maximum number of warmup replicas. + */ + public int maxWarmupReplicas() { + return maxWarmupReplicas; + } + + /** + * Sets the maximum number of warmup replicas allowed. + * + * @param maxWarmupReplicas the maximum number of warmup replicas to set. + */ + public void setMaxWarmupReplicas(final int maxWarmupReplicas) { + this.maxWarmupReplicas = maxWarmupReplicas; + } + + /** + * Returns the number of standby replicas. + * + * @return the number of standby replicas. Review Comment: ```suggestion * The number of standby replicas as configured via * {@link StreamsConfig#NUM_STANDBY_REPLICAS_CONFIG} ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +import java.util.List; + +/** + * Configuration for the KafkaStreams Task Assignor. 
+ */ +public class AssignmentConfigs { + + private long acceptableRecoveryLag; + private int maxWarmupReplicas; + private int nonOverlapCost; + private int numStandbyReplicas; + private long probingRebalanceIntervalMs; + private List<String> rackAwareAssignmentTags; + private int trafficCost; + + /** + * Returns the acceptable lag in the task recovery process in milliseconds. Review Comment: I'd just link to the relevant StreamsConfig, where applicable, instead of trying to rewrite the docs for these: ```suggestion * The configured acceptable recovery lag according to * {@link StreamsConfig#ACCEPTABLE_RECOVERY_LAG_CONFIG} ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +import java.util.List; + +/** + * Configuration for the KafkaStreams Task Assignor. 
+ */ +public class AssignmentConfigs { + + private long acceptableRecoveryLag; + private int maxWarmupReplicas; + private int nonOverlapCost; + private int numStandbyReplicas; + private long probingRebalanceIntervalMs; + private List<String> rackAwareAssignmentTags; + private int trafficCost; + + /** + * Returns the acceptable lag in the task recovery process in milliseconds. + * + * @return the number of milliseconds of acceptable recovery lag. + */ + public long acceptableRecoveryLag() { + return acceptableRecoveryLag; + } + + /** + * Sets the acceptable lag in the task recovery process. + * + * @param acceptableRecoveryLag the acceptable recovery lag to set, in milliseconds. + */ + public void setAcceptableRecoveryLag(final long acceptableRecoveryLag) { + this.acceptableRecoveryLag = acceptableRecoveryLag; + } + + /** + * Returns the maximum number of warmup replicas allowed. + * + * @return the maximum number of warmup replicas. Review Comment: ```suggestion * The maximum warmup replicas as configured via * {@link StreamsConfig#MAX_WARMUP_REPLICAS_CONFIG} ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +import java.util.List; + +/** + * Configuration for the KafkaStreams Task Assignor. + */ +public class AssignmentConfigs { + + private long acceptableRecoveryLag; + private int maxWarmupReplicas; + private int nonOverlapCost; + private int numStandbyReplicas; + private long probingRebalanceIntervalMs; + private List<String> rackAwareAssignmentTags; + private int trafficCost; + + /** + * Returns the acceptable lag in the task recovery process in milliseconds. + * + * @return the number of milliseconds of acceptable recovery lag. Review Comment: The unit of this config is "number of records", not ms. But we can just defer to the config docs and keep it simple here: ```suggestion * @return the configured acceptable recovery lag. ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +import java.util.List; + +/** + * Configuration for the KafkaStreams Task Assignor.
+ */ +public class AssignmentConfigs { + + private long acceptableRecoveryLag; + private int maxWarmupReplicas; + private int nonOverlapCost; + private int numStandbyReplicas; + private long probingRebalanceIntervalMs; + private List<String> rackAwareAssignmentTags; + private int trafficCost; + + /** + * Returns the acceptable lag in the task recovery process in milliseconds. + * + * @return the number of milliseconds of acceptable recovery lag. + */ + public long acceptableRecoveryLag() { + return acceptableRecoveryLag; + } + + /** + * Sets the acceptable lag in the task recovery process. + * + * @param acceptableRecoveryLag the acceptable recovery lag to set, in milliseconds. + */ + public void setAcceptableRecoveryLag(final long acceptableRecoveryLag) { + this.acceptableRecoveryLag = acceptableRecoveryLag; + } + + /** + * Returns the maximum number of warmup replicas allowed. + * + * @return the maximum number of warmup replicas. + */ + public int maxWarmupReplicas() { + return maxWarmupReplicas; + } + + /** + * Sets the maximum number of warmup replicas allowed. + * + * @param maxWarmupReplicas the maximum number of warmup replicas to set. + */ + public void setMaxWarmupReplicas(final int maxWarmupReplicas) { + this.maxWarmupReplicas = maxWarmupReplicas; + } + + /** + * Returns the number of standby replicas. + * + * @return the number of standby replicas. + */ + public int numStandbyReplicas() { + return numStandbyReplicas; + } + + /** + * Sets the number of standby replicas. + * + * @param numStandbyReplicas the number of standby replicas to set. + */ + public void setNumStandbyReplicas(final int numStandbyReplicas) { + this.numStandbyReplicas = numStandbyReplicas; + } + + /** + * Returns the interval between probing rebalances in milliseconds. + * + * @return the interval between probing rebalances. 
Review Comment: ```suggestion * The probing rebalance interval in milliseconds as configured via * {@link StreamsConfig#PROBING_REBALANCE_INTERVAL_MS_CONFIG} ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java: ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +import java.time.Instant; +import java.util.Set; +import org.apache.kafka.streams.processor.TaskId; + +/** + * A simple interface for the assignor to return the desired placement of active and standby tasks on + * KafkaStreams clients. + */ +public interface KafkaStreamsAssignment { + /** + * + * @return the {@code ProcessID} associated with this {@code KafkaStreamsAssignment} + */ + ProcessID processId(); + + /** + * + * @return a set of assigned tasks that are part of this {@code KafkaStreamsAssignment} + */ + Set<AssignedTask> assignment(); + + /** + * @return the actual deadline in objective time, after which the followup rebalance will be attempted. + * Equivalent to {@code 'now + followupRebalanceDelay'} + */ + Instant followupRebalanceDeadline(); + + /** + * The container class for a task's id and type. 
+ */ + class AssignedTask { + private final TaskId id; + private final Type taskType; + + public AssignedTask(final TaskId id, final Type taskType) { + this.id = id; + this.taskType = taskType; + } + + /** + * AssignedTasks can be either ACTIVE or STANDBY. + */ Review Comment: Same here: remove this. The code is pretty self-explanatory and if we ever added a new task type there's a good chance we could forget to update the comment. This isn't strictly speaking a style guideline, but it's a general philosophy/rule of thumb for comments/docs in Kafka/Kafka Streams. Since so many different people are updating the code over such a long period of time, docs easily end up out of date and become misleading if not outright incorrect. Doesn't mean you shouldn't include docs, but it's good to consider whether they're adding enough to justify any risk of going out of date ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java: ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.assignment; + +import java.time.Instant; +import java.util.Set; +import org.apache.kafka.streams.processor.TaskId; + +/** + * A simple interface for the assignor to return the desired placement of active and standby tasks on + * KafkaStreams clients. + */ +public interface KafkaStreamsAssignment { + /** + * + * @return the {@code ProcessID} associated with this {@code KafkaStreamsAssignment} + */ + ProcessID processId(); + + /** + * + * @return a set of assigned tasks that are part of this {@code KafkaStreamsAssignment} + */ + Set<AssignedTask> assignment(); + + /** + * @return the actual deadline in objective time, after which the followup rebalance will be attempted. Review Comment: ```suggestion * @return the followup rebalance deadline in epoch time, after which this KafkaStreams client will trigger a new rebalance ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +import java.util.List; + +/** + * Configuration for the KafkaStreams Task Assignor. 
+ */ +public class AssignmentConfigs { + + private long acceptableRecoveryLag; + private int maxWarmupReplicas; + private int nonOverlapCost; + private int numStandbyReplicas; + private long probingRebalanceIntervalMs; + private List<String> rackAwareAssignmentTags; + private int trafficCost; + + /** + * Returns the acceptable lag in the task recovery process in milliseconds. + * + * @return the number of milliseconds of acceptable recovery lag. + */ + public long acceptableRecoveryLag() { + return acceptableRecoveryLag; + } + + /** + * Sets the acceptable lag in the task recovery process. + * + * @param acceptableRecoveryLag the acceptable recovery lag to set, in milliseconds. + */ + public void setAcceptableRecoveryLag(final long acceptableRecoveryLag) { + this.acceptableRecoveryLag = acceptableRecoveryLag; + } + + /** + * Returns the maximum number of warmup replicas allowed. + * + * @return the maximum number of warmup replicas. + */ + public int maxWarmupReplicas() { + return maxWarmupReplicas; + } + + /** + * Sets the maximum number of warmup replicas allowed. + * + * @param maxWarmupReplicas the maximum number of warmup replicas to set. + */ + public void setMaxWarmupReplicas(final int maxWarmupReplicas) { + this.maxWarmupReplicas = maxWarmupReplicas; + } + + /** + * Returns the number of standby replicas. + * + * @return the number of standby replicas. + */ + public int numStandbyReplicas() { + return numStandbyReplicas; + } + + /** + * Sets the number of standby replicas. + * + * @param numStandbyReplicas the number of standby replicas to set. + */ + public void setNumStandbyReplicas(final int numStandbyReplicas) { + this.numStandbyReplicas = numStandbyReplicas; + } + + /** + * Returns the interval between probing rebalances in milliseconds. + * + * @return the interval between probing rebalances. + */ + public long probingRebalanceIntervalMs() { + return probingRebalanceIntervalMs; + } + + /** + * Sets the interval between probing rebalances. 
+ * + * @param probingRebalanceIntervalMs the interval between probing rebalances to set, in milliseconds. + */ + public void setProbingRebalanceIntervalMs(final long probingRebalanceIntervalMs) { + this.probingRebalanceIntervalMs = probingRebalanceIntervalMs; + } + + /** + * Returns the list of rack-aware assignment tags. + * + * @return the list of rack-aware assignment tags. Review Comment: ```suggestion * The rack-aware assignment tags as configured via * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TAGS_CONFIG} ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +import java.util.Map; +import java.util.SortedSet; +import org.apache.kafka.streams.processor.TaskId; + +/** + * A set of utilities to help implement task assignment. 
Review Comment: ```suggestion * A set of utilities to help implement task assignment via the {@link TaskAssignor} ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentException.java: ########## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +/** + * The TaskAssignmentException should be thrown when an error occurs during the assignment process + * that cannot be resolved automatically by the assignor alone. + */ +public class TaskAssignmentException extends RuntimeException { Review Comment: I think TaskAssignmentException already exists and is part of the public API, so we don't need to add it. All the exceptions for Streams are under `org.apache.kafka.streams.errors` (btw: if you ever actually do need to introduce a new exception type, it should probably extend StreamsException. 
Kafka Streams exceptions all extend StreamsException, things in core or the clients will extend KafkaException or one of its subtypes, it's a whole thing) ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignor.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +import java.util.Collection; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupAssignment; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; + +/** + * A TaskAssignor is responsible for creating a TaskAssignment from a given + * {@code ApplicationState}. + * The implementation may also override the {@code onAssignmentComputed} callback for insight into + * the result of the assignment result. 
Review Comment: I do think it's worth expanding the javadocs here, potentially even giving an inline example like we do in some of the major classes in streams and the clients, but it probably makes sense to wait until we're at the end in case anything changes along the way, since (as so many of my comments bring up) we often forget to update javadocs lol ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsState.java: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.streams.processor.assignment; + +import java.util.Map; +import java.util.Optional; +import java.util.SortedSet; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.state.HostInfo; + +/** + * A read-only metadata class representing the current state of each KafkaStreams client with at least one StreamThread participating in this rebalance + */ +public interface KafkaStreamsState { + /** + * @return the processId of the application instance running on this KafkaStreams client + */ + ProcessID processId(); + + /** + * Returns the number of processing threads available to work on tasks for this KafkaStreams client, + * which represents its overall capacity for work relative to other KafkaStreams clients. + * + * @return the number of processing threads on this KafkaStreams client + */ + int numProcessingThreads(); + + /** + * @return the set of consumer client ids for this KafkaStreams client + */ + SortedSet<String> consumerClientIds(); + + /** + * @return the set of all active tasks owned by consumers on this KafkaStreams client since the previous rebalance + */ + SortedSet<TaskId> previousActiveTasks(); + + /** + * @return the set of all standby tasks owned by consumers on this KafkaStreams client since the previous rebalance + */ + SortedSet<TaskId> previousStandbyTasks(); + + /** + * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client + * did not have any state for this task on disk. + * + * @return end offset sum - offset sum + * Task.LATEST_OFFSET if this was previously an active running task on this client Review Comment: ditto for the `#prevTasksByLag` and `#statefulTasksToLagSums` methods below ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +import java.util.List; + +/** + * Configuration for the KafkaStreams Task Assignor. + */ +public class AssignmentConfigs { + + private long acceptableRecoveryLag; + private int maxWarmupReplicas; + private int nonOverlapCost; + private int numStandbyReplicas; + private long probingRebalanceIntervalMs; + private List<String> rackAwareAssignmentTags; + private int trafficCost; + + /** + * Returns the acceptable lag in the task recovery process in milliseconds. + * + * @return the number of milliseconds of acceptable recovery lag. + */ + public long acceptableRecoveryLag() { + return acceptableRecoveryLag; + } + + /** + * Sets the acceptable lag in the task recovery process. + * + * @param acceptableRecoveryLag the acceptable recovery lag to set, in milliseconds. + */ + public void setAcceptableRecoveryLag(final long acceptableRecoveryLag) { + this.acceptableRecoveryLag = acceptableRecoveryLag; + } + + /** + * Returns the maximum number of warmup replicas allowed. + * + * @return the maximum number of warmup replicas. + */ + public int maxWarmupReplicas() { + return maxWarmupReplicas; + } + + /** + * Sets the maximum number of warmup replicas allowed. 
+ * + * @param maxWarmupReplicas the maximum number of warmup replicas to set. + */ + public void setMaxWarmupReplicas(final int maxWarmupReplicas) { + this.maxWarmupReplicas = maxWarmupReplicas; + } + + /** + * Returns the number of standby replicas. + * + * @return the number of standby replicas. + */ + public int numStandbyReplicas() { + return numStandbyReplicas; + } + + /** + * Sets the number of standby replicas. + * + * @param numStandbyReplicas the number of standby replicas to set. + */ + public void setNumStandbyReplicas(final int numStandbyReplicas) { + this.numStandbyReplicas = numStandbyReplicas; + } + + /** + * Returns the interval between probing rebalances in milliseconds. + * + * @return the interval between probing rebalances. + */ + public long probingRebalanceIntervalMs() { + return probingRebalanceIntervalMs; + } + + /** + * Sets the interval between probing rebalances. + * + * @param probingRebalanceIntervalMs the interval between probing rebalances to set, in milliseconds. + */ + public void setProbingRebalanceIntervalMs(final long probingRebalanceIntervalMs) { + this.probingRebalanceIntervalMs = probingRebalanceIntervalMs; + } + + /** + * Returns the list of rack-aware assignment tags. + * + * @return the list of rack-aware assignment tags. + */ + public List<String> rackAwareAssignmentTags() { + return rackAwareAssignmentTags; + } + + /** + * Sets the list of rack-aware assignment tags. + * + * @param rackAwareAssignmentTags the list of rack-aware assignment tags to set. + */ + public void setRackAwareAssignmentTags(final List<String> rackAwareAssignmentTags) { + this.rackAwareAssignmentTags = rackAwareAssignmentTags; + } + + /** + * Returns the traffic cost. + * + * @return the traffic cost. + */ + public int trafficCost() { + return trafficCost; + } + + /** + * Sets the traffic cost. + * + * @param trafficCost the traffic cost to set. 
+ */ + public void setTrafficCost(final int trafficCost) { + this.trafficCost = trafficCost; + } + + /** + * Returns the non-overlap cost. + * + * @return the non-overlap cost. Review Comment: ```suggestion * The rack-aware assignment non-overlap cost as configured via * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG} ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +import java.util.List; + +/** + * Configuration for the KafkaStreams Task Assignor. + */ +public class AssignmentConfigs { + + private long acceptableRecoveryLag; + private int maxWarmupReplicas; + private int nonOverlapCost; + private int numStandbyReplicas; + private long probingRebalanceIntervalMs; + private List<String> rackAwareAssignmentTags; + private int trafficCost; + + /** + * Returns the acceptable lag in the task recovery process in milliseconds. + * + * @return the number of milliseconds of acceptable recovery lag. 
+ */ + public long acceptableRecoveryLag() { + return acceptableRecoveryLag; + } + + /** + * Sets the acceptable lag in the task recovery process. + * + * @param acceptableRecoveryLag the acceptable recovery lag to set, in milliseconds. + */ + public void setAcceptableRecoveryLag(final long acceptableRecoveryLag) { + this.acceptableRecoveryLag = acceptableRecoveryLag; + } + + /** + * Returns the maximum number of warmup replicas allowed. + * + * @return the maximum number of warmup replicas. + */ + public int maxWarmupReplicas() { + return maxWarmupReplicas; + } + + /** + * Sets the maximum number of warmup replicas allowed. + * + * @param maxWarmupReplicas the maximum number of warmup replicas to set. + */ + public void setMaxWarmupReplicas(final int maxWarmupReplicas) { + this.maxWarmupReplicas = maxWarmupReplicas; + } + + /** + * Returns the number of standby replicas. + * + * @return the number of standby replicas. + */ + public int numStandbyReplicas() { + return numStandbyReplicas; + } + + /** + * Sets the number of standby replicas. + * + * @param numStandbyReplicas the number of standby replicas to set. + */ + public void setNumStandbyReplicas(final int numStandbyReplicas) { + this.numStandbyReplicas = numStandbyReplicas; + } + + /** + * Returns the interval between probing rebalances in milliseconds. + * + * @return the interval between probing rebalances. + */ + public long probingRebalanceIntervalMs() { + return probingRebalanceIntervalMs; + } + + /** + * Sets the interval between probing rebalances. + * + * @param probingRebalanceIntervalMs the interval between probing rebalances to set, in milliseconds. + */ + public void setProbingRebalanceIntervalMs(final long probingRebalanceIntervalMs) { + this.probingRebalanceIntervalMs = probingRebalanceIntervalMs; + } + + /** + * Returns the list of rack-aware assignment tags. + * + * @return the list of rack-aware assignment tags. 
+ */ + public List<String> rackAwareAssignmentTags() { + return rackAwareAssignmentTags; + } + + /** + * Sets the list of rack-aware assignment tags. + * + * @param rackAwareAssignmentTags the list of rack-aware assignment tags to set. + */ + public void setRackAwareAssignmentTags(final List<String> rackAwareAssignmentTags) { + this.rackAwareAssignmentTags = rackAwareAssignmentTags; + } + + /** + * Returns the traffic cost. + * + * @return the traffic cost. Review Comment: ```suggestion * The rack-aware assignment traffic cost as configured via * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG} ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java: ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +import java.time.Instant; +import java.util.Set; +import org.apache.kafka.streams.processor.TaskId; + +/** + * A simple interface for the assignor to return the desired placement of active and standby tasks on + * KafkaStreams clients. 
+ */ +public interface KafkaStreamsAssignment { + /** + * + * @return the {@code ProcessID} associated with this {@code KafkaStreamsAssignment} + */ + ProcessID processId(); + + /** + * + * @return a set of assigned tasks that are part of this {@code KafkaStreamsAssignment} + */ + Set<AssignedTask> assignment(); + + /** + * @return the actual deadline in objective time, after which the followup rebalance will be attempted. + * Equivalent to {@code 'now + followupRebalanceDelay'} + */ + Instant followupRebalanceDeadline(); + + /** + * The container class for a task's id and type. + */ Review Comment: Remove this, doesn't add much and it's one of those things that we'll almost definitely forget to update if we ever change this class ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessID.java: ########## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.assignment; + +import org.apache.kafka.common.protocol.types.Field.UUID; + +/** A simple wrapper around UUID that abstracts a Process ID */ +public class ProcessID { Review Comment: nit: rename to `ProcessId` (lowercase 'd') to be more consistent with naming in Streams ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +import java.util.List; + +/** + * Configuration for the KafkaStreams Task Assignor. + */ +public class AssignmentConfigs { + + private long acceptableRecoveryLag; + private int maxWarmupReplicas; + private int nonOverlapCost; + private int numStandbyReplicas; + private long probingRebalanceIntervalMs; + private List<String> rackAwareAssignmentTags; + private int trafficCost; + + /** + * Returns the acceptable lag in the task recovery process in milliseconds. + * + * @return the number of milliseconds of acceptable recovery lag. 
Review Comment: Or actually, alternatively just get rid of all the `@return` lines in the getter javadocs here, since they pretty much just repeat what we say in the first paragraph of the javadocs ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsState.java: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + +import java.util.Map; +import java.util.Optional; +import java.util.SortedSet; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.state.HostInfo; + +/** + * A read-only metadata class representing the current state of each KafkaStreams client with at least one StreamThread participating in this rebalance + */ +public interface KafkaStreamsState { + /** + * @return the processId of the application instance running on this KafkaStreams client + */ + ProcessID processId(); + + /** + * Returns the number of processing threads available to work on tasks for this KafkaStreams client, + * which represents its overall capacity for work relative to other KafkaStreams clients. 
+ * + * @return the number of processing threads on this KafkaStreams client + */ + int numProcessingThreads(); + + /** + * @return the set of consumer client ids for this KafkaStreams client + */ + SortedSet<String> consumerClientIds(); + + /** + * @return the set of all active tasks owned by consumers on this KafkaStreams client since the previous rebalance + */ + SortedSet<TaskId> previousActiveTasks(); + + /** + * @return the set of all standby tasks owned by consumers on this KafkaStreams client since the previous rebalance + */ + SortedSet<TaskId> previousStandbyTasks(); + + /** + * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client + * did not have any state for this task on disk. + * + * @return end offset sum - offset sum + * Task.LATEST_OFFSET if this was previously an active running task on this client Review Comment: I realize I forgot this in the KIP, but we should throw an exception if this method is called and the user did not request task lags be computed. Not sure what exception type makes the most sense here, maybe IllegalArgumentException or UnsupportedOperationException? Whichever you choose, just add a `@throws` statement to the javadocs here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org