[ 
https://issues.apache.org/jira/browse/KAFKA-18877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17935801#comment-17935801
 ] 

Chia-Ping Tsai commented on KAFKA-18877:
----------------------------------------

I do love [~davidarthur] suggestion. Maybe we can add the unsafe interface into 
ControllerWriteOperation#generateRecordsAndResult. for example:

{code:java}
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 56e3848ed3..1055a350a8 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -742,7 +742,7 @@ public final class QuorumController implements Controller {
          *
          * @return A result containing a list of records, and the RPC result.
          */
-        ControllerResult<T> generateRecordsAndResult() throws Exception;
+        ControllerResult<T> generateRecordsAndResult(Unsafer unsafer) throws 
Exception;
 
         /**
          * Once we've passed the records to the Raft layer, we will invoke 
this function
@@ -753,6 +753,10 @@ public final class QuorumController implements Controller {
         }
     }
 
+    interface Unsafer {
+        FeatureControlManager featureControlManager();
+    }
+
     /**
      * A controller event that modifies the controller state.
      */
@@ -764,6 +768,7 @@ public final class QuorumController implements Controller {
         private final EnumSet<ControllerOperationFlag> flags;
         private OptionalLong startProcessingTimeNs = OptionalLong.empty();
         private ControllerResultAndOffset<T> resultAndOffset;
+        private final Unsafer unsafer;
 
         ControllerWriteEvent(
             String name,
@@ -775,6 +780,12 @@ public final class QuorumController implements Controller {
             this.op = op;
             this.flags = flags;
             this.resultAndOffset = null;
+            this.unsafer = new Unsafer() {
+                @Override
+                public FeatureControlManager featureControlManager() {
+                    return featureControl();
+                }
+            };
         }
 
         CompletableFuture<T> future() {
@@ -792,7 +803,7 @@ public final class QuorumController implements Controller {
             if (!isActiveController(controllerEpoch)) {
                 throw 
ControllerExceptions.newWrongControllerException(latestController());
             }
-            ControllerResult<T> result = op.generateRecordsAndResult();
+            ControllerResult<T> result = op.generateRecordsAndResult(unsafer);
             if (result.records().isEmpty()) {
                 op.processBatchEndOffset(offsetControl.nextWriteOffset() - 1);
                 // If the operation did not return any records, then it was 
actually just
@@ -1993,9 +2004,9 @@ public final class QuorumController implements Controller 
{
         BrokerRegistrationRequestData request
     ) {
         return appendWriteEvent("registerBroker", context.deadlineNs(),
-            () -> {
+            unsafer -> {
                 // Read and write data in the controller event handling thread 
to avoid stale information.
-                Map<String, Short> controllerFeatures = new 
HashMap<>(featureControl.finalizedFeatures(Long.MAX_VALUE).featureMap());
+                Map<String, Short> controllerFeatures = new 
HashMap<>(unsafer.featureControlManager().finalizedFeatures(Long.MAX_VALUE).featureMap());
                 // Populate finalized features map with latest known kraft 
version for validation.
                 controllerFeatures.put(KRaftVersion.FEATURE_NAME, 
raftClient.kraftVersion().featureLevel());
                 return clusterControl.
{code}
 

> an mechanism to find cases where we accessed variables from the wrong thread
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-18877
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18877
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Chia-Ping Tsai
>            Assignee: TengYao Chi
>            Priority: Major
>
> from [https://github.com/apache/kafka/pull/18997#pullrequestreview-2645589959]
> There are some _non-thread safe_ classes storing the important information, 
> and so they are expected to be access by specific thread.  Otherwise, it may 
> cause unexpected behavior



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to