ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623047318


##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##########
@@ -29,32 +33,59 @@ public class AssignmentConfigs {
     private final int numStandbyReplicas;
     private final long probingRebalanceIntervalMs;
     private final List<String> rackAwareAssignmentTags;
-    private final int rackAwareTrafficCost;
-    private final int rackAwareNonOverlapCost;
+    private final OptionalInt rackAwareTrafficCost;
+    private final OptionalInt rackAwareNonOverlapCost;
     private final String rackAwareAssignmentStrategy;
 
-    public AssignmentConfigs(final StreamsConfig configs) {
-        this(
-            configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
-            configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
-            configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
-            
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
-            configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
-            
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
-            
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
-            
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)
+    public static AssignmentConfigs of(final StreamsConfig configs) {
+        final long acceptableRecoveryLag = 
configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
+        final int maxWarmupReplicas = 
configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
+        final int numStandbyReplicas = 
configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+        final long probingRebalanceIntervalMs = 
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
+        final List<String> rackAwareAssignmentTags = 
configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
+        final String rackAwareAssignmentStrategy = 
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
+        Optional<Integer> rackAwareTrafficCost = 
Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
+        Optional<Integer> rackAwareNonOverlapCost = 
Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG));
+
+        final String assignorClassName = 
configs.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
+        if (StickyTaskAssignor.class.getName().equals(assignorClassName)) {
+            if (!rackAwareTrafficCost.isPresent()) {
+                rackAwareTrafficCost = 
Optional.of(StickyTaskAssignor.DEFAULT_STATEFUL_TRAFFIC_COST);
+            }
+            if (!rackAwareNonOverlapCost.isPresent()) {
+                rackAwareNonOverlapCost = 
Optional.of(StickyTaskAssignor.DEFAULT_STATEFUL_NON_OVERLAP_COST);
+            }
+        }
+

Review Comment:
   nit: this should be an else-if rather than two separate if statements.



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##########
@@ -29,32 +33,59 @@ public class AssignmentConfigs {
     private final int numStandbyReplicas;
     private final long probingRebalanceIntervalMs;
     private final List<String> rackAwareAssignmentTags;
-    private final int rackAwareTrafficCost;
-    private final int rackAwareNonOverlapCost;
+    private final OptionalInt rackAwareTrafficCost;
+    private final OptionalInt rackAwareNonOverlapCost;
     private final String rackAwareAssignmentStrategy;
 
-    public AssignmentConfigs(final StreamsConfig configs) {
-        this(
-            configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
-            configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
-            configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
-            
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
-            configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
-            
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
-            
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
-            
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)
+    public static AssignmentConfigs of(final StreamsConfig configs) {
+        final long acceptableRecoveryLag = 
configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
+        final int maxWarmupReplicas = 
configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
+        final int numStandbyReplicas = 
configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+        final long probingRebalanceIntervalMs = 
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
+        final List<String> rackAwareAssignmentTags = 
configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
+        final String rackAwareAssignmentStrategy = 
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
+        Optional<Integer> rackAwareTrafficCost = 
Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
+        Optional<Integer> rackAwareNonOverlapCost = 
Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG));
+
+        final String assignorClassName = 
configs.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
+        if (StickyTaskAssignor.class.getName().equals(assignorClassName)) {
+            if (!rackAwareTrafficCost.isPresent()) {
+                rackAwareTrafficCost = 
Optional.of(StickyTaskAssignor.DEFAULT_STATEFUL_TRAFFIC_COST);
+            }
+            if (!rackAwareNonOverlapCost.isPresent()) {
+                rackAwareNonOverlapCost = 
Optional.of(StickyTaskAssignor.DEFAULT_STATEFUL_NON_OVERLAP_COST);
+            }
+        }
+
+        if 
(HighAvailabilityTaskAssignor.class.getName().equals(assignorClassName)) {

Review Comment:
   Technically this isn't doing anything yet, since this is the old HAAssignor.
So a) the `assignorClassName` should never be the old-style HAAssignor, and b)
we actually still use the old assignment configs for the old assignor, so it
isn't reading these configs anyway.
   
   I do think we should leave this in here, because otherwise we'll almost
surely forget to update the configs with the correct default values once we add
the new version of the HAAssignor. For now, I guess we should just check whether
`assignorClassName` is null, because null is the default and means the
HAAssignor is used. Then, once we eventually change the default to the new
HAAssignor, we can update this check...? I'll file an AK Jira for adding the new
HAAssignor and mention this in the ticket as well so we don't forget.



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##########
@@ -29,32 +33,59 @@ public class AssignmentConfigs {
     private final int numStandbyReplicas;
     private final long probingRebalanceIntervalMs;
     private final List<String> rackAwareAssignmentTags;
-    private final int rackAwareTrafficCost;
-    private final int rackAwareNonOverlapCost;
+    private final OptionalInt rackAwareTrafficCost;
+    private final OptionalInt rackAwareNonOverlapCost;
     private final String rackAwareAssignmentStrategy;
 
-    public AssignmentConfigs(final StreamsConfig configs) {
-        this(
-            configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
-            configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
-            configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
-            
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
-            configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
-            
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
-            
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
-            
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)
+    public static AssignmentConfigs of(final StreamsConfig configs) {
+        final long acceptableRecoveryLag = 
configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
+        final int maxWarmupReplicas = 
configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
+        final int numStandbyReplicas = 
configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+        final long probingRebalanceIntervalMs = 
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
+        final List<String> rackAwareAssignmentTags = 
configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
+        final String rackAwareAssignmentStrategy = 
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
+        Optional<Integer> rackAwareTrafficCost = 
Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
+        Optional<Integer> rackAwareNonOverlapCost = 
Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG));
+
+        final String assignorClassName = 
configs.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
+        if (StickyTaskAssignor.class.getName().equals(assignorClassName)) {
+            if (!rackAwareTrafficCost.isPresent()) {
+                rackAwareTrafficCost = 
Optional.of(StickyTaskAssignor.DEFAULT_STATEFUL_TRAFFIC_COST);
+            }
+            if (!rackAwareNonOverlapCost.isPresent()) {
+                rackAwareNonOverlapCost = 
Optional.of(StickyTaskAssignor.DEFAULT_STATEFUL_NON_OVERLAP_COST);
+            }
+        }
+
+        if 
(HighAvailabilityTaskAssignor.class.getName().equals(assignorClassName)) {
+            if (!rackAwareTrafficCost.isPresent()) {
+                rackAwareTrafficCost = 
Optional.of(HighAvailabilityTaskAssignor.DEFAULT_STATEFUL_TRAFFIC_COST);
+            }
+            if (!rackAwareNonOverlapCost.isPresent()) {
+                rackAwareNonOverlapCost = 
Optional.of(HighAvailabilityTaskAssignor.DEFAULT_STATEFUL_NON_OVERLAP_COST);
+            }
+        }
+
+        return new AssignmentConfigs(
+            acceptableRecoveryLag,
+            maxWarmupReplicas,
+            numStandbyReplicas,
+            probingRebalanceIntervalMs,
+            rackAwareAssignmentTags,
+            
rackAwareTrafficCost.map(OptionalInt::of).orElseGet(OptionalInt::empty),
+            
rackAwareNonOverlapCost.map(OptionalInt::of).orElseGet(OptionalInt::empty),
+            rackAwareAssignmentStrategy
         );
     }
 
-    public AssignmentConfigs(final long acceptableRecoveryLag,
+    private AssignmentConfigs(final long acceptableRecoveryLag,

Review Comment:
   Let's keep this public; it might be useful to users (and ourselves) for unit
testing.
   
   If we do make it private, then we need to fix the formatting: annoyingly,
"private" is one character longer than "public", so the continuation lines of
the parameter list will be misaligned by one space.



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##########
@@ -29,32 +33,59 @@ public class AssignmentConfigs {
     private final int numStandbyReplicas;
     private final long probingRebalanceIntervalMs;
     private final List<String> rackAwareAssignmentTags;
-    private final int rackAwareTrafficCost;
-    private final int rackAwareNonOverlapCost;
+    private final OptionalInt rackAwareTrafficCost;
+    private final OptionalInt rackAwareNonOverlapCost;
     private final String rackAwareAssignmentStrategy;
 
-    public AssignmentConfigs(final StreamsConfig configs) {
-        this(
-            configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
-            configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
-            configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
-            
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
-            configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
-            
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
-            
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
-            
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)
+    public static AssignmentConfigs of(final StreamsConfig configs) {
+        final long acceptableRecoveryLag = 
configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
+        final int maxWarmupReplicas = 
configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
+        final int numStandbyReplicas = 
configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+        final long probingRebalanceIntervalMs = 
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
+        final List<String> rackAwareAssignmentTags = 
configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
+        final String rackAwareAssignmentStrategy = 
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
+        Optional<Integer> rackAwareTrafficCost = 
Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));

Review Comment:
   We should make this an `OptionalInt` right from the start. That way we don't
need to map from `Optional<Integer>` to `OptionalInt` when we call the actual
constructor.



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##########
@@ -29,32 +33,59 @@ public class AssignmentConfigs {
     private final int numStandbyReplicas;
     private final long probingRebalanceIntervalMs;
     private final List<String> rackAwareAssignmentTags;
-    private final int rackAwareTrafficCost;
-    private final int rackAwareNonOverlapCost;
+    private final OptionalInt rackAwareTrafficCost;
+    private final OptionalInt rackAwareNonOverlapCost;
     private final String rackAwareAssignmentStrategy;
 
-    public AssignmentConfigs(final StreamsConfig configs) {
-        this(
-            configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
-            configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
-            configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
-            
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
-            configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
-            
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
-            
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
-            
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)
+    public static AssignmentConfigs of(final StreamsConfig configs) {
+        final long acceptableRecoveryLag = 
configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
+        final int maxWarmupReplicas = 
configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
+        final int numStandbyReplicas = 
configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+        final long probingRebalanceIntervalMs = 
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
+        final List<String> rackAwareAssignmentTags = 
configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
+        final String rackAwareAssignmentStrategy = 
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
+        Optional<Integer> rackAwareTrafficCost = 
Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
+        Optional<Integer> rackAwareNonOverlapCost = 
Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG));
+
+        final String assignorClassName = 
configs.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
+        if (StickyTaskAssignor.class.getName().equals(assignorClassName)) {
+            if (!rackAwareTrafficCost.isPresent()) {
+                rackAwareTrafficCost = 
Optional.of(StickyTaskAssignor.DEFAULT_STATEFUL_TRAFFIC_COST);
+            }
+            if (!rackAwareNonOverlapCost.isPresent()) {
+                rackAwareNonOverlapCost = 
Optional.of(StickyTaskAssignor.DEFAULT_STATEFUL_NON_OVERLAP_COST);
+            }
+        }
+
+        if 
(HighAvailabilityTaskAssignor.class.getName().equals(assignorClassName)) {

Review Comment:
   Just filed [KAFKA-16869](https://issues.apache.org/jira/browse/KAFKA-16869). 
Put something like this:
   ```
   // TODO KAFKA-16869: replace with the HighAvailabilityTaskAssignor class 
once it implements the new TaskAssignor interface
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final 
Set<TaskTopicPartition> topicPa
      */
     private static boolean canPerformRackAwareOptimization(final 
ApplicationState applicationState,
                                                            final 
AssignedTask.Type taskType) {
-        final String rackAwareAssignmentStrategy = 
applicationState.assignmentConfigs().rackAwareAssignmentStrategy();
+        final AssignmentConfigs assignmentConfigs = 
applicationState.assignmentConfigs();
+        final String rackAwareAssignmentStrategy = 
assignmentConfigs.rackAwareAssignmentStrategy();
         if 
(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(rackAwareAssignmentStrategy))
 {
+            LOG.warn("Rack aware task assignment optimization disabled: rack 
aware strategy was set to {}",
+                rackAwareAssignmentStrategy);
+            return false;
+        }
+
+        if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) {

Review Comment:
   what??????? ridiculous lol



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##########
@@ -40,13 +41,17 @@
 import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
 import org.apache.kafka.streams.processor.assignment.TaskAssignor;
 import org.apache.kafka.streams.processor.assignment.TaskInfo;
+import org.apache.kafka.streams.processor.assignment.TaskTopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 public class StickyTaskAssignor implements TaskAssignor {
     private static final Logger LOG = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
 
+    public static final int DEFAULT_STATEFUL_TRAFFIC_COST = 1;
+    public static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 10;

Review Comment:
   Why "stateful"? I also think we should use the word "sticky" in there
somewhere, since these constants will be used right next to similarly named
variables for other assignor types, e.g. `DEFAULT_STICKY_TRAFFIC_COST` or
something like that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to