lianetm commented on code in PR #18945: URL: https://github.com/apache/kafka/pull/18945#discussion_r1963837019
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ########## @@ -1561,28 +1555,25 @@ public void onFailure(RuntimeException e) { } } catch (AuthenticationException e) { log.error("An authentication error occurred in the heartbeat thread", e); - this.failed.set(e); + setFailed(e); } catch (GroupAuthorizationException e) { log.error("A group authorization error occurred in the heartbeat thread", e); - this.failed.set(e); + setFailed(e); } catch (InterruptedException | InterruptException e) { Thread.interrupted(); log.error("Unexpected interrupt received in heartbeat thread", e); - this.failed.set(new RuntimeException(e)); + setFailed(new RuntimeException(e)); } catch (Throwable e) { log.error("Heartbeat thread failed due to unexpected error", e); if (e instanceof RuntimeException) - this.failed.set((RuntimeException) e); + setFailed((RuntimeException) e); else - this.failed.set(new RuntimeException(e)); + setFailed(new RuntimeException(e)); } finally { log.debug("Heartbeat thread has closed"); - synchronized (AbstractCoordinator.this) { Review Comment: Don't we need to keep this synchronization here? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java: ########## @@ -230,7 +230,8 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { this.interceptors, config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED), config.getString(ConsumerConfig.CLIENT_RACK_CONFIG), - clientTelemetryReporter); + clientTelemetryReporter, + Optional.empty()); Review Comment: we're introducing this param only for some tests, but end up having to pass it empty in lots of cases (this consumer class mainly, and other test files). So I wonder if it would be a fair trade off in this case to add a another constructor to the `ConsumerCoordinator` to take this param, but keep also the existing one that does not take it. Seems that we would reduce the scope of the changes and avoid the noise of this empty param when not needed. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatThread.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.KafkaThread; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +public abstract class AbstractHeartbeatThread extends KafkaThread implements AutoCloseable { Review Comment: Should we add a java doc to show that this is a wrapper for a KafkaThread that allows to be enabled/disabled (looks like the main thing right?). And seeing it like this, is it intentionally abstract? No harm in it really if the intention is to express that we don't want to allow instances of it, but caught my attention (vs. non-abstract ~BaseHeartbeatThread) ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockHeartbeatThread.java: ########## @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +public class MockHeartbeatThread extends AbstractHeartbeatThread { Review Comment: do we really need this class or would it be enough to simply use a `mock(AbstractHeartbeatThread)`? (and verify the calls to enable()/disable() instead of checking the actual value of the boolean. Would that work? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatThread.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.KafkaThread; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +public abstract class AbstractHeartbeatThread extends KafkaThread implements AutoCloseable { + private final AtomicBoolean enabled = new AtomicBoolean(false); + private final AtomicBoolean closed = new AtomicBoolean(false); + private final AtomicReference<RuntimeException> failed = new AtomicReference<>(null); Review Comment: `failed` sounds like a boolean but it's the `error`/`failure`, should we rename it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org