gharris1727 commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1364474243
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java: ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +/** + * Manges logging levels on a single worker. Supports dynamic adjustment and querying Review Comment: ```suggestion * Manages logging levels on a single worker. Supports dynamic adjustment and querying ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java: ########## @@ -16,603 +16,14 @@ */ package org.apache.kafka.connect.util.clusters; -import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.connect.runtime.AbstractStatus; -import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo; -import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; -import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.ws.rs.core.Response; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiFunction; -import java.util.stream.Collectors; - -import static org.apache.kafka.test.TestUtils.waitForCondition; - /** - * A set of common assertions that can be applied to a Connect cluster during integration testing + * @deprecated Use {@link ConnectAssertions} instead. */ -public class EmbeddedConnectClusterAssertions { - - private static final Logger log = LoggerFactory.getLogger(EmbeddedConnectClusterAssertions.class); - public static final long WORKER_SETUP_DURATION_MS = TimeUnit.MINUTES.toMillis(5); - public static final long VALIDATION_DURATION_MS = TimeUnit.SECONDS.toMillis(30); - public static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.MINUTES.toMillis(2); - // Creating a connector requires two rounds of rebalance; destroying one only requires one - // Assume it'll take ~half the time to destroy a connector as it does to create one - public static final long CONNECTOR_SHUTDOWN_DURATION_MS = TimeUnit.MINUTES.toMillis(1); - private static final long CONNECT_INTERNAL_TOPIC_UPDATES_DURATION_MS = TimeUnit.SECONDS.toMillis(60); - - private final EmbeddedConnectCluster connect; - - EmbeddedConnectClusterAssertions(EmbeddedConnectCluster connect) { - this.connect = connect; - } - - /** - * Assert that at least the requested number of workers are up and running. - * - * @param numWorkers the number of online workers - */ - public void assertAtLeastNumWorkersAreUp(int numWorkers, String detailMessage) throws InterruptedException { - try { - waitForCondition( - () -> checkWorkersUp(numWorkers, (actual, expected) -> actual >= expected).orElse(false), - WORKER_SETUP_DURATION_MS, - "Didn't meet the minimum requested number of online workers: " + numWorkers); - } catch (AssertionError e) { - throw new AssertionError(detailMessage, e); - } - } - - /** - * Assert that at least the requested number of workers are up and running. - * - * @param numWorkers the number of online workers - */ - public void assertExactlyNumWorkersAreUp(int numWorkers, String detailMessage) throws InterruptedException { - try { - waitForCondition( - () -> checkWorkersUp(numWorkers, (actual, expected) -> actual == expected).orElse(false), - WORKER_SETUP_DURATION_MS, - "Didn't meet the exact requested number of online workers: " + numWorkers); - } catch (AssertionError e) { - throw new AssertionError(detailMessage, e); - } - } - - /** - * Confirm that the requested number of workers are up and running. - * - * @param numWorkers the number of online workers - * @return true if at least {@code numWorkers} are up; false otherwise - */ - protected Optional<Boolean> checkWorkersUp(int numWorkers, BiFunction<Integer, Integer, Boolean> comp) { - try { - int numUp = connect.activeWorkers().size(); - return Optional.of(comp.apply(numUp, numWorkers)); - } catch (Exception e) { - log.error("Could not check active workers.", e); - return Optional.empty(); - } - } - - /** - * Assert that at least the requested number of workers are up and running. - * - * @param numBrokers the number of online brokers - */ - public void assertExactlyNumBrokersAreUp(int numBrokers, String detailMessage) throws InterruptedException { - try { - waitForCondition( - () -> checkBrokersUp(numBrokers, (actual, expected) -> actual == expected).orElse(false), - WORKER_SETUP_DURATION_MS, - "Didn't meet the exact requested number of online brokers: " + numBrokers); - } catch (AssertionError e) { - throw new AssertionError(detailMessage, e); - } - } - - /** - * Confirm that the requested number of brokers are up and running. - * - * @param numBrokers the number of online brokers - * @return true if at least {@code numBrokers} are up; false otherwise - */ - protected Optional<Boolean> checkBrokersUp(int numBrokers, BiFunction<Integer, Integer, Boolean> comp) { - try { - int numRunning = connect.kafka().runningBrokers().size(); - return Optional.of(comp.apply(numRunning, numBrokers)); - } catch (Exception e) { - log.error("Could not check running brokers.", e); - return Optional.empty(); - } - } - - /** - * Assert that the topics with the specified names do not exist. - * - * @param topicNames the names of the topics that are expected to not exist - */ - public void assertTopicsDoNotExist(String... topicNames) throws InterruptedException { - Set<String> topicNameSet = new HashSet<>(Arrays.asList(topicNames)); - AtomicReference<Set<String>> existingTopics = new AtomicReference<>(topicNameSet); - waitForCondition( - () -> checkTopicsExist(topicNameSet, (actual, expected) -> { - existingTopics.set(actual); - return actual.isEmpty(); - }).orElse(false), - CONNECTOR_SETUP_DURATION_MS, - "Unexpectedly found topics " + existingTopics.get()); - } - - /** - * Assert that the topics with the specified names do exist. - * - * @param topicNames the names of the topics that are expected to exist - */ - public void assertTopicsExist(String... topicNames) throws InterruptedException { - Set<String> topicNameSet = new HashSet<>(Arrays.asList(topicNames)); - AtomicReference<Set<String>> missingTopics = new AtomicReference<>(topicNameSet); - waitForCondition( - () -> checkTopicsExist(topicNameSet, (actual, expected) -> { - Set<String> missing = new HashSet<>(expected); - missing.removeAll(actual); - missingTopics.set(missing); - return missing.isEmpty(); - }).orElse(false), - CONNECTOR_SETUP_DURATION_MS, - "Didn't find the topics " + missingTopics.get()); - } +@Deprecated +public class EmbeddedConnectClusterAssertions extends ConnectAssertions { - protected Optional<Boolean> checkTopicsExist(Set<String> topicNames, BiFunction<Set<String>, Set<String>, Boolean> comp) { - try { - Map<String, Optional<TopicDescription>> topics = connect.kafka().describeTopics(topicNames); - Set<String> actualExistingTopics = topics.entrySet() - .stream() - .filter(e -> e.getValue().isPresent()) - .map(Map.Entry::getKey) - .collect(Collectors.toSet()); - return Optional.of(comp.apply(actualExistingTopics, topicNames)); - } catch (Exception e) { - log.error("Failed to describe the topic(s): {}.", topicNames, e); - return Optional.empty(); - } + public EmbeddedConnectClusterAssertions(EmbeddedConnect connect) { Review Comment: nit: the constructor was package-private before ```suggestion EmbeddedConnectClusterAssertions(EmbeddedConnect connect) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
