junrao commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1397751986
########## core/src/main/java/kafka/server/ClientMetricsManager.java: ########## @@ -30,17 +69,376 @@ public class ClientMetricsManager implements Closeable { private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); + private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList( + Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id)); public static ClientMetricsManager instance() { return INSTANCE; } + // Max cache size (16k active client connections per broker) + private static final int CM_CACHE_MAX_SIZE = 16384; + private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache; + private final Map<String, SubscriptionInfo> subscriptionMap; + private final Time time; + + // The latest subscription version is used to determine if subscription has changed and needs + // to re-evaluate the client instance subscription id as per changed subscriptions. + private final AtomicInteger subscriptionUpdateVersion; + + private ClientMetricsManager() { + this(Time.SYSTEM); + } + + // Visible for testing + ClientMetricsManager(Time time) { + this.subscriptionMap = new ConcurrentHashMap<>(); + this.subscriptionUpdateVersion = new AtomicInteger(0); + this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); + this.time = time; + } public void updateSubscription(String subscriptionName, Properties properties) { - // TODO: Implement the update logic to manage subscriptions. + // Validate the subscription properties. + ClientMetricsConfigs.validate(subscriptionName, properties); + // IncrementalAlterConfigs API will send empty configs when all the configs are deleted + // for respective subscription. In that case, we need to remove the subscription from the map. 
+ if (properties.isEmpty()) { + // Remove the subscription from the map if it exists, else ignore the config update. + if (subscriptionMap.containsKey(subscriptionName)) { + log.info("Removing subscription [{}] from the subscription map", subscriptionName); + subscriptionMap.remove(subscriptionName); + this.subscriptionUpdateVersion.incrementAndGet(); + } + return; + } + + updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties)); + } + + public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( + GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + + long now = time.milliseconds(); + Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) + .filter(id -> !id.equals(Uuid.ZERO_UUID)) + .orElse(generateNewClientId()); + + /* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issues another get + telemetry request prior to push interval, then the client should get a throttle error but if + the subscription has changed since the last request then the client should get the updated + subscription immediately. + */ + ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext); + + try { + // Validate the get request parameters for the client instance. 
+ validateGetRequest(request, clientInstance, now); + } catch (ApiException exception) { + return request.getErrorResponse(throttleMs, exception); + } + + clientInstance.lastKnownError(Errors.NONE); + return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); + } + + public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, + int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + + Uuid clientInstanceId = request.data().clientInstanceId(); + if (clientInstanceId == null || Uuid.RESERVED.contains(clientInstanceId)) { + String msg = String.format("Invalid request from the client [%s], invalid client instance id", + clientInstanceId); + return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); + } + + long now = time.milliseconds(); + ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext); + + try { + // Validate the push request parameters for the client instance. + validatePushRequest(request, telemetryMaxBytes, clientInstance, now); + } catch (ApiException exception) { + log.debug("Error validating push telemetry request from client [{}]", clientInstanceId, exception); + clientInstance.lastKnownError(Errors.forException(exception)); + return request.getErrorResponse(throttleMs, exception); + } finally { + // Update the client instance with the latest push request parameters. + clientInstance.terminating(request.data().terminating()); + } + + // Push the metrics to the external client receiver plugin. 
+ byte[] metrics = request.data().metrics(); + if (metrics != null && metrics.length > 0) { + try { + ClientMetricsReceiverPlugin.instance().exportMetrics(requestContext, request); + } catch (Exception exception) { + clientInstance.lastKnownError(Errors.INVALID_RECORD); + return request.errorResponse(throttleMs, Errors.INVALID_RECORD); + } + } + + clientInstance.lastKnownError(Errors.NONE); + return new PushTelemetryResponse(new PushTelemetryResponseData().setThrottleTimeMs(throttleMs)); } @Override public void close() throws IOException { - // TODO: Implement the close logic to close the client metrics manager. + subscriptionMap.clear(); + } + + private void updateClientSubscription(String subscriptionName, ClientMetricsConfigs configs) { + List<String> metrics = configs.getList(ClientMetricsConfigs.SUBSCRIPTION_METRICS); + int pushInterval = configs.getInt(ClientMetricsConfigs.PUSH_INTERVAL_MS); + List<String> clientMatchPattern = configs.getList(ClientMetricsConfigs.CLIENT_MATCH_PATTERN); + + /* + Update last subscription updated time to current time to indicate that there is a change Review Comment: The comment seems outdated since we don't update the time now. ########## core/src/main/java/kafka/metrics/ClientMetricsInstance.java: ########## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; + +import java.util.Objects; +import java.util.Set; + +/** + * Contains the metrics instance metadata and the state of the client instance. + */ +public class ClientMetricsInstance { + + private final Uuid clientInstanceId; + private final ClientMetricsInstanceMetadata instanceMetadata; + private final int subscriptionId; + private final int subscriptionVersion; + private final Set<String> metrics; + private final int pushIntervalMs; + + private long lastGetRequestEpoch; Review Comment: `lastGetRequestEpoch` => `lastGetRequestTimestamp`? Ditto for `lastPushRequestEpoch`. ########## core/src/main/java/kafka/server/ClientMetricsManager.java: ########## @@ -30,17 +69,376 @@ public class ClientMetricsManager implements Closeable { private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); + private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList( + Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id)); public static ClientMetricsManager instance() { return INSTANCE; } + // Max cache size (16k active client connections per broker) + private static final int CM_CACHE_MAX_SIZE = 16384; + private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache; + private final Map<String, SubscriptionInfo> subscriptionMap; + private final Time time; + + // The latest subscription version is used to determine if subscription has changed and needs + // to re-evaluate the client instance subscription id as per changed subscriptions. 
+ private final AtomicInteger subscriptionUpdateVersion; + + private ClientMetricsManager() { + this(Time.SYSTEM); + } + + // Visible for testing + ClientMetricsManager(Time time) { + this.subscriptionMap = new ConcurrentHashMap<>(); + this.subscriptionUpdateVersion = new AtomicInteger(0); + this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); + this.time = time; + } public void updateSubscription(String subscriptionName, Properties properties) { - // TODO: Implement the update logic to manage subscriptions. + // Validate the subscription properties. + ClientMetricsConfigs.validate(subscriptionName, properties); + // IncrementalAlterConfigs API will send empty configs when all the configs are deleted + // for respective subscription. In that case, we need to remove the subscription from the map. + if (properties.isEmpty()) { + // Remove the subscription from the map if it exists, else ignore the config update. + if (subscriptionMap.containsKey(subscriptionName)) { + log.info("Removing subscription [{}] from the subscription map", subscriptionName); + subscriptionMap.remove(subscriptionName); + this.subscriptionUpdateVersion.incrementAndGet(); + } + return; + } + + updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties)); + } + + public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( + GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + + long now = time.milliseconds(); + Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) + .filter(id -> !id.equals(Uuid.ZERO_UUID)) + .orElse(generateNewClientId()); + + /* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. 
If client issues another get + telemetry request prior to push interval, then the client should get a throttle error but if + the subscription has changed since the last request then the client should get the updated + subscription immediately. + */ + ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext); + + try { + // Validate the get request parameters for the client instance. + validateGetRequest(request, clientInstance, now); + } catch (ApiException exception) { + return request.getErrorResponse(throttleMs, exception); + } + + clientInstance.lastKnownError(Errors.NONE); + return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); + } + + public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, + int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + + Uuid clientInstanceId = request.data().clientInstanceId(); + if (clientInstanceId == null || Uuid.RESERVED.contains(clientInstanceId)) { + String msg = String.format("Invalid request from the client [%s], invalid client instance id", + clientInstanceId); + return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); + } + + long now = time.milliseconds(); + ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext); + + try { + // Validate the push request parameters for the client instance. + validatePushRequest(request, telemetryMaxBytes, clientInstance, now); + } catch (ApiException exception) { + log.debug("Error validating push telemetry request from client [{}]", clientInstanceId, exception); + clientInstance.lastKnownError(Errors.forException(exception)); + return request.getErrorResponse(throttleMs, exception); + } finally { + // Update the client instance with the latest push request parameters. + clientInstance.terminating(request.data().terminating()); + } + + // Push the metrics to the external client receiver plugin. 
+ byte[] metrics = request.data().metrics(); + if (metrics != null && metrics.length > 0) { + try { + ClientMetricsReceiverPlugin.instance().exportMetrics(requestContext, request); + } catch (Exception exception) { + clientInstance.lastKnownError(Errors.INVALID_RECORD); + return request.errorResponse(throttleMs, Errors.INVALID_RECORD); + } + } + + clientInstance.lastKnownError(Errors.NONE); + return new PushTelemetryResponse(new PushTelemetryResponseData().setThrottleTimeMs(throttleMs)); } @Override public void close() throws IOException { - // TODO: Implement the close logic to close the client metrics manager. + subscriptionMap.clear(); + } + + private void updateClientSubscription(String subscriptionName, ClientMetricsConfigs configs) { + List<String> metrics = configs.getList(ClientMetricsConfigs.SUBSCRIPTION_METRICS); + int pushInterval = configs.getInt(ClientMetricsConfigs.PUSH_INTERVAL_MS); + List<String> clientMatchPattern = configs.getList(ClientMetricsConfigs.CLIENT_MATCH_PATTERN); + + /* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. + */ + int version = this.subscriptionUpdateVersion.incrementAndGet(); Review Comment: What's the convention of using `this` when accessing a field? It seems that this class sometimes uses `this` and other times doesn't. ########## core/src/test/java/kafka/server/ClientMetricsManagerTest.java: ########## @@ -0,0 +1,951 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server; + +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsTestUtils; +import kafka.server.ClientMetricsManager.SubscriptionInfo; +import kafka.utils.TestUtils; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryRequest.Builder; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.utils.MockTime; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static 
org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientMetricsManagerTest { + + private static final Logger LOG = LoggerFactory.getLogger(ClientMetricsManagerTest.class); + + private Properties props; + private KafkaConfig config; + private MockTime time; + private ClientMetricsManager clientMetricsManager; + + @BeforeEach + public void setUp() { + props = TestUtils.createDummyBrokerConfig(); + props.setProperty(KafkaConfig.ClientTelemetryMaxBytesProp(), "100"); + config = new KafkaConfig(props); + time = new MockTime(); + clientMetricsManager = new ClientMetricsManager(config, time); + } + + @Test + public void testUpdateSubscription() { + assertTrue(clientMetricsManager.subscriptions().isEmpty()); + + assertEquals(0, clientMetricsManager.subscriptionUpdateVersion()); + clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + + assertEquals(1, clientMetricsManager.subscriptions().size()); + assertNotNull(clientMetricsManager.subscriptionInfo("sub-1")); + + SubscriptionInfo subscriptionInfo = clientMetricsManager.subscriptionInfo("sub-1"); + Set<String> metrics = subscriptionInfo.metrics(); + + // Validate metrics. + assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, metrics.size()); + Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric -> + assertTrue(metrics.contains(metric))); + // Validate push interval. + assertEquals(ClientMetricsTestUtils.defaultProperties().getProperty(ClientMetricsConfigs.PUSH_INTERVAL_MS), + String.valueOf(subscriptionInfo.intervalMs())); + + // Validate match patterns. 
+ assertEquals(ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.size(), + subscriptionInfo.matchPattern().size()); + ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.forEach(pattern -> { + String[] split = pattern.split("="); + assertTrue(subscriptionInfo.matchPattern().containsKey(split[0])); + assertEquals(split[1], subscriptionInfo.matchPattern().get(split[0]).pattern()); + }); + assertEquals(1, clientMetricsManager.subscriptionUpdateVersion()); + } + + @Test + public void testUpdateSubscriptionWithEmptyProperties() { + assertTrue(clientMetricsManager.subscriptions().isEmpty()); + assertEquals(0, clientMetricsManager.subscriptionUpdateVersion()); + clientMetricsManager.updateSubscription("sub-1", new Properties()); + // No subscription should be added as the properties are empty. + assertEquals(0, clientMetricsManager.subscriptions().size()); + assertEquals(0, clientMetricsManager.subscriptionUpdateVersion()); + } + + @Test + public void testUpdateSubscriptionWithNullProperties() { + assertTrue(clientMetricsManager.subscriptions().isEmpty()); + assertEquals(0, clientMetricsManager.subscriptionUpdateVersion()); + // Properties shouldn't be passed as null. 
+ assertThrows(NullPointerException.class, () -> + clientMetricsManager.updateSubscription("sub-1", null)); + assertEquals(0, clientMetricsManager.subscriptions().size()); + assertEquals(0, clientMetricsManager.subscriptionUpdateVersion()); + } + + @Test + public void testUpdateSubscriptionWithInvalidMetricsProperties() { + assertTrue(clientMetricsManager.subscriptions().isEmpty()); + + Properties properties = new Properties(); + properties.put("random", "random"); + assertThrows(InvalidRequestException.class, () -> clientMetricsManager.updateSubscription("sub-1", properties)); + } + + @Test + public void testUpdateSubscriptionWithPropertiesDeletion() { + assertTrue(clientMetricsManager.subscriptions().isEmpty()); + assertEquals(0, clientMetricsManager.subscriptionUpdateVersion()); + + Properties properties = new Properties(); + properties.put("interval.ms", "100"); + clientMetricsManager.updateSubscription("sub-1", properties); + assertEquals(1, clientMetricsManager.subscriptions().size()); + assertNotNull(clientMetricsManager.subscriptionInfo("sub-1")); + assertEquals(1, clientMetricsManager.subscriptionUpdateVersion()); + + clientMetricsManager.updateSubscription("sub-1", new Properties()); + // Subscription should be removed as all properties are removed. 
+ assertEquals(0, clientMetricsManager.subscriptions().size()); + assertEquals(2, clientMetricsManager.subscriptionUpdateVersion()); + } + + @Test + public void testGetTelemetry() throws UnknownHostException { + clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + assertEquals(1, clientMetricsManager.subscriptions().size()); + + GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + + assertNotNull(response.data().clientInstanceId()); + assertTrue(response.data().subscriptionId() != 0); + + assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, response.data().requestedMetrics().size()); + Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric -> + assertTrue(response.data().requestedMetrics().contains(metric))); + + assertEquals(4, response.data().acceptedCompressionTypes().size()); + // validate compression types order. 
+ assertEquals(CompressionType.ZSTD.id, response.data().acceptedCompressionTypes().get(0)); + assertEquals(CompressionType.LZ4.id, response.data().acceptedCompressionTypes().get(1)); + assertEquals(CompressionType.GZIP.id, response.data().acceptedCompressionTypes().get(2)); + assertEquals(CompressionType.SNAPPY.id, response.data().acceptedCompressionTypes().get(3)); + assertEquals(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS, response.data().pushIntervalMs()); + assertTrue(response.data().deltaTemporality()); + assertEquals(100, response.data().telemetryMaxBytes()); + assertEquals(Errors.NONE, response.error()); + + ClientMetricsInstance instance = clientMetricsManager.clientInstance(response.data().clientInstanceId()); + assertNotNull(instance); + assertEquals(Errors.NONE, instance.lastKnownError()); + } + + @Test + public void testGetTelemetryDefaultTelemetryMaxBytes() throws UnknownHostException { + // Remove telemetry max bytes property, default value should be used. + props.remove(KafkaConfig.ClientTelemetryMaxBytesProp()); + config = new KafkaConfig(props); + clientMetricsManager = new ClientMetricsManager(config, time); + + clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + assertEquals(1, clientMetricsManager.subscriptions().size()); + + GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + + assertNotNull(response.data().clientInstanceId()); + assertEquals(1024 * 1024, response.data().telemetryMaxBytes()); + assertEquals(Errors.NONE, response.error()); + } + + @Test + public void testGetTelemetryWithoutSubscription() throws UnknownHostException { + assertTrue(clientMetricsManager.subscriptions().isEmpty()); + + GetTelemetrySubscriptionsRequest 
request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + + assertNotNull(response.data().clientInstanceId()); + assertTrue(response.data().subscriptionId() != 0); + assertTrue(response.data().requestedMetrics().isEmpty()); + assertEquals(4, response.data().acceptedCompressionTypes().size()); + assertEquals(ClientMetricsConfigs.DEFAULT_INTERVAL_MS, response.data().pushIntervalMs()); + assertTrue(response.data().deltaTemporality()); + assertEquals(100, response.data().telemetryMaxBytes()); + assertEquals(Errors.NONE, response.error()); + + ClientMetricsInstance instance = clientMetricsManager.clientInstance(response.data().clientInstanceId()); + assertNotNull(instance); + assertEquals(Errors.NONE, instance.lastKnownError()); + } + + @Test + public void testGetTelemetryAfterPushIntervalTime() throws UnknownHostException { + GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + + assertNotNull(response.data().clientInstanceId()); + assertEquals(Errors.NONE, response.error()); + + time.setCurrentTimeMs(time.milliseconds() + ClientMetricsConfigs.DEFAULT_INTERVAL_MS); + + request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData().setClientInstanceId(response.data().clientInstanceId()), true).build(); + + response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + assertNotNull(response.data().clientInstanceId()); + assertEquals(Errors.NONE, response.error()); + } + + @Test + 
public void testGetTelemetryAllMetricSubscribedSubscription() throws UnknownHostException { + clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + Properties properties = new Properties(); + properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG); + clientMetricsManager.updateSubscription("sub-2", properties); + + assertEquals(2, clientMetricsManager.subscriptions().size()); + + GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + + assertNotNull(response.data().clientInstanceId()); + assertTrue(response.data().subscriptionId() != 0); + + assertEquals(1, response.data().requestedMetrics().size()); + assertTrue(response.data().requestedMetrics().contains(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG)); + + assertEquals(4, response.data().acceptedCompressionTypes().size()); + assertEquals(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS, response.data().pushIntervalMs()); + assertTrue(response.data().deltaTemporality()); + assertEquals(100, response.data().telemetryMaxBytes()); + assertEquals(Errors.NONE, response.error()); + + ClientMetricsInstance instance = clientMetricsManager.clientInstance(response.data().clientInstanceId()); + assertNotNull(instance); + assertEquals(Errors.NONE, instance.lastKnownError()); + } + + @Test + public void testGetTelemetrySameClientImmediateRetryFail() throws UnknownHostException { + GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + + Uuid 
clientInstanceId = response.data().clientInstanceId(); + assertNotNull(clientInstanceId); + assertEquals(Errors.NONE, response.error()); + + request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId), true).build(); + response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + + assertEquals(Errors.THROTTLING_QUOTA_EXCEEDED, response.error()); + } + + @Test + public void testGetTelemetrySameClientImmediateRetryAfterPushFail() throws UnknownHostException { + GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + + Uuid clientInstanceId = response.data().clientInstanceId(); + assertNotNull(clientInstanceId); + assertEquals(Errors.NONE, response.error()); + + // Create new client metrics manager which simulates a new server as it will not have any + // last request information but request should succeed as subscription id should match + // the one with new client instance. 
+ + ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(config, time); + + PushTelemetryRequest pushRequest = new Builder( + new PushTelemetryRequestData() + .setClientInstanceId(response.data().clientInstanceId()) + .setSubscriptionId(response.data().subscriptionId()) + .setCompressionType(CompressionType.NONE.id) + .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build(); + + PushTelemetryResponse pushResponse = newClientMetricsManager.processPushTelemetryRequest( + pushRequest, ClientMetricsTestUtils.requestContext(), 0); + + assertEquals(Errors.NONE, pushResponse.error()); + + request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId), true).build(); + + response = newClientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + + assertEquals(Errors.THROTTLING_QUOTA_EXCEEDED, response.error()); + } + + @Test + public void testGetTelemetryUpdateSubscription() throws UnknownHostException { + clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + assertEquals(1, clientMetricsManager.subscriptions().size()); + + GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + + Uuid clientInstanceId = response.data().clientInstanceId(); + int subscriptionId = response.data().subscriptionId(); + assertNotNull(clientInstanceId); + assertTrue(subscriptionId != 0); + assertEquals(Errors.NONE, response.error()); + + // Update subscription + Properties properties = new Properties(); + properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG); + clientMetricsManager.updateSubscription("sub-2", 
properties); + assertEquals(2, clientMetricsManager.subscriptions().size()); + + request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId), true).build(); + + response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + + // No throttle error as the subscription has changed. + assertEquals(Errors.NONE, response.error()); + // Subscription id updated in next request + assertTrue(subscriptionId != response.data().subscriptionId()); + } + + @Test + public void testGetTelemetryConcurrentRequestNewClientInstance() throws InterruptedException { + GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData().setClientInstanceId(Uuid.randomUuid()), true).build(); + + CountDownLatch lock = new CountDownLatch(2); + List<GetTelemetrySubscriptionsResponse> responses = Collections.synchronizedList(new ArrayList<>()); + + Thread thread = new Thread(() -> { + try { + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + + responses.add(response); + } catch (UnknownHostException e) { + LOG.error("Error processing request", e); + } finally { + lock.countDown(); + } + }); + + Thread thread1 = new Thread(() -> { + try { + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + + responses.add(response); + } catch (UnknownHostException e) { + LOG.error("Error processing request", e); + } finally { + lock.countDown(); + } + }); + + thread.start(); + thread1.start(); + + assertTrue(lock.await(2000, TimeUnit.MILLISECONDS)); + assertEquals(2, responses.size()); + + int throttlingErrorCount = 0; + for (GetTelemetrySubscriptionsResponse response : responses) 
{ + if (response.error() == Errors.THROTTLING_QUOTA_EXCEEDED) { + throttlingErrorCount++; + } else { + // As subscription is updated hence 1 request shall fail with unknown subscription id. + assertEquals(Errors.NONE, response.error()); + } + } + // 1 request should fail with throttling error. + assertEquals(1, throttlingErrorCount); + } + + @Test + public void testGetTelemetryConcurrentRequestAfterSubscriptionUpdate() + throws InterruptedException, UnknownHostException { + GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData().setClientInstanceId(Uuid.randomUuid()), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + + ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId()); + assertNotNull(instance); + + CountDownLatch lock = new CountDownLatch(2); + List<GetTelemetrySubscriptionsResponse> responses = Collections.synchronizedList(new ArrayList<>()); + + clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + assertEquals(1, clientMetricsManager.subscriptions().size()); + + Thread thread = new Thread(() -> { + try { + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + + responses.add(response); + } catch (UnknownHostException e) { + LOG.error("Error processing request", e); + } finally { + lock.countDown(); + } + }); + + Thread thread1 = new Thread(() -> { + try { + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + + responses.add(response); + } catch (UnknownHostException e) { + LOG.error("Error processing request", e); + 
} finally { + lock.countDown(); + } + }); + + thread.start(); + thread1.start(); + + assertTrue(lock.await(2000, TimeUnit.MILLISECONDS)); + assertEquals(2, responses.size()); + + int throttlingErrorCount = 0; + for (GetTelemetrySubscriptionsResponse response : responses) { + if (response.error() == Errors.THROTTLING_QUOTA_EXCEEDED) { + throttlingErrorCount++; + } else { + // As subscription is updated hence 1 request shall fail with unknown subscription id. + assertEquals(Errors.NONE, response.error()); + } + } + // 1 request should fail with throttling error. + assertEquals(1, throttlingErrorCount); + } + + @Test + public void testPushTelemetry() throws UnknownHostException { + clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + assertEquals(1, clientMetricsManager.subscriptions().size()); + + GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext(), 0); + + ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId()); + assertNotNull(instance); + + PushTelemetryRequest request = new Builder( + new PushTelemetryRequestData() + .setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + .setSubscriptionId(subscriptionsResponse.data().subscriptionId()) + .setCompressionType(CompressionType.NONE.id) + .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build(); + + PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + + assertEquals(Errors.NONE, response.error()); + assertFalse(instance.terminating()); + assertEquals(Errors.NONE, instance.lastKnownError()); + } + + @Test 
+ public void testPushTelemetryOnNewServer() throws UnknownHostException { + GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext(), 0); + + // Create new client metrics manager which simulates a new server as it will not have any + // client instance information but request should succeed as subscription id should match + // the one with new client instance. + + ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(config, time); + + PushTelemetryRequest request = new PushTelemetryRequest.Builder( + new PushTelemetryRequestData() + .setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + .setSubscriptionId(subscriptionsResponse.data().subscriptionId()), true).build(); + + PushTelemetryResponse response = newClientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + + assertEquals(Errors.NONE, response.error()); + } + + @Test + public void testPushTelemetryAfterPushIntervalTime() throws UnknownHostException { + clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + assertEquals(1, clientMetricsManager.subscriptions().size()); + + GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext(), 0); + + PushTelemetryRequest request = new Builder( + new PushTelemetryRequestData() + .setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + 
.setSubscriptionId(subscriptionsResponse.data().subscriptionId()) + .setCompressionType(CompressionType.NONE.id) + .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build(); + + PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext(), 0); + + assertEquals(Errors.NONE, response.error()); + + time.setCurrentTimeMs(time.milliseconds() + ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS); Review Comment: This could just be `time.sleep(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS)`. Ditto in a few other places. ########## core/src/main/java/kafka/server/ClientMetricsManager.java: ########## @@ -30,17 +69,376 @@ public class ClientMetricsManager implements Closeable { private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); + private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList( + Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id)); public static ClientMetricsManager instance() { return INSTANCE; } + // Max cache size (16k active client connections per broker) + private static final int CM_CACHE_MAX_SIZE = 16384; + private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache; + private final Map<String, SubscriptionInfo> subscriptionMap; + private final Time time; + + // The latest subscription version is used to determine if subscription has changed and needs + // to re-evaluate the client instance subscription id as per changed subscriptions. 
+ private final AtomicInteger subscriptionUpdateVersion; + + private ClientMetricsManager() { + this(Time.SYSTEM); + } + + // Visible for testing + ClientMetricsManager(Time time) { + this.subscriptionMap = new ConcurrentHashMap<>(); + this.subscriptionUpdateVersion = new AtomicInteger(0); + this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); + this.time = time; + } public void updateSubscription(String subscriptionName, Properties properties) { - // TODO: Implement the update logic to manage subscriptions. + // Validate the subscription properties. + ClientMetricsConfigs.validate(subscriptionName, properties); + // IncrementalAlterConfigs API will send empty configs when all the configs are deleted + // for respective subscription. In that case, we need to remove the subscription from the map. + if (properties.isEmpty()) { + // Remove the subscription from the map if it exists, else ignore the config update. + if (subscriptionMap.containsKey(subscriptionName)) { + log.info("Removing subscription [{}] from the subscription map", subscriptionName); + subscriptionMap.remove(subscriptionName); + this.subscriptionUpdateVersion.incrementAndGet(); + } + return; + } + + updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties)); + } + + public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( + GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + + long now = time.milliseconds(); + Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) + .filter(id -> !id.equals(Uuid.ZERO_UUID)) + .orElse(generateNewClientId()); + + /* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. 
If client issues another get + telemetry request prior to push interval, then the client should get a throttle error but if + the subscription has changed since the last request then the client should get the updated + subscription immediately. + */ + ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext); + + try { + // Validate the get request parameters for the client instance. + validateGetRequest(request, clientInstance, now); + } catch (ApiException exception) { + return request.getErrorResponse(throttleMs, exception); + } + + clientInstance.lastKnownError(Errors.NONE); + return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); + } + + public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, + int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + + Uuid clientInstanceId = request.data().clientInstanceId(); + if (clientInstanceId == null || Uuid.RESERVED.contains(clientInstanceId)) { + String msg = String.format("Invalid request from the client [%s], invalid client instance id", + clientInstanceId); + return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); + } + + long now = time.milliseconds(); + ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext); + + try { + // Validate the push request parameters for the client instance. + validatePushRequest(request, telemetryMaxBytes, clientInstance, now); + } catch (ApiException exception) { + log.debug("Error validating push telemetry request from client [{}]", clientInstanceId, exception); + clientInstance.lastKnownError(Errors.forException(exception)); + return request.getErrorResponse(throttleMs, exception); + } finally { + // Update the client instance with the latest push request parameters. + clientInstance.terminating(request.data().terminating()); + } + + // Push the metrics to the external client receiver plugin. 
+ byte[] metrics = request.data().metrics(); + if (metrics != null && metrics.length > 0) { + try { + ClientMetricsReceiverPlugin.instance().exportMetrics(requestContext, request); + } catch (Exception exception) { + clientInstance.lastKnownError(Errors.INVALID_RECORD); + return request.errorResponse(throttleMs, Errors.INVALID_RECORD); + } + } + + clientInstance.lastKnownError(Errors.NONE); + return new PushTelemetryResponse(new PushTelemetryResponseData().setThrottleTimeMs(throttleMs)); } @Override public void close() throws IOException { - // TODO: Implement the close logic to close the client metrics manager. + subscriptionMap.clear(); + } + + private void updateClientSubscription(String subscriptionName, ClientMetricsConfigs configs) { + List<String> metrics = configs.getList(ClientMetricsConfigs.SUBSCRIPTION_METRICS); + int pushInterval = configs.getInt(ClientMetricsConfigs.PUSH_INTERVAL_MS); + List<String> clientMatchPattern = configs.getList(ClientMetricsConfigs.CLIENT_MATCH_PATTERN); + + /* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+ */ + int version = this.subscriptionUpdateVersion.incrementAndGet(); + SubscriptionInfo newSubscription = + new SubscriptionInfo(subscriptionName, metrics, pushInterval, + ClientMetricsConfigs.parseMatchingPatterns(clientMatchPattern), version); + + subscriptionMap.put(subscriptionName, newSubscription); + } + + private Uuid generateNewClientId() { + Uuid id = Uuid.randomUuid(); + while (clientInstanceCache.get(id) != null) { + id = Uuid.randomUuid(); + } + return id; + } + + private ClientMetricsInstance clientInstance(Uuid clientInstanceId, RequestContext requestContext) { + ClientMetricsInstance clientInstance = clientInstanceCache.get(clientInstanceId); + + if (clientInstance == null) { + /* + If the client instance is not present in the cache, then create a new client instance + and update the cache. This can also happen when the telemetry request is received by + the separate broker instance. + Though cache is synchronized, but it is possible that concurrent calls can create the same + client instance. Hence, safeguard the client instance creation with a double-checked lock + to ensure that only one instance is created. + */ + synchronized (this) { + clientInstance = clientInstanceCache.get(clientInstanceId); + if (clientInstance != null) { + return clientInstance; + } + + ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata( + clientInstanceId, requestContext); + clientInstance = createClientInstanceAndUpdateCache(clientInstanceId, instanceMetadata); + } + } else if (clientInstance.subscriptionVersion() < subscriptionUpdateVersion.get()) { + /* + If the last subscription update version for client instance is older than the subscription + updated version, then re-evaluate the subscription information for the client as per the + updated subscriptions. This is to ensure that the client instance is always in sync with + the latest subscription information. 
+ Though cache is synchronized, but it is possible that concurrent calls can create the same + client instance. Hence, safeguard the client instance update with a double-checked lock + to ensure that only one instance is created. + */ + synchronized (this) { + clientInstance = clientInstanceCache.get(clientInstanceId); + if (clientInstance.subscriptionVersion() >= subscriptionUpdateVersion.get()) { + return clientInstance; + } + clientInstance = createClientInstanceAndUpdateCache(clientInstanceId, clientInstance.instanceMetadata()); + } + } + + return clientInstance; + } + + private ClientMetricsInstance createClientInstanceAndUpdateCache(Uuid clientInstanceId, + ClientMetricsInstanceMetadata instanceMetadata) { + + ClientMetricsInstance clientInstance = createClientInstance(clientInstanceId, instanceMetadata); + clientInstanceCache.put(clientInstanceId, clientInstance); + return clientInstance; + } + + private ClientMetricsInstance createClientInstance(Uuid clientInstanceId, ClientMetricsInstanceMetadata instanceMetadata) { + + int pushIntervalMs = ClientMetricsConfigs.DEFAULT_INTERVAL_MS; + // Keep a set of metrics to avoid duplicates in case of overlapping subscriptions. + Set<String> subscribedMetrics = new HashSet<>(); + boolean allMetricsSubscribed = false; + + int currentSubscriptionVersion = 0; + for (SubscriptionInfo info : subscriptionMap.values()) { + if (instanceMetadata.isMatch(info.matchPattern())) { + allMetricsSubscribed = allMetricsSubscribed || info.metrics().contains( + ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG); + subscribedMetrics.addAll(info.metrics()); + pushIntervalMs = Math.min(pushIntervalMs, info.intervalMs()); + currentSubscriptionVersion = Math.max(currentSubscriptionVersion, info.version()); Review Comment: Hmm, if not all subscriptions match a client, `currentSubscriptionVersion` will always be below the global `subscriptionUpdateVersion` and it forces a new client instance to be recreated on every request, right? 
Could we just use the global `subscriptionUpdateVersion`? ########## core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.server.telemetry.ClientTelemetryReceiver; + +import java.util.ArrayList; +import java.util.List; + +/** + * Plugin to register client telemetry receivers and export metrics. This class is used by the Kafka + * server to export client metrics to the registered receivers. + */ +public class ClientMetricsReceiverPlugin { + + private static final ClientMetricsReceiverPlugin INSTANCE = new ClientMetricsReceiverPlugin(); Review Comment: For `ClientMetricsReceiverPlugin`, it seems that you could do the same thing by creating an instance in `BrokerServer` and passing it along to `DynamicBrokerConfig`.
########## core/src/main/java/kafka/server/ClientMetricsManager.java: ########## @@ -16,31 +16,421 @@ */ package kafka.server; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.Crc32C; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Properties; +import 
java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; /** * Handles client telemetry metrics requests/responses, subscriptions and instance information. */ public class ClientMetricsManager implements Closeable { private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); - private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); + private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList( + Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id)); + // Max cache size (16k active client connections per broker) + private static final int CM_CACHE_MAX_SIZE = 16384; + + private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache; + private final Map<String, SubscriptionInfo> subscriptionMap; + private final KafkaConfig config; + private final Time time; + + // The latest subscription version is used to determine if subscription has changed and needs + // to re-evaluate the client instance subscription id as per changed subscriptions. + private final AtomicInteger subscriptionUpdateVersion; - public static ClientMetricsManager instance() { - return INSTANCE; + public ClientMetricsManager(KafkaConfig config, Time time) { + this.subscriptionMap = new ConcurrentHashMap<>(); + this.subscriptionUpdateVersion = new AtomicInteger(0); + this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); + this.config = config; + this.time = time; } public void updateSubscription(String subscriptionName, Properties properties) { - // TODO: Implement the update logic to manage subscriptions. + // Validate the subscription properties. + ClientMetricsConfigs.validate(subscriptionName, properties); + // IncrementalAlterConfigs API will send empty configs when all the configs are deleted + // for respective subscription. 
In that case, we need to remove the subscription from the map. + if (properties.isEmpty()) { + // Remove the subscription from the map if it exists, else ignore the config update. + if (subscriptionMap.containsKey(subscriptionName)) { + log.info("Removing subscription [{}] from the subscription map", subscriptionName); + subscriptionMap.remove(subscriptionName); + this.subscriptionUpdateVersion.incrementAndGet(); + } + return; + } + + updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties)); + } + + public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( + GetTelemetrySubscriptionsRequest request, RequestContext requestContext, int throttleMs) { + + long now = time.milliseconds(); + Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) + .filter(id -> !id.equals(Uuid.ZERO_UUID)) + .orElse(generateNewClientId()); + + /* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issues another get + telemetry request prior to push interval, then the client should get a throttle error but if + the subscription has changed since the last request then the client should get the updated + subscription immediately. + */ + ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext); + + try { + // Validate the get request parameters for the client instance. 
+ validateGetRequest(request, clientInstance, now); + } catch (ApiException exception) { + return request.getErrorResponse(0, exception); + } + + clientInstance.lastKnownError(Errors.NONE); + return createGetSubscriptionResponse(clientInstanceId, clientInstance, throttleMs); + } + + public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, + RequestContext requestContext, int throttleMs) { + + Uuid clientInstanceId = request.data().clientInstanceId(); + if (clientInstanceId == null || Uuid.RESERVED.contains(clientInstanceId)) { + String msg = String.format("Invalid request from the client [%s], invalid client instance id", + clientInstanceId); + return request.getErrorResponse(0, new InvalidRequestException(msg)); + } + + long now = time.milliseconds(); + ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext); + + try { + // Validate the push request parameters for the client instance. + validatePushRequest(request, clientInstance, now); + } catch (ApiException exception) { + log.debug("Error validating push telemetry request from client [{}]", clientInstanceId, exception); + clientInstance.lastKnownError(Errors.forException(exception)); + return request.getErrorResponse(0, exception); + } finally { + // Update the client instance with the latest push request parameters. + clientInstance.terminating(request.data().terminating()); + } + + // Push the metrics to the external client receiver plugin. + byte[] metrics = request.data().metrics(); + if (metrics != null && metrics.length > 0) { + try { + ClientMetricsReceiverPlugin.instance().exportMetrics(requestContext, request); + } catch (Exception exception) { + clientInstance.lastKnownError(Errors.INVALID_RECORD); + return request.errorResponse(throttleMs, Errors.INVALID_RECORD); Review Comment: `throttleMs` should be 0. 
########## core/src/main/java/kafka/server/ClientMetricsManager.java: ########## @@ -16,31 +16,421 @@ */ package kafka.server; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.Crc32C; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Properties; +import 
java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; /** * Handles client telemetry metrics requests/responses, subscriptions and instance information. */ public class ClientMetricsManager implements Closeable { private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); - private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); + private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList( + Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id)); + // Max cache size (16k active client connections per broker) + private static final int CM_CACHE_MAX_SIZE = 16384; + + private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache; + private final Map<String, SubscriptionInfo> subscriptionMap; + private final KafkaConfig config; + private final Time time; + + // The latest subscription version is used to determine if subscription has changed and needs + // to re-evaluate the client instance subscription id as per changed subscriptions. + private final AtomicInteger subscriptionUpdateVersion; - public static ClientMetricsManager instance() { - return INSTANCE; + public ClientMetricsManager(KafkaConfig config, Time time) { + this.subscriptionMap = new ConcurrentHashMap<>(); + this.subscriptionUpdateVersion = new AtomicInteger(0); + this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); + this.config = config; + this.time = time; } public void updateSubscription(String subscriptionName, Properties properties) { - // TODO: Implement the update logic to manage subscriptions. + // Validate the subscription properties. + ClientMetricsConfigs.validate(subscriptionName, properties); + // IncrementalAlterConfigs API will send empty configs when all the configs are deleted + // for respective subscription. 
In that case, we need to remove the subscription from the map. + if (properties.isEmpty()) { + // Remove the subscription from the map if it exists, else ignore the config update. + if (subscriptionMap.containsKey(subscriptionName)) { + log.info("Removing subscription [{}] from the subscription map", subscriptionName); + subscriptionMap.remove(subscriptionName); + this.subscriptionUpdateVersion.incrementAndGet(); + } + return; + } + + updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties)); + } + + public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( + GetTelemetrySubscriptionsRequest request, RequestContext requestContext, int throttleMs) { + + long now = time.milliseconds(); + Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) + .filter(id -> !id.equals(Uuid.ZERO_UUID)) + .orElse(generateNewClientId()); + + /* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issues another get + telemetry request prior to push interval, then the client should get a throttle error but if + the subscription has changed since the last request then the client should get the updated + subscription immediately. + */ + ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext); + + try { + // Validate the get request parameters for the client instance. 
+ validateGetRequest(request, clientInstance, now); + } catch (ApiException exception) { + return request.getErrorResponse(0, exception); + } + + clientInstance.lastKnownError(Errors.NONE); + return createGetSubscriptionResponse(clientInstanceId, clientInstance, throttleMs); + } + + public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, + RequestContext requestContext, int throttleMs) { + + Uuid clientInstanceId = request.data().clientInstanceId(); + if (clientInstanceId == null || Uuid.RESERVED.contains(clientInstanceId)) { + String msg = String.format("Invalid request from the client [%s], invalid client instance id", + clientInstanceId); + return request.getErrorResponse(0, new InvalidRequestException(msg)); + } + + long now = time.milliseconds(); + ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext); + + try { + // Validate the push request parameters for the client instance. + validatePushRequest(request, clientInstance, now); + } catch (ApiException exception) { + log.debug("Error validating push telemetry request from client [{}]", clientInstanceId, exception); + clientInstance.lastKnownError(Errors.forException(exception)); + return request.getErrorResponse(0, exception); + } finally { + // Update the client instance with the latest push request parameters. + clientInstance.terminating(request.data().terminating()); + } + + // Push the metrics to the external client receiver plugin. 
+ byte[] metrics = request.data().metrics(); + if (metrics != null && metrics.length > 0) { + try { + ClientMetricsReceiverPlugin.instance().exportMetrics(requestContext, request); + } catch (Exception exception) { + clientInstance.lastKnownError(Errors.INVALID_RECORD); + return request.errorResponse(throttleMs, Errors.INVALID_RECORD); + } + } + + clientInstance.lastKnownError(Errors.NONE); + return new PushTelemetryResponse(new PushTelemetryResponseData().setThrottleTimeMs(throttleMs)); } @Override public void close() throws IOException { - // TODO: Implement the close logic to close the client metrics manager. + subscriptionMap.clear(); + } + + private void updateClientSubscription(String subscriptionName, ClientMetricsConfigs configs) { + List<String> metrics = configs.getList(ClientMetricsConfigs.SUBSCRIPTION_METRICS); + int pushInterval = configs.getInt(ClientMetricsConfigs.PUSH_INTERVAL_MS); + List<String> clientMatchPattern = configs.getList(ClientMetricsConfigs.CLIENT_MATCH_PATTERN); + + /* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+ */ + int version = this.subscriptionUpdateVersion.incrementAndGet(); + SubscriptionInfo newSubscription = + new SubscriptionInfo(subscriptionName, metrics, pushInterval, + ClientMetricsConfigs.parseMatchingPatterns(clientMatchPattern), version); + + subscriptionMap.put(subscriptionName, newSubscription); + } + + private Uuid generateNewClientId() { + Uuid id = Uuid.randomUuid(); + while (clientInstanceCache.get(id) != null) { + id = Uuid.randomUuid(); + } + return id; + } + + private ClientMetricsInstance clientInstance(Uuid clientInstanceId, RequestContext requestContext) { + ClientMetricsInstance clientInstance = clientInstanceCache.get(clientInstanceId); + + if (clientInstance == null) { + /* + If the client instance is not present in the cache, then create a new client instance + and update the cache. This can also happen when the telemetry request is received by + the separate broker instance. + Though cache is synchronized, but it is possible that concurrent calls can create the same + client instance. Hence, safeguard the client instance creation with a double-checked lock + to ensure that only one instance is created. + */ + synchronized (this) { + clientInstance = clientInstanceCache.get(clientInstanceId); + if (clientInstance != null) { + return clientInstance; + } + + ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata( + clientInstanceId, requestContext); + clientInstance = createClientInstanceAndUpdateCache(clientInstanceId, instanceMetadata); + } + } else if (clientInstance.subscriptionVersion() < subscriptionUpdateVersion.get()) { + /* + If the last subscription update version for client instance is older than the subscription + updated version, then re-evaluate the subscription information for the client as per the + updated subscriptions. This is to ensure that the client instance is always in sync with + the latest subscription information. 
+ Though cache is synchronized, but it is possible that concurrent calls can create the same + client instance. Hence, safeguard the client instance update with a double-checked lock + to ensure that only one instance is created. + */ + synchronized (this) { + clientInstance = clientInstanceCache.get(clientInstanceId); + if (clientInstance.subscriptionVersion() >= subscriptionUpdateVersion.get()) { + return clientInstance; + } + clientInstance = createClientInstanceAndUpdateCache(clientInstanceId, clientInstance.instanceMetadata()); + } + } + + return clientInstance; + } + + private ClientMetricsInstance createClientInstanceAndUpdateCache(Uuid clientInstanceId, + ClientMetricsInstanceMetadata instanceMetadata) { + + ClientMetricsInstance clientInstance = createClientInstance(clientInstanceId, instanceMetadata); + clientInstanceCache.put(clientInstanceId, clientInstance); + return clientInstance; + } + + private ClientMetricsInstance createClientInstance(Uuid clientInstanceId, ClientMetricsInstanceMetadata instanceMetadata) { + + int pushIntervalMs = ClientMetricsConfigs.DEFAULT_INTERVAL_MS; + // Keep a set of metrics to avoid duplicates in case of overlapping subscriptions. + Set<String> subscribedMetrics = new HashSet<>(); + boolean allMetricsSubscribed = false; + + int currentSubscriptionVersion = 0; + for (SubscriptionInfo info : subscriptionMap.values()) { + if (instanceMetadata.isMatch(info.matchPattern())) { + allMetricsSubscribed = allMetricsSubscribed || info.metrics().contains( + ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG); + subscribedMetrics.addAll(info.metrics()); + pushIntervalMs = Math.min(pushIntervalMs, info.intervalMs()); + currentSubscriptionVersion = Math.max(currentSubscriptionVersion, info.version()); + } + } + + /* + If client matches with any subscription that has * metrics string, then it means that client + is subscribed to all the metrics, so just send the * string as the subscribed metrics. 
+ */ + if (allMetricsSubscribed) { + // Only add an * to indicate that all metrics are subscribed. + subscribedMetrics.clear(); + subscribedMetrics.add(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG); + } + + int subscriptionId = computeSubscriptionId(subscribedMetrics, pushIntervalMs, clientInstanceId); + + return new ClientMetricsInstance(clientInstanceId, instanceMetadata, subscriptionId, + currentSubscriptionVersion, subscribedMetrics, pushIntervalMs); + } + + /** + * Computes the SubscriptionId as a unique identifier for a client instance's subscription set, + * the id is generated by calculating a CRC32 of the configured metrics subscriptions including Review Comment: CRC32 => CRC32C ########## core/src/main/java/kafka/server/ClientMetricsManager.java: ########## @@ -16,31 +16,421 @@ */ package kafka.server; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import 
org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.Crc32C; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; /** * Handles client telemetry metrics requests/responses, subscriptions and instance information. */ public class ClientMetricsManager implements Closeable { private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); - private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); + private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList( + Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id)); + // Max cache size (16k active client connections per broker) + private static final int CM_CACHE_MAX_SIZE = 16384; + + private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache; + private final Map<String, SubscriptionInfo> subscriptionMap; + private final KafkaConfig config; + private final Time time; + + // The latest subscription version is used to determine if subscription has changed and needs + // to re-evaluate the client instance subscription id as per changed subscriptions. 
+ private final AtomicInteger subscriptionUpdateVersion; - public static ClientMetricsManager instance() { - return INSTANCE; + public ClientMetricsManager(KafkaConfig config, Time time) { + this.subscriptionMap = new ConcurrentHashMap<>(); + this.subscriptionUpdateVersion = new AtomicInteger(0); + this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); + this.config = config; + this.time = time; } public void updateSubscription(String subscriptionName, Properties properties) { - // TODO: Implement the update logic to manage subscriptions. + // Validate the subscription properties. + ClientMetricsConfigs.validate(subscriptionName, properties); + // IncrementalAlterConfigs API will send empty configs when all the configs are deleted + // for respective subscription. In that case, we need to remove the subscription from the map. + if (properties.isEmpty()) { + // Remove the subscription from the map if it exists, else ignore the config update. + if (subscriptionMap.containsKey(subscriptionName)) { + log.info("Removing subscription [{}] from the subscription map", subscriptionName); + subscriptionMap.remove(subscriptionName); + this.subscriptionUpdateVersion.incrementAndGet(); + } + return; + } + + updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties)); + } + + public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( + GetTelemetrySubscriptionsRequest request, RequestContext requestContext, int throttleMs) { + + long now = time.milliseconds(); + Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) + .filter(id -> !id.equals(Uuid.ZERO_UUID)) + .orElse(generateNewClientId()); + + /* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. 
If client issues another get + telemetry request prior to push interval, then the client should get a throttle error but if + the subscription has changed since the last request then the client should get the updated + subscription immediately. + */ + ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext); + + try { + // Validate the get request parameters for the client instance. + validateGetRequest(request, clientInstance, now); + } catch (ApiException exception) { + return request.getErrorResponse(0, exception); + } + + clientInstance.lastKnownError(Errors.NONE); + return createGetSubscriptionResponse(clientInstanceId, clientInstance, throttleMs); + } + + public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, + RequestContext requestContext, int throttleMs) { + + Uuid clientInstanceId = request.data().clientInstanceId(); + if (clientInstanceId == null || Uuid.RESERVED.contains(clientInstanceId)) { + String msg = String.format("Invalid request from the client [%s], invalid client instance id", + clientInstanceId); + return request.getErrorResponse(0, new InvalidRequestException(msg)); + } + + long now = time.milliseconds(); + ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext); + + try { + // Validate the push request parameters for the client instance. + validatePushRequest(request, clientInstance, now); + } catch (ApiException exception) { + log.debug("Error validating push telemetry request from client [{}]", clientInstanceId, exception); + clientInstance.lastKnownError(Errors.forException(exception)); + return request.getErrorResponse(0, exception); + } finally { + // Update the client instance with the latest push request parameters. + clientInstance.terminating(request.data().terminating()); + } + + // Push the metrics to the external client receiver plugin. 
+ byte[] metrics = request.data().metrics(); + if (metrics != null && metrics.length > 0) { + try { + ClientMetricsReceiverPlugin.instance().exportMetrics(requestContext, request); + } catch (Exception exception) { + clientInstance.lastKnownError(Errors.INVALID_RECORD); + return request.errorResponse(throttleMs, Errors.INVALID_RECORD); + } + } + + clientInstance.lastKnownError(Errors.NONE); + return new PushTelemetryResponse(new PushTelemetryResponseData().setThrottleTimeMs(throttleMs)); Review Comment: As discussed, we shouldn't set `throttleMs` here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org