[ https://issues.apache.org/jira/browse/KAFKA-18877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17935801#comment-17935801 ]
Chia-Ping Tsai commented on KAFKA-18877: ---------------------------------------- I do love [~davidarthur] suggestion. Maybe we can add the unsafe interface into ControllerWriteOperation#generateRecordsAndResult. for example: {code:java} diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 56e3848ed3..1055a350a8 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -742,7 +742,7 @@ public final class QuorumController implements Controller { * * @return A result containing a list of records, and the RPC result. */ - ControllerResult<T> generateRecordsAndResult() throws Exception; + ControllerResult<T> generateRecordsAndResult(Unsafer unsafer) throws Exception; /** * Once we've passed the records to the Raft layer, we will invoke this function @@ -753,6 +753,10 @@ public final class QuorumController implements Controller { } } + interface Unsafer { + FeatureControlManager featureControlManager(); + } + /** * A controller event that modifies the controller state. */ @@ -764,6 +768,7 @@ public final class QuorumController implements Controller { private final EnumSet<ControllerOperationFlag> flags; private OptionalLong startProcessingTimeNs = OptionalLong.empty(); private ControllerResultAndOffset<T> resultAndOffset; + private final Unsafer unsafer; ControllerWriteEvent( String name, @@ -775,6 +780,12 @@ public final class QuorumController implements Controller { this.op = op; this.flags = flags; this.resultAndOffset = null; + this.unsafer = new Unsafer() { + @Override + public FeatureControlManager featureControlManager() { + return featureControl(); + } + }; } CompletableFuture<T> future() { @@ -792,7 +803,7 @@ public final class QuorumController implements Controller { if (!isActiveController(controllerEpoch)) { throw ControllerExceptions.newWrongControllerException(latestController()); } - ControllerResult<T> result = op.generateRecordsAndResult(); + ControllerResult<T> result = op.generateRecordsAndResult(unsafer); if (result.records().isEmpty()) { op.processBatchEndOffset(offsetControl.nextWriteOffset() - 1); // If the operation did not return any records, then it was actually just @@ -1993,9 +2004,9 @@ public final class QuorumController implements Controller { BrokerRegistrationRequestData request ) { return appendWriteEvent("registerBroker", context.deadlineNs(), - () -> { + unsafer -> { // Read and write data in the controller event handling thread to avoid stale information. - Map<String, Short> controllerFeatures = new HashMap<>(featureControl.finalizedFeatures(Long.MAX_VALUE).featureMap()); + Map<String, Short> controllerFeatures = new HashMap<>(unsafer.featureControlManager().finalizedFeatures(Long.MAX_VALUE).featureMap()); // Populate finalized features map with latest known kraft version for validation. controllerFeatures.put(KRaftVersion.FEATURE_NAME, raftClient.kraftVersion().featureLevel()); return clusterControl. {code} > an mechanism to find cases where we accessed variables from the wrong thread > ---------------------------------------------------------------------------- > > Key: KAFKA-18877 > URL: https://issues.apache.org/jira/browse/KAFKA-18877 > Project: Kafka > Issue Type: Improvement > Reporter: Chia-Ping Tsai > Assignee: TengYao Chi > Priority: Major > > from [https://github.com/apache/kafka/pull/18997#pullrequestreview-2645589959] > There are some _non-thread safe_ classes storing the important information, > and so they are expected to be access by specific thread. Otherwise, it may > cause unexpected behavior -- This message was sent by Atlassian Jira (v8.20.10#820010)