Izeren commented on code in PR #28427:
URL: https://github.com/apache/flink/pull/28427#discussion_r3459152569
##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -316,6 +318,53 @@ public static List<FileSystemFactory> getRegisteredFileSystemFactories() {
}
}
+ /**
+ * Hands a runtime-owned, process-level {@link MetricGroup} to every registered {@link
+ * FileSystemFactory} that opts into metrics via {@link MetricsAware}.
+ *
+ * <p>This is the second phase of file system initialization. {@link #initialize(Configuration,
+ * PluginManager)} runs at process startup, before the {@code MetricRegistry} exists; this
+ * method is therefore invoked separately, once the registry and a process-level {@link
+ * MetricGroup} are available. It is called from the TaskManager and JobManager entrypoints
+ * only. Contexts without a process-level {@link MetricGroup} (CLI, HistoryServer, YARN client)
+ * simply never call it, and their file system plugins continue to operate without emitting
+ * metrics.
+ *
+ * <p>The call is idempotent: factories receive a child group {@code <process>.filesystem}, and
+ * {@link MetricGroup#addGroup} returns the same child on repeated calls with the same parent,
+ * so re-invocation does not register duplicate metrics. Factories that do not implement {@link
Review Comment:
Is it going to spam logs for "duplicate metric groups" even more than now?
##########
flink-core/src/test/java/org/apache/flink/core/fs/FileSystemAttachMetricsTest.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.MetricsAware;
+import org.apache.flink.core.plugin.TestingPluginManager;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/**
+ * Tests for {@link FileSystem#attachMetrics(MetricGroup)} and the {@link MetricsAware} two-phase
+ * init contract.
+ *
+ * <p>The headline case is {@link #attachMetricsReachesPluginLoadedMetricsAwareFactory()}: plugin
Review Comment:
nit: this is one more bloated explanation that could live directly on the test
case itself. Why do we want to put it in the class docs?
##########
flink-core/src/test/java/org/apache/flink/core/fs/FileSystemAttachMetricsTest.java:
##########
@@ -0,0 +1,213 @@
+/**
+ * Tests for {@link FileSystem#attachMetrics(MetricGroup)} and the {@link MetricsAware} two-phase
+ * init contract.
+ *
+ * <p>The headline case is {@link #attachMetricsReachesPluginLoadedMetricsAwareFactory()}: plugin
+ * file systems are registered wrapped in a {@link PluginFileSystemFactory}, which does <em>not</em>
+ * itself implement {@link MetricsAware}. {@code attachMetrics} must unwrap the proxy to reach the
+ * real factory, otherwise the metric group is silently never delivered and no metrics are ever
+ * emitted.
+ */
+class FileSystemAttachMetricsTest {
+
+ @AfterEach
+ void resetFileSystems() {
+ // Restore the default, plugin-less factory registry so other tests are unaffected.
+ FileSystem.initialize(new Configuration(), null);
+ }
+
+ @Test
+ void attachMetricsReachesPluginLoadedMetricsAwareFactory() {
+ RecordingMetricsAwareFactory factory = new RecordingMetricsAwareFactory("metrics-test-fs");
+ initializeWithPlugins(factory);
+
+ RecordingMetricGroup processGroup = new RecordingMetricGroup();
+ FileSystem.attachMetrics(processGroup);
+
+ // Unwrapped through PluginFileSystemFactory and invoked exactly once.
+ assertThat(factory.setMetricGroupCalls).hasValue(1);
+ // The group handed to the factory is the "filesystem" child of the process group, not the
+ // process group itself.
+ assertThat(processGroup.childGroupNames).containsExactly("filesystem");
+ assertThat(factory.receivedGroup.get()).isNotNull().isNotSameAs(processGroup);
+ }
+
+ @Test
+ void attachMetricsSkipsFactoriesThatAreNotMetricsAware() {
+ initializeWithPlugins(new PlainFactory("plain-test-fs"));
+
+ assertThatCode(() -> FileSystem.attachMetrics(new UnregisteredMetricsGroup()))
+ .doesNotThrowAnyException();
+ }
+
+ @Test
+ void attachMetricsIsResilientToAFactoryThatThrows() {
+ RecordingMetricsAwareFactory ok = new RecordingMetricsAwareFactory("ok-test-fs");
+ initializeWithPlugins(new ThrowingMetricsAwareFactory("throwing-test-fs"), ok);
+
+ // A misbehaving plugin must never break process startup, and well-behaved factories must
+ // still receive their group regardless of iteration order.
+ assertThatCode(() -> FileSystem.attachMetrics(new UnregisteredMetricsGroup()))
+ .doesNotThrowAnyException();
+ assertThat(ok.setMetricGroupCalls).hasValue(1);
+ }
+
+ @Test
+ void attachMetricsDoesNotThrowWhenInvokedRepeatedly() {
+ RecordingMetricsAwareFactory factory = new RecordingMetricsAwareFactory("idem-test-fs");
+ initializeWithPlugins(factory);
+
+ MetricGroup group = new UnregisteredMetricsGroup();
+ assertThatCode(
+ () -> {
+ FileSystem.attachMetrics(group);
+ FileSystem.attachMetrics(group);
+ })
+ .doesNotThrowAnyException();
+ }
+
+ @Test
+ void setMetricGroupIsNotInvokedWhenAttachMetricsIsNeverCalled() {
+ RecordingMetricsAwareFactory factory = new RecordingMetricsAwareFactory("never-test-fs");
+ initializeWithPlugins(factory);
+
+ assertThat(factory.setMetricGroupCalls).hasValue(0);
+ }
+
+ @Test
+ void pluginFileSystemFactoryForwardsMetricGroupToInner() {
+ RecordingMetricsAwareFactory inner = new RecordingMetricsAwareFactory("wrapped-fs");
+ FileSystemFactory wrapper = PluginFileSystemFactory.of(inner);
+
+ // The wrapper must itself be MetricsAware so attachMetrics reaches it without unwrapping.
+ assertThat(wrapper).isInstanceOf(MetricsAware.class);
+
+ MetricGroup group = new UnregisteredMetricsGroup();
+ ((MetricsAware) wrapper).setMetricGroup(group);
+
+ assertThat(inner.setMetricGroupCalls).hasValue(1);
+ assertThat(inner.receivedGroup.get()).isSameAs(group);
+ }
+
+ private static void initializeWithPlugins(FileSystemFactory... factories) {
+ Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+ plugins.put(FileSystemFactory.class, Arrays.asList(factories).iterator());
+ FileSystem.initialize(new Configuration(), new TestingPluginManager(plugins));
+ }
+
+ // ------------------------------------------------------------------------
+ // test factories
+ // ------------------------------------------------------------------------
+
+ private static class PlainFactory implements FileSystemFactory {
Review Comment:
Could we deduplicate the code here? I feel like a single
TestingFileSystemFactory could cover all of these cases and take an inline hook
for the behaviour override, which would make the tests clearer.
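
A rough sketch of the shape I have in mind, not a drop-in replacement. `MetricGroup` and `MetricsAware` below are minimal stand-ins for the Flink interfaces so the snippet compiles on its own, and `TestingFileSystemFactory`, `recording` and `throwing` are hypothetical names. The real class would also implement `FileSystemFactory`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Minimal stand-ins for the Flink types so the sketch compiles on its own.
interface MetricGroup {}

interface MetricsAware {
    void setMetricGroup(MetricGroup group);
}

/** Hypothetical test factory: records every call, then runs a per-test hook. */
final class TestingFileSystemFactory implements MetricsAware {
    final AtomicInteger setMetricGroupCalls = new AtomicInteger();
    final AtomicReference<MetricGroup> receivedGroup = new AtomicReference<>();

    private final String scheme;
    private final Consumer<MetricGroup> onSetMetricGroup;

    TestingFileSystemFactory(String scheme, Consumer<MetricGroup> onSetMetricGroup) {
        this.scheme = scheme;
        this.onSetMetricGroup = onSetMetricGroup;
    }

    /** Well-behaved variant: only records the call. */
    static TestingFileSystemFactory recording(String scheme) {
        return new TestingFileSystemFactory(scheme, group -> {});
    }

    /** Misbehaving variant: records the call, then throws from the hook. */
    static TestingFileSystemFactory throwing(String scheme) {
        return new TestingFileSystemFactory(
                scheme,
                group -> {
                    throw new IllegalStateException("simulated failure in " + scheme);
                });
    }

    String getScheme() {
        return scheme;
    }

    @Override
    public void setMetricGroup(MetricGroup group) {
        setMetricGroupCalls.incrementAndGet();
        receivedGroup.set(group);
        onSetMetricGroup.accept(group);
    }
}
```

The case that must not be `MetricsAware` would probably still need its own plain variant, since one class cannot both implement the interface and not implement it.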
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridge.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricPublisher;
+import software.amazon.awssdk.metrics.SdkMetric;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Bridges AWS SDK v2's {@link MetricPublisher} into Flink metrics for {@code flink-s3-fs-native}.
+ *
+ * <p>The SDK invokes {@link #publish(MetricCollection)} asynchronously after every completed API
+ * call, on its internal completion executor. The bridge reads a small, fixed set of fields and
+ * emits the default metric surface of FLIP-576 against the {@code filesystem_type}-labelled scope
+ * it is handed at construction:
+ *
+ * <ul>
+ * <li>{@code api_call_count} (Counter) — labels {@code op}, {@code
status_class}
+ * <li>{@code api_call_duration_ms} (Histogram) — label {@code op}
+ * <li>{@code throttle_count} (Counter) — label {@code op}
+ * <li>{@code retry_count} (Counter) — labels {@code op}, {@code reason}
+ * </ul>
+ *
+ * <p>({@code iops} from the default allowlist is derived at reporter time as the rate of {@code
+ * api_call_count}, so it is not a separately registered metric.)
+ *
+ * <p><b>Allowlist.</b> Only metrics whose name is in the allowlist passed at construction are
+ * registered; the rest are skipped on the hot path. A {@code "*"} entry registers everything. The
+ * default set is the five FLIP-576 metrics ({@code api_call_count}, {@code api_call_duration_ms},
+ * {@code throttle_count}, {@code retry_count}, {@code iops}).
+ *
+ * <p><b>Cardinality.</b> {@code op} comes from the SDK operation name (a closed set of ~15 values
+ * for S3), {@code status_class} is a closed classifier ({@code 2xx}, {@code 4xx}, {@code 5xx},
+ * {@code throttled}, {@code other}, {@code error}, {@code unknown}), and {@code reason} is a closed
+ * enum ({@code throttled}, {@code 5xx}, {@code other}). Metric handles are cached in bounded maps,
+ * so {@link #publish} is a map lookup plus a counter increment with no per-record allocation.
+ *
+ * <p><b>Thread-safety.</b> Counters use {@link ThreadSafeSimpleCounter} and the histogram is
+ * synchronized, so concurrent publishes from the SDK completion executor are safe. The bridge is
+ * also safe to share across multiple S3 clients of the same plugin instance.
+ */
+@Internal
+public final class AwsSdkMetricBridge implements MetricPublisher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AwsSdkMetricBridge.class);
+
+ static final String API_CALL_COUNT = "api_call_count";
Review Comment:
Are these constants expected to be universal across cloud providers? If so,
should we extract them to the common module like the interface?
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java:
##########
@@ -321,9 +325,53 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
+ "When not set, the default chain is used: delegation tokens -> "
+ "static credentials (if configured) -> DefaultCredentialsProvider.");
+ public static final ConfigOption<Boolean> METRICS_ENABLED =
+ ConfigOptions.key("s3.metrics.enabled")
Review Comment:
Is the suggestion to have these properties replicated across all clouds? I
am curious if it would be more generic to move them higher up and parametrise
by <scheme> as we do for some other per-cloud configs
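
To make the idea concrete, here is a minimal sketch in the spirit of the existing `fs.<scheme>.limit.*` options. The `fs.<scheme>.metrics.enabled` key and the `FileSystemMetricOptions` helper are assumptions for illustration only, and a plain `Map` stands in for Flink's `Configuration`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical scheme-parametrised option keys; the key layout is an assumption. */
final class FileSystemMetricOptions {
    private FileSystemMetricOptions() {}

    /** Builds the per-scheme key, so "s3" maps to "fs.s3.metrics.enabled". */
    static String metricsEnabledKey(String scheme) {
        return "fs." + scheme + ".metrics.enabled";
    }

    /** Reads the flag from a flat key/value config; absent keys fall back to the default. */
    static boolean metricsEnabled(Map<String, String> conf, String scheme, boolean defaultValue) {
        String raw = conf.get(metricsEnabledKey(scheme));
        return raw == null ? defaultValue : Boolean.parseBoolean(raw);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(metricsEnabledKey("s3"), "true");
        System.out.println("s3: " + metricsEnabled(conf, "s3", false));
        System.out.println("gs: " + metricsEnabled(conf, "gs", false));
    }
}
```

With something like this, each cloud plugin would only pass its own scheme instead of declaring its own copy of the option.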
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridge.java:
##########
@@ -0,0 +1,281 @@
+@Internal
+public final class AwsSdkMetricBridge implements MetricPublisher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AwsSdkMetricBridge.class);
+
+ static final String API_CALL_COUNT = "api_call_count";
+ static final String API_CALL_DURATION_MS = "api_call_duration_ms";
+ static final String THROTTLE_COUNT = "throttle_count";
+ static final String RETRY_COUNT = "retry_count";
+ static final String IOPS = "iops";
+
+ /** The default-on metric set from FLIP-576. {@code iops} is derived, not registered. */
+ static final List<String> DEFAULT_ALLOWLIST =
+ Arrays.asList(API_CALL_COUNT, API_CALL_DURATION_MS, THROTTLE_COUNT, RETRY_COUNT, IOPS);
+
+ private static final String WILDCARD = "*";
+
+ private static final String LABEL_OP = "op";
+ private static final String LABEL_STATUS_CLASS = "status_class";
+ private static final String LABEL_REASON = "reason";
+
+ private static final String UNKNOWN_OP = "Unknown";
+
+ private final MetricGroup fsScope;
+ private final int histogramWindowSize;
+
+ private final boolean allowAll;
+ private final Set<String> allowlist;
+
+ // op and label sets are closed, so these maps are bounded by construction.
+ private final ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, S3MetricHistogram> histograms =
+ new ConcurrentHashMap<>();
+
+ public AwsSdkMetricBridge(MetricGroup fsScope) {
+ this(fsScope, DEFAULT_ALLOWLIST, S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+ }
+
+ public AwsSdkMetricBridge(MetricGroup fsScope, int histogramWindowSize) {
+ this(fsScope, DEFAULT_ALLOWLIST, histogramWindowSize);
+ }
+
+ public AwsSdkMetricBridge(
+ MetricGroup fsScope, @Nullable Collection<String> allowlist, int histogramWindowSize) {
+ this.fsScope = Preconditions.checkNotNull(fsScope, "fsScope must not be null");
+ Preconditions.checkArgument(
+ histogramWindowSize > 0, "histogramWindowSize must be positive");
+ this.histogramWindowSize = histogramWindowSize;
+
+ if (allowlist == null || allowlist.isEmpty()) {
+ LOG.warn(
+ "S3 metrics allowlist is empty; falling back to the default metric set {}",
+ DEFAULT_ALLOWLIST);
+ this.allowAll = false;
+ this.allowlist = new HashSet<>(DEFAULT_ALLOWLIST);
+ } else if (allowlist.contains(WILDCARD)) {
+ this.allowAll = true;
+ this.allowlist = new HashSet<>();
+ } else {
+ this.allowAll = false;
+ this.allowlist = new HashSet<>(allowlist);
+ }
+ }
+
+ private boolean allowed(String metricName) {
+ return allowAll || allowlist.contains(metricName);
+ }
+
+ @Override
+ public void publish(MetricCollection apiCall) {
+ try {
+ translate(apiCall);
+ } catch (Throwable t) {
+ // Defence in depth: a metric failure must never affect S3 IO.
+ LOG.debug("Failed to publish S3 SDK metrics", t);
+ }
+ }
+
+ private void translate(MetricCollection apiCall) {
+ final String op = first(apiCall, CoreMetric.OPERATION_NAME, UNKNOWN_OP);
+
+ final Duration duration = first(apiCall, CoreMetric.API_CALL_DURATION, null);
+ if (duration != null && allowed(API_CALL_DURATION_MS)) {
+ histogram(op).update(duration.toMillis());
+ }
+
+ // HTTP_STATUS_CODE lives on the per-attempt children, not on the top-level ApiCall record.
+ // status_class reflects the overall outcome (last attempt); the retry reason reflects the
+ // failures that triggered the retries (any attempt), so they are tracked separately.
+ int throttleResponses = 0;
+ boolean sawServerError = false;
+ Integer lastStatus = null;
+ for (MetricCollection attempt : apiCall.children()) {
+ for (Integer status : attempt.metricValues(HttpMetric.HTTP_STATUS_CODE)) {
+ if (status != null) {
+ lastStatus = status;
+ if (isThrottle(status)) {
+ throttleResponses++;
+ } else if (status >= 500) {
+ sawServerError = true;
+ }
+ }
+ }
+ }
+
+ final Boolean successful = first(apiCall, CoreMetric.API_CALL_SUCCESSFUL, null);
+ if (allowed(API_CALL_COUNT)) {
+ apiCallCount(op, statusClass(lastStatus, successful)).inc();
+ }
+
+ if (throttleResponses > 0 && allowed(THROTTLE_COUNT)) {
+ throttleCount(op).inc(throttleResponses);
+ }
+
+ final Integer retries = first(apiCall, CoreMetric.RETRY_COUNT, 0);
+ if (retries != null && retries > 0 && allowed(RETRY_COUNT)) {
+ retryCount(op, retryReason(throttleResponses > 0, sawServerError)).inc(retries);
+ }
+ }
+
+ private static boolean isThrottle(int status) {
+ return status == 429 || status == 503;
+ }
+
+ private static String statusClass(Integer status, Boolean successful) {
+ if (status == null) {
+ if (Boolean.TRUE.equals(successful)) {
+ return "2xx";
+ }
+ return Boolean.FALSE.equals(successful) ? "error" : "unknown";
+ }
+ if (isThrottle(status)) {
+ return "throttled";
+ }
+ if (status >= 200 && status < 300) {
+ return "2xx";
+ }
+ if (status >= 400 && status < 500) {
+ return "4xx";
+ }
+ if (status >= 500) {
+ return "5xx";
+ }
+ return "other";
+ }
+
+ private static String retryReason(boolean sawThrottle, boolean sawServerError) {
Review Comment:
Should we (and can we) extract this into a cloud-agnostic helper?
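
As a sketch of what that extraction could look like: the classification logic below is copied from `isThrottle` and `statusClass` in this diff, while the `HttpStatusClassifier` name and its placement in a common module are assumptions. Whether 429/503 is the right throttle set for providers other than S3 is an open question. `retryReason` could presumably move the same way.

```java
/** Hypothetical provider-neutral helper; name and location are assumptions. */
final class HttpStatusClassifier {
    private HttpStatusClassifier() {}

    /** Same rule as in the bridge: 429 and 503 count as throttling. */
    static boolean isThrottle(int status) {
        return status == 429 || status == 503;
    }

    /**
     * Maps an HTTP status (possibly absent) and a success flag (possibly absent) to a
     * closed set of labels: 2xx, 4xx, 5xx, throttled, other, error, unknown.
     */
    static String statusClass(Integer status, Boolean successful) {
        if (status == null) {
            // No HTTP status observed: fall back to the SDK-level success flag.
            if (Boolean.TRUE.equals(successful)) {
                return "2xx";
            }
            return Boolean.FALSE.equals(successful) ? "error" : "unknown";
        }
        if (isThrottle(status)) {
            return "throttled";
        }
        if (status >= 200 && status < 300) {
            return "2xx";
        }
        if (status >= 400 && status < 500) {
            return "4xx";
        }
        if (status >= 500) {
            return "5xx";
        }
        return "other";
    }

    public static void main(String[] args) {
        System.out.println(statusClass(200, true));
        System.out.println(statusClass(503, false));
        System.out.println(statusClass(null, null));
    }
}
```

The provider-specific bridge would then only be responsible for pulling the status code and success flag out of its SDK's metric records.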
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridge.java:
##########
@@ -0,0 +1,281 @@
+@Internal
+public final class AwsSdkMetricBridge implements MetricPublisher {
+ @Override
+ public void publish(MetricCollection apiCall) {
+ try {
+ translate(apiCall);
+ } catch (Throwable t) {
Review Comment:
Do we really want to catch `Throwable` here?
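
For comparison, a minimal sketch of the narrower alternative: catch `Exception` so that ordinary metric failures stay contained, and let `java.lang.Error` subclasses propagate. `GuardedPublisher` is a hypothetical stand-in for the bridge, not code from this PR. If we do want to keep `Throwable`, `ExceptionUtils.rethrowIfFatalError` might be another option.

```java
import java.util.function.Consumer;

/** Hypothetical guard around a metrics callback; names are illustrative only. */
final class GuardedPublisher<T> {
    private final Consumer<T> delegate;
    private int swallowed;

    GuardedPublisher(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    void publish(T record) {
        try {
            delegate.accept(record);
        } catch (Exception e) {
            // Ordinary failures in metric translation are contained here.
            swallowed++;
        }
        // java.lang.Error subclasses (OutOfMemoryError, LinkageError, ...) are not
        // caught above and propagate to the caller.
    }

    int swallowedCount() {
        return swallowed;
    }

    public static void main(String[] args) {
        GuardedPublisher<String> publisher =
                new GuardedPublisher<>(
                        record -> {
                            throw new IllegalStateException("bad record: " + record);
                        });
        publisher.publish("PutObject");
        System.out.println("swallowed=" + publisher.swallowedCount());
    }
}
```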
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridge.java:
##########
@@ -0,0 +1,281 @@
+/**
+ * Bridges AWS SDK v2's {@link MetricPublisher} into Flink metrics for {@code
flink-s3-fs-native}.
+ *
+ * <p>The SDK invokes {@link #publish(MetricCollection)} asynchronously after
every completed API
+ * call, on its internal completion executor. The bridge reads a small, fixed
set of fields and
+ * emits the default metric surface of FLIP-576 against the {@code
filesystem_type}-labelled scope
+ * it is handed at construction:
+ *
+ * <ul>
+ * <li>{@code api_call_count} (Counter) — labels {@code op}, {@code
status_class}
+ * <li>{@code api_call_duration_ms} (Histogram) — label {@code op}
+ * <li>{@code throttle_count} (Counter) — label {@code op}
+ * <li>{@code retry_count} (Counter) — labels {@code op}, {@code reason}
+ * </ul>
+ *
+ * <p>({@code iops} from the default allowlist is derived at reporter time as
the rate of {@code
+ * api_call_count}, so it is not a separately registered metric.)
+ *
+ * <p><b>Allowlist.</b> Only metrics whose name is in the allowlist passed at
construction are
+ * registered; the rest are skipped on the hot path. A {@code "*"} entry
registers everything. The
+ * default set is the five FLIP-576 metrics ({@code api_call_count}, {@code
api_call_duration_ms},
+ * {@code throttle_count}, {@code retry_count}, {@code iops}).
+ *
+ * <p><b>Cardinality.</b> {@code op} comes from the SDK operation name (a
closed set of ~15 values
+ * for S3), {@code status_class} is a closed classifier ({@code 2xx}, {@code
4xx}, {@code 5xx},
+ * {@code throttled}, {@code other}, {@code error}, {@code unknown}), and
{@code reason} is a closed
+ * enum ({@code throttled}, {@code 5xx}, {@code other}). Metric handles are
cached in bounded maps,
+ * so {@link #publish} is a map lookup plus a counter increment with no
per-record allocation.
+ *
+ * <p><b>Thread-safety.</b> Counters use {@link ThreadSafeSimpleCounter} and
the histogram is
+ * synchronized, so concurrent publishes from the SDK completion executor are
safe. The bridge is
+ * also safe to share across multiple S3 clients of the same plugin instance.
+ */
+@Internal
+public final class AwsSdkMetricBridge implements MetricPublisher {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AwsSdkMetricBridge.class);
+
+ static final String API_CALL_COUNT = "api_call_count";
+ static final String API_CALL_DURATION_MS = "api_call_duration_ms";
+ static final String THROTTLE_COUNT = "throttle_count";
+ static final String RETRY_COUNT = "retry_count";
+ static final String IOPS = "iops";
+
+ /** The default-on metric set from FLIP-576. {@code iops} is derived, not
registered. */
+ static final List<String> DEFAULT_ALLOWLIST =
+ Arrays.asList(API_CALL_COUNT, API_CALL_DURATION_MS,
THROTTLE_COUNT, RETRY_COUNT, IOPS);
+
+ private static final String WILDCARD = "*";
+
+ private static final String LABEL_OP = "op";
+ private static final String LABEL_STATUS_CLASS = "status_class";
+ private static final String LABEL_REASON = "reason";
+
+ private static final String UNKNOWN_OP = "Unknown";
+
+ private final MetricGroup fsScope;
+ private final int histogramWindowSize;
+
+ private final boolean allowAll;
+ private final Set<String> allowlist;
+
+ // op and label sets are closed, so these maps are bounded by construction.
+ private final ConcurrentHashMap<String, Counter> counters = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, S3MetricHistogram> histograms =
+ new ConcurrentHashMap<>();
+
+ public AwsSdkMetricBridge(MetricGroup fsScope) {
+ this(fsScope, DEFAULT_ALLOWLIST,
S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+ }
+
+ public AwsSdkMetricBridge(MetricGroup fsScope, int histogramWindowSize) {
+ this(fsScope, DEFAULT_ALLOWLIST, histogramWindowSize);
+ }
+
+ public AwsSdkMetricBridge(
+ MetricGroup fsScope, @Nullable Collection<String> allowlist, int
histogramWindowSize) {
+ this.fsScope = Preconditions.checkNotNull(fsScope, "fsScope must not
be null");
+ Preconditions.checkArgument(
+ histogramWindowSize > 0, "histogramWindowSize must be
positive");
+ this.histogramWindowSize = histogramWindowSize;
+
+ if (allowlist == null || allowlist.isEmpty()) {
Review Comment:
I feel like the allowlist could be implemented at the config-option level, with
the `default` resolved on config read. Then you don't have to handle fallbacks
in the code.
Also, a similar question to before: do we want to scope this down to the AWS
SDK bridge, or are these configs universal enough to be extracted?
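
A minimal sketch of resolving the default at config-read time, so the bridge
constructor never sees a null or empty allowlist. This is plain Java rather
than Flink's `ConfigOption` API; the key `fs.metrics.allowlist`, the class
name, and the comma-separated format are all assumptions made for illustration
and are not taken from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical config reader; stands in for a ConfigOption with a default value. */
class AllowlistOptionSketch {

    // Assumed key name; the PR does not define one.
    static final String KEY = "fs.metrics.allowlist";

    // The five default metric names listed in AwsSdkMetricBridge.DEFAULT_ALLOWLIST.
    static final List<String> DEFAULT =
            Collections.unmodifiableList(
                    Arrays.asList(
                            "api_call_count",
                            "api_call_duration_ms",
                            "throttle_count",
                            "retry_count",
                            "iops"));

    /** Returns the configured allowlist, or DEFAULT when the key is unset or blank. */
    static List<String> read(Map<String, String> config) {
        String raw = config.get(KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return DEFAULT;
        }
        List<String> names = new ArrayList<>();
        for (String part : raw.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        // A value made only of separators is treated like an unset key.
        return names.isEmpty() ? DEFAULT : Collections.unmodifiableList(names);
    }
}
```

With the fallback handled here, the consumer can require a non-empty
collection and drop its own `null`/empty branch.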
##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridgeTest.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricCollector;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AwsSdkMetricBridge}'s translation of SDK metric records
into Flink metrics. */
+class AwsSdkMetricBridgeTest {
+
+ @Test
+ void successfulCallIncrementsApiCallCountAndRecordsDuration() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+ bridge.publish(apiCall("PutObject", Duration.ofMillis(120), true, 0,
200));
+
+
assertThat(root.count("op=PutObject/status_class=2xx/api_call_count")).isEqualTo(1L);
+ Histogram histogram =
root.histograms.get("op=PutObject/api_call_duration_ms");
+ assertThat(histogram).isNotNull();
+ assertThat(histogram.getCount()).isEqualTo(1L);
+ assertThat(histogram.getStatistics().getMax()).isEqualTo(120L);
+
assertThat(root.counters).doesNotContainKey("op=PutObject/throttle_count");
+ }
+
+ @Test
+ void throttledCallIncrementsThrottleAndRetryCounts() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+ // Two throttled attempts (503) followed by a successful one (200);
RETRY_COUNT = 2.
+ bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2,
503, 503, 200));
+
+ assertThat(root.count("op=UploadPart/throttle_count")).isEqualTo(2L);
+
assertThat(root.count("op=UploadPart/reason=throttled/retry_count")).isEqualTo(2L);
+ // The final attempt succeeded, so the overall call is classified 2xx.
+
assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+ }
+
+ @Test
+ void clientErrorIsClassifiedAs4xx() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+ bridge.publish(apiCall("HeadObject", Duration.ofMillis(20), false, 0,
404));
+
+
assertThat(root.count("op=HeadObject/status_class=4xx/api_call_count")).isEqualTo(1L);
+
assertThat(root.counters).doesNotContainKey("op=HeadObject/throttle_count");
+ }
+
+ @Test
+ void allowlistRegistersOnlyTheListedMetrics() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ // Only api_call_count is allowed; duration, throttle and retry must
be skipped.
+ AwsSdkMetricBridge bridge =
+ new AwsSdkMetricBridge(
+ root,
+
Collections.singletonList(AwsSdkMetricBridge.API_CALL_COUNT),
+ S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+
+ bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2,
503, 503, 200));
+
+
assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+
assertThat(root.histograms).doesNotContainKey("op=UploadPart/api_call_duration_ms");
+
assertThat(root.counters).doesNotContainKey("op=UploadPart/throttle_count");
+
assertThat(root.counters).doesNotContainKey("op=UploadPart/reason=throttled/retry_count");
+ }
+
+ @Test
+ void wildcardAllowlistRegistersEveryMetric() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ AwsSdkMetricBridge bridge =
+ new AwsSdkMetricBridge(
+ root,
+ Collections.singletonList("*"),
+ S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+
+ bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2,
503, 503, 200));
+
+
assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+
assertThat(root.histograms.get("op=UploadPart/api_call_duration_ms")).isNotNull();
+ assertThat(root.count("op=UploadPart/throttle_count")).isEqualTo(2L);
+
assertThat(root.count("op=UploadPart/reason=throttled/retry_count")).isEqualTo(2L);
+ }
+
+ @Test
+ void emptyAllowlistFallsBackToDefaults() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ AwsSdkMetricBridge bridge =
+ new AwsSdkMetricBridge(
+ root, Collections.emptyList(),
S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+
+ bridge.publish(apiCall("PutObject", Duration.ofMillis(120), true, 0,
200));
+
+ // The five default metrics include api_call_count and
api_call_duration_ms.
+
assertThat(root.count("op=PutObject/status_class=2xx/api_call_count")).isEqualTo(1L);
+
assertThat(root.histograms.get("op=PutObject/api_call_duration_ms")).isNotNull();
+ }
+
+ @Test
+ void serverErrorRetryIsClassifiedAs5xx() {
Review Comment:
I would be very explicit that this is AWS-specific behaviour. Ideally, I
would abstract error classification and do it outside of the bridge. The
bridge should only map the exact response to an abstract error class that can
be re-used for any cloud. Throttling is not AWS-specific.
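
A rough sketch of that split, with hypothetical names throughout
(`ErrorClass`, `ErrorClassifier`, `S3ErrorClassifier` are not in the PR). The
503-as-throttled rule mirrors what the tests in this file assert; treating 429
as throttled too is an assumption.

```java
/** Hypothetical cloud-agnostic error classes that any bridge could emit. */
enum ErrorClass {
    SUCCESS,
    CLIENT_ERROR,
    SERVER_ERROR,
    THROTTLED,
    OTHER
}

/** Hypothetical SPI: each cloud supplies its own mapping from a raw HTTP status. */
interface ErrorClassifier {
    ErrorClass classify(int httpStatus);
}

/** S3-flavoured mapping; the provider-specific rules live only in this class. */
class S3ErrorClassifier implements ErrorClassifier {
    @Override
    public ErrorClass classify(int httpStatus) {
        // 503 is what the tests here treat as throttled; 429 is an assumed addition.
        if (httpStatus == 503 || httpStatus == 429) {
            return ErrorClass.THROTTLED;
        }
        if (httpStatus >= 200 && httpStatus < 300) {
            return ErrorClass.SUCCESS;
        }
        if (httpStatus >= 400 && httpStatus < 500) {
            return ErrorClass.CLIENT_ERROR;
        }
        if (httpStatus >= 500 && httpStatus < 600) {
            return ErrorClass.SERVER_ERROR;
        }
        return ErrorClass.OTHER;
    }
}
```

The bridge would then depend only on `ErrorClassifier`, and another cloud's
plugin could plug in its own throttling rules without touching the metric
names or labels.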
##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/NativeS3MetricsEmissionITCase.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.fs.s3native.NativeS3FileSystemFactory;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3Configuration;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+
+import java.io.FileNotFoundException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * End-to-end test proving that real S3 operations performed through {@code
NativeS3FileSystem} are
+ * translated into Flink metrics by {@link AwsSdkMetricBridge} and become
visible in a real Flink
+ * metric registry.
+ *
+ * <p>Unlike {@link AwsSdkMetricBridgeTest} (which drives the bridge with
synthesized SDK records
+ * and a fake {@code MetricGroup}), this test exercises the full chain: the
AWS SDK actually invokes
+ * the registered {@link software.amazon.awssdk.metrics.MetricPublisher} after
each completed API
+ * call, the bridge registers and updates {@link Counter}/{@link Histogram}
handles, and the
+ * assertions read those handles back through {@link MetricListener}'s real
{@code MetricRegistry}.
+ *
+ * <p>Assertions use only GET/HEAD/LIST round trips, which carry no request
body and are therefore
+ * unaffected by the request-checksum behaviour newer AWS SDK versions apply
to {@code PutObject}.
+ *
+ * <p>Requires Docker; auto-skipped when Docker is unavailable.
+ */
+@Testcontainers(disabledWithoutDocker = true)
+class NativeS3MetricsEmissionITCase {
+
+ private static final String MINIO_IMAGE =
"minio/minio:RELEASE.2022-02-07T08-17-33Z";
Review Comment:
Why do we hardcode this image here? Do we have a more centralised way to
manage MinIO tests?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java:
##########
@@ -377,6 +377,33 @@ protected void initializeServices(Configuration
configuration, PluginManager plu
configuration.set(JobManagerOptions.ADDRESS,
commonRpcService.getAddress());
configuration.set(JobManagerOptions.PORT,
commonRpcService.getPort());
+ metricRegistry = createMetricRegistry(configuration,
pluginManager, rpcSystem);
+
+ final RpcService metricQueryServiceRpcService =
+ MetricUtils.startRemoteMetricsRpcService(
+ configuration,
+ commonRpcService.getAddress(),
+ configuration.get(JobManagerOptions.BIND_HOST),
+ rpcSystem);
+ metricRegistry.startQueryService(metricQueryServiceRpcService,
null);
+
+ final String hostname = RpcUtils.getHostname(commonRpcService);
+
+ processMetricGroup =
+ MetricUtils.instantiateProcessMetricGroup(
+ metricRegistry,
+ hostname,
+
ConfigurationUtils.getSystemResourceMetricsProbingInterval(
+ configuration));
+
+ // Second-phase init for file system plugins that opt into metrics
(e.g.
+ // flink-s3-fs-native): hand them the process-level metric group
before any file system
+ // is used. This must run ahead of the HA services and BlobServer
below, because those
+ // may open external file systems (e.g. S3 HA/blob storage),
creating them first would
+ // cache metric-less file system clients for the rest of the
process lifetime. See
Review Comment:
Is the problem here that FS doesn't have its own `close` hook? If so, I
think it is an FS bug that we should fix for many reasons, to allow all
FS-related resources to be freed as part of FS shutdown. Also, the FS
lifecycle is typically the same as the whole TM/JM VM lifecycle. Could you
please explain what we are trying to achieve?
##########
flink-core/src/test/java/org/apache/flink/core/fs/FileSystemAttachMetricsTest.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.MetricsAware;
+import org.apache.flink.core.plugin.TestingPluginManager;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/**
+ * Tests for {@link FileSystem#attachMetrics(MetricGroup)} and the {@link
MetricsAware} two-phase
+ * init contract.
+ *
+ * <p>The headline case is {@link
#attachMetricsReachesPluginLoadedMetricsAwareFactory()}: plugin
+ * file systems are registered wrapped in a {@link PluginFileSystemFactory},
which does <em>not</em>
+ * itself implement {@link MetricsAware}. {@code attachMetrics} must unwrap
the proxy to reach the
+ * real factory, otherwise the metric group is silently never delivered and no
metrics are ever
+ * emitted.
+ */
+class FileSystemAttachMetricsTest {
+
+ @AfterEach
+ void resetFileSystems() {
+ // Restore the default, plugin-less factory registry so other tests
are unaffected.
+ FileSystem.initialize(new Configuration(), null);
+ }
+
+ @Test
+ void attachMetricsReachesPluginLoadedMetricsAwareFactory() {
+ RecordingMetricsAwareFactory factory = new
RecordingMetricsAwareFactory("metrics-test-fs");
+ initializeWithPlugins(factory);
+
+ RecordingMetricGroup processGroup = new RecordingMetricGroup();
+ FileSystem.attachMetrics(processGroup);
+
+ // Unwrapped through PluginFileSystemFactory and invoked exactly once.
+ assertThat(factory.setMetricGroupCalls).hasValue(1);
+ // The group handed to the factory is the "filesystem" child of the
process group, not the
+ // process group itself.
+ assertThat(processGroup.childGroupNames).containsExactly("filesystem");
+
assertThat(factory.receivedGroup.get()).isNotNull().isNotSameAs(processGroup);
+ }
+
+ @Test
+ void attachMetricsSkipsFactoriesThatAreNotMetricsAware() {
+ initializeWithPlugins(new PlainFactory("plain-test-fs"));
+
+ assertThatCode(() -> FileSystem.attachMetrics(new
UnregisteredMetricsGroup()))
+ .doesNotThrowAnyException();
+ }
+
+ @Test
+ void attachMetricsIsResilientToAFactoryThatThrows() {
+ RecordingMetricsAwareFactory ok = new
RecordingMetricsAwareFactory("ok-test-fs");
+ initializeWithPlugins(new
ThrowingMetricsAwareFactory("throwing-test-fs"), ok);
+
+ // A misbehaving plugin must never break process startup, and
well-behaved factories must
+ // still receive their group regardless of iteration order.
+ assertThatCode(() -> FileSystem.attachMetrics(new
UnregisteredMetricsGroup()))
+ .doesNotThrowAnyException();
+ assertThat(ok.setMetricGroupCalls).hasValue(1);
Review Comment:
Do we have a test that would show `2` for multiple successful attachments?
##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -316,6 +318,53 @@ public static List<FileSystemFactory>
getRegisteredFileSystemFactories() {
}
}
+ /**
+ * Hands a runtime-owned, process-level {@link MetricGroup} to every
registered {@link
+ * FileSystemFactory} that opts into metrics via {@link MetricsAware}.
+ *
+ * <p>This is the second phase of file system initialization. {@link
#initialize(Configuration,
+ * PluginManager)} runs at process startup, before the {@code
MetricRegistry} exists; this
+ * method is therefore invoked separately, once the registry and a
process-level {@link
+ * MetricGroup} are available. It is called from the TaskManager and
JobManager entrypoints
+ * only. Contexts without a process-level {@link MetricGroup} (CLI,
HistoryServer, YARN client)
+ * simply never call it, and their file system plugins continue to operate
without emitting
+ * metrics.
+ *
+ * <p>The call is idempotent: factories receive a child group {@code
<process>.filesystem}, and
+ * {@link MetricGroup#addGroup} returns the same child on repeated calls
with the same parent,
+ * so re-invocation does not register duplicate metrics. Factories that do
not implement {@link
+ * MetricsAware} are skipped.
+ *
+ * @param processMetricGroup the process-level metric group to register
file system metrics
+ * under.
+ */
+ @Internal
+ public static void attachMetrics(MetricGroup processMetricGroup) {
+ checkNotNull(processMetricGroup, "processMetricGroup");
+ LOCK.lock();
+ try {
+ final MetricGroup fsGroup =
processMetricGroup.addGroup("filesystem");
+ for (FileSystemFactory factory : FS_FACTORIES.values()) {
+ // Plugin-loaded factories are wrapped in a
PluginFileSystemFactory, which is itself
+ // MetricsAware and forwards setMetricGroup to the inner
factory under the plugin
+ // classloader, so this plain instanceof reaches both wrapped
and direct factories.
+ if (factory instanceof MetricsAware) {
+ try {
+ ((MetricsAware) factory).setMetricGroup(fsGroup);
+ } catch (Throwable t) {
Review Comment:
Do we really want to catch everything here, including OOMs? That seems
unnecessarily broad.
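
A minimal sketch of the narrower alternative, assuming the goal is only to
stop a misbehaving plugin from breaking startup. `MetricsHook` and
`NarrowCatchSketch` are hypothetical stand-ins, not types from this PR.
Catching `Exception` isolates plugin failures, while `Error`s such as
`OutOfMemoryError` still propagate to the caller.

```java
import java.util.List;

/** Hypothetical stand-in for a MetricsAware factory; not a Flink type. */
interface MetricsHook {
    void attach() throws Exception;
}

class NarrowCatchSketch {

    /** Invokes every hook and returns how many attached without throwing. */
    static int attachAll(List<MetricsHook> hooks) {
        int attached = 0;
        for (MetricsHook hook : hooks) {
            try {
                hook.attach();
                attached++;
            } catch (Exception e) {
                // Checked and runtime exceptions from one plugin are isolated here,
                // so the remaining hooks still run.
                // Errors (e.g. OutOfMemoryError) are deliberately not caught.
                System.err.println("Metrics hook failed, continuing: " + e);
            }
        }
        return attached;
    }
}
```

If linkage problems from plugin classloading (for example
`NoClassDefFoundError`) also need to be tolerated, they could be caught
explicitly as `LinkageError` rather than widening the catch to `Throwable`.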
##########
flink-core/src/main/java/org/apache/flink/core/plugin/MetricsAware.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * Capability marker for {@link Plugin}s that want to register Flink metrics.
+ *
+ * <p>This is an opt-in extension to the plugin SPI. A plugin declares {@code
implements
+ * SomePluginSpi, MetricsAware}; plugins that do not implement it are
byte-for-byte unchanged and
+ * emit no metrics.
+ *
+ * <p><b>Two-phase init contract.</b> The runtime invokes {@link
#setMetricGroup(MetricGroup)} after
+ * {@link Plugin#configure(org.apache.flink.configuration.Configuration)} and
before any operation
+ * that would emit a metric. The call happens only from runtime entrypoints
that own a process-level
+ * {@link MetricGroup} (TaskManager and JobManager), via {@link
+ * org.apache.flink.core.fs.FileSystem#attachMetrics(MetricGroup)}. Contexts
without such a group
+ * (CLI, HistoryServer, YARN client, embedded usage) never call it, in which
case the plugin must
+ * continue to operate normally and emit no metrics.
+ *
+ * <p><b>Idempotency.</b> {@code setMetricGroup} may be invoked more than once
(for example if
+ * metrics are re-attached). Implementations must treat repeated invocations
idempotently: a call
+ * with the same {@link MetricGroup} must not register duplicate metrics, and
a call with a
Review Comment:
What deduplicates metric groups for different plugins if they all register
under the `filesystem` name?
##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -316,6 +318,53 @@ public static List<FileSystemFactory>
getRegisteredFileSystemFactories() {
}
}
+ /**
+ * Hands a runtime-owned, process-level {@link MetricGroup} to every
registered {@link
+ * FileSystemFactory} that opts into metrics via {@link MetricsAware}.
+ *
+ * <p>This is the second phase of file system initialization. {@link
#initialize(Configuration,
+ * PluginManager)} runs at process startup, before the {@code
MetricRegistry} exists; this
+ * method is therefore invoked separately, once the registry and a
process-level {@link
+ * MetricGroup} are available. It is called from the TaskManager and
JobManager entrypoints
+ * only. Contexts without a process-level {@link MetricGroup} (CLI,
HistoryServer, YARN client)
+ * simply never call it, and their file system plugins continue to operate
without emitting
+ * metrics.
+ *
+ * <p>The call is idempotent: factories receive a child group {@code
<process>.filesystem}, and
+ * {@link MetricGroup#addGroup} returns the same child on repeated calls
with the same parent,
+ * so re-invocation does not register duplicate metrics. Factories that do
not implement {@link
+ * MetricsAware} are skipped.
+ *
+ * @param processMetricGroup the process-level metric group to register
file system metrics
+ * under.
+ */
+ @Internal
+ public static void attachMetrics(MetricGroup processMetricGroup) {
+ checkNotNull(processMetricGroup, "processMetricGroup");
+ LOCK.lock();
+ try {
+ final MetricGroup fsGroup =
processMetricGroup.addGroup("filesystem");
Review Comment:
Hypothetically, can it result in a group clash if we have different FS
instances with telemetry enabled at the same time?
##########
flink-core/src/test/java/org/apache/flink/core/fs/FileSystemAttachMetricsTest.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.MetricsAware;
+import org.apache.flink.core.plugin.TestingPluginManager;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/**
+ * Tests for {@link FileSystem#attachMetrics(MetricGroup)} and the {@link
MetricsAware} two-phase
+ * init contract.
+ *
+ * <p>The headline case is {@link
#attachMetricsReachesPluginLoadedMetricsAwareFactory()}: plugin
+ * file systems are registered wrapped in a {@link PluginFileSystemFactory},
which does <em>not</em>
+ * itself implement {@link MetricsAware}. {@code attachMetrics} must unwrap
the proxy to reach the
+ * real factory, otherwise the metric group is silently never delivered and no
metrics are ever
+ * emitted.
+ */
+class FileSystemAttachMetricsTest {
+
+ @AfterEach
+ void resetFileSystems() {
+ // Restore the default, plugin-less factory registry so other tests
are unaffected.
+ FileSystem.initialize(new Configuration(), null);
+ }
+
+ @Test
+ void attachMetricsReachesPluginLoadedMetricsAwareFactory() {
+ RecordingMetricsAwareFactory factory = new
RecordingMetricsAwareFactory("metrics-test-fs");
+ initializeWithPlugins(factory);
+
+ RecordingMetricGroup processGroup = new RecordingMetricGroup();
+ FileSystem.attachMetrics(processGroup);
+
+ // Unwrapped through PluginFileSystemFactory and invoked exactly once.
+ assertThat(factory.setMetricGroupCalls).hasValue(1);
+ // The group handed to the factory is the "filesystem" child of the
process group, not the
+ // process group itself.
+ assertThat(processGroup.childGroupNames).containsExactly("filesystem");
+
assertThat(factory.receivedGroup.get()).isNotNull().isNotSameAs(processGroup);
+ }
+
+ @Test
+ void attachMetricsSkipsFactoriesThatAreNotMetricsAware() {
Review Comment:
This is misleading: the name talks about `factories`, but the test covers only
a plain factory. It should either be parametrised or have more explicit naming
plus an explanation of why testing only the plain factory is enough.
##########
flink-core/src/test/java/org/apache/flink/core/fs/FileSystemAttachMetricsTest.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.MetricsAware;
+import org.apache.flink.core.plugin.TestingPluginManager;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/**
+ * Tests for {@link FileSystem#attachMetrics(MetricGroup)} and the {@link MetricsAware} two-phase
+ * init contract.
+ *
+ * <p>The headline case is {@link #attachMetricsReachesPluginLoadedMetricsAwareFactory()}: plugin
+ * file systems are registered wrapped in a {@link PluginFileSystemFactory}, which does <em>not</em>
+ * itself implement {@link MetricsAware}. {@code attachMetrics} must unwrap the proxy to reach the
+ * real factory, otherwise the metric group is silently never delivered and no metrics are ever
+ * emitted.
+ */
+class FileSystemAttachMetricsTest {
+
+ @AfterEach
+ void resetFileSystems() {
+ // Restore the default, plugin-less factory registry so other tests are unaffected.
+ FileSystem.initialize(new Configuration(), null);
+ }
+
+ @Test
+ void attachMetricsReachesPluginLoadedMetricsAwareFactory() {
+ RecordingMetricsAwareFactory factory = new RecordingMetricsAwareFactory("metrics-test-fs");
+ initializeWithPlugins(factory);
+
+ RecordingMetricGroup processGroup = new RecordingMetricGroup();
+ FileSystem.attachMetrics(processGroup);
+
+ // Unwrapped through PluginFileSystemFactory and invoked exactly once.
+ assertThat(factory.setMetricGroupCalls).hasValue(1);
+ // The group handed to the factory is the "filesystem" child of the process group, not the
+ // process group itself.
+ assertThat(processGroup.childGroupNames).containsExactly("filesystem");
+ assertThat(factory.receivedGroup.get()).isNotNull().isNotSameAs(processGroup);
+ }
+
+ @Test
+ void attachMetricsSkipsFactoriesThatAreNotMetricsAware() {
+ initializeWithPlugins(new PlainFactory("plain-test-fs"));
+
+ assertThatCode(() -> FileSystem.attachMetrics(new UnregisteredMetricsGroup()))
+ .doesNotThrowAnyException();
+ }
+
+ @Test
+ void attachMetricsIsResilientToAFactoryThatThrows() {
+ RecordingMetricsAwareFactory ok = new RecordingMetricsAwareFactory("ok-test-fs");
+ initializeWithPlugins(new ThrowingMetricsAwareFactory("throwing-test-fs"), ok);
+
+ // A misbehaving plugin must never break process startup, and well-behaved factories must
+ // still receive their group regardless of iteration order.
+ assertThatCode(() -> FileSystem.attachMetrics(new UnregisteredMetricsGroup()))
+ .doesNotThrowAnyException();
+ assertThat(ok.setMetricGroupCalls).hasValue(1);
+ }
+
+ @Test
+ void attachMetricsDoesNotThrowWhenInvokedRepeatedly() {
+ RecordingMetricsAwareFactory factory = new RecordingMetricsAwareFactory("idem-test-fs");
+ initializeWithPlugins(factory);
+
+ MetricGroup group = new UnregisteredMetricsGroup();
+ assertThatCode(
+ () -> {
+ FileSystem.attachMetrics(group);
+ FileSystem.attachMetrics(group);
+ })
+ .doesNotThrowAnyException();
Review Comment:
This only verifies that a repeated call doesn't throw; it doesn't verify
idempotency.
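
A minimal sketch of the kind of assertion I have in mind. It uses stand-in types (`RecordingGroup`, `RecordingFactory`, `attach`) rather than the real Flink classes, so every name below is hypothetical; the point is only that the test should inspect what the second call left behind, not merely that it returned.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-in sketch; none of these types are Flink's. */
final class IdempotencySketch {

    /** Hypothetical, simplified metric group: named children plus a log of registered names. */
    static final class RecordingGroup {
        final Map<String, RecordingGroup> children = new HashMap<>();
        final List<String> registered = new ArrayList<>();

        /** Returns the existing child for a name instead of creating a second one. */
        RecordingGroup addGroup(String name) {
            return children.computeIfAbsent(name, n -> new RecordingGroup());
        }
    }

    /** Hypothetical factory that registers its metrics only when handed a new group. */
    static final class RecordingFactory {
        RecordingGroup current;
        int calls;

        void setMetricGroup(RecordingGroup group) {
            calls++;
            if (group != current) {
                current = group;
                group.registered.add("api_call_count");
            }
        }
    }

    /** Simplified stand-in for the attach step: derive the child group, hand it over. */
    static void attach(RecordingGroup process, RecordingFactory factory) {
        factory.setMetricGroup(process.addGroup("filesystem"));
    }

    public static void main(String[] args) {
        RecordingGroup process = new RecordingGroup();
        RecordingFactory factory = new RecordingFactory();
        attach(process, factory);
        RecordingGroup first = factory.current;
        attach(process, factory);

        // Idempotency is about observable state after the second call, not about exceptions.
        System.out.println("same child group: " + (first == factory.current));
        System.out.println("child groups: " + process.children.size());
        System.out.println("registrations: " + first.registered.size());
    }
}
```

In the real test this would probably translate to asserting on the recorded child group names and on the group instance the factory received across both calls.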
##########
flink-core/src/main/java/org/apache/flink/core/plugin/MetricsAware.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * Capability marker for {@link Plugin}s that want to register Flink metrics.
+ *
+ * <p>This is an opt-in extension to the plugin SPI. A plugin declares {@code implements
+ * SomePluginSpi, MetricsAware}; plugins that do not implement it are byte-for-byte unchanged and
Review Comment:
nit: This doc is a bit bloated. I don't think we need to over-explain how
interfaces work (I am especially bothered by clarifications such as
`byte-for-byte unchanged` that are not load-bearing).
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/S3MetricHistogram.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+
+import java.util.Arrays;
+
+/**
+ * Minimal sliding-window {@link Histogram} used by {@link AwsSdkMetricBridge} for {@code
+ * api_call_duration_ms}.
+ *
+ * <p>Backed by a fixed-size circular buffer holding the most recent {@code windowSize} samples
+ * (default {@value #DEFAULT_WINDOW_SIZE}). This bounds memory regardless of request volume and
+ * keeps {@link #update(long)} O(1); statistics are computed from a sorted snapshot of the window.
+ *
+ * <p>flink-s3-fs-native deliberately keeps a minimal dependency footprint and does not depend on
+ * flink-runtime, so {@code DescriptiveStatisticsHistogram} is not available; this is a small
+ * self-contained equivalent.
+ */
+@Internal
+public class S3MetricHistogram implements Histogram {
Review Comment:
Does this class contain anything S3-specific? It looks like a good candidate
for extraction into cloud-agnostic code.
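
To illustrate why I think it is provider-neutral: going only by the Javadoc above (fixed-size circular buffer, O(1) update, statistics from a sorted snapshot), the core could be sketched without any S3 or AWS type. `SlidingWindowSketch` is a hypothetical name, and this is not the PR's implementation; it also does not implement Flink's `Histogram` interface, to keep the sketch self-contained.

```java
import java.util.Arrays;

/** Hypothetical, provider-neutral sliding window; not the PR's S3MetricHistogram. */
final class SlidingWindowSketch {

    private final long[] window;
    private long count; // total number of samples ever recorded

    SlidingWindowSketch(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.window = new long[windowSize];
    }

    /** O(1): overwrites the oldest slot once the buffer is full. */
    synchronized void update(long value) {
        window[(int) (count % window.length)] = value;
        count++;
    }

    synchronized long getCount() {
        return count;
    }

    /** Sorted copy of the samples currently held in the window. */
    synchronized long[] sortedSnapshot() {
        int size = (int) Math.min(count, window.length);
        long[] copy = Arrays.copyOf(window, size);
        Arrays.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        SlidingWindowSketch histogram = new SlidingWindowSketch(3);
        for (long sample : new long[] {10, 30, 20, 40}) {
            histogram.update(sample);
        }
        // The window holds the three most recent samples; 10 has been evicted.
        System.out.println(Arrays.toString(histogram.sortedSnapshot()));
    }
}
```

Nothing here refers to a storage backend, which is why a shared module seems plausible to me.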
##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridgeTest.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricCollector;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AwsSdkMetricBridge}'s translation of SDK metric records into Flink metrics. */
+class AwsSdkMetricBridgeTest {
+
+ @Test
+ void successfulCallIncrementsApiCallCountAndRecordsDuration() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+ bridge.publish(apiCall("PutObject", Duration.ofMillis(120), true, 0,
200));
+
+
assertThat(root.count("op=PutObject/status_class=2xx/api_call_count")).isEqualTo(1L);
+ Histogram histogram =
root.histograms.get("op=PutObject/api_call_duration_ms");
+ assertThat(histogram).isNotNull();
+ assertThat(histogram.getCount()).isEqualTo(1L);
+ assertThat(histogram.getStatistics().getMax()).isEqualTo(120L);
+
assertThat(root.counters).doesNotContainKey("op=PutObject/throttle_count");
+ }
+
+ @Test
+ void throttledCallIncrementsThrottleAndRetryCounts() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+ // Two throttled attempts (503) followed by a successful one (200);
RETRY_COUNT = 2.
+ bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2,
503, 503, 200));
+
+ assertThat(root.count("op=UploadPart/throttle_count")).isEqualTo(2L);
+
assertThat(root.count("op=UploadPart/reason=throttled/retry_count")).isEqualTo(2L);
+ // The final attempt succeeded, so the overall call is classified 2xx.
+
assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+ }
+
+ @Test
+ void clientErrorIsClassifiedAs4xx() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+ bridge.publish(apiCall("HeadObject", Duration.ofMillis(20), false, 0,
404));
+
+
assertThat(root.count("op=HeadObject/status_class=4xx/api_call_count")).isEqualTo(1L);
+
assertThat(root.counters).doesNotContainKey("op=HeadObject/throttle_count");
+ }
+
+ @Test
+ void allowlistRegistersOnlyTheListedMetrics() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ // Only api_call_count is allowed; duration, throttle and retry must
be skipped.
+ AwsSdkMetricBridge bridge =
+ new AwsSdkMetricBridge(
+ root,
+
Collections.singletonList(AwsSdkMetricBridge.API_CALL_COUNT),
+ S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+
+ bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2,
503, 503, 200));
+
+
assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+
assertThat(root.histograms).doesNotContainKey("op=UploadPart/api_call_duration_ms");
+
assertThat(root.counters).doesNotContainKey("op=UploadPart/throttle_count");
+
assertThat(root.counters).doesNotContainKey("op=UploadPart/reason=throttled/retry_count");
+ }
+
+ @Test
+ void wildcardAllowlistRegistersEveryMetric() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ AwsSdkMetricBridge bridge =
+ new AwsSdkMetricBridge(
+ root,
+ Collections.singletonList("*"),
+ S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+
+ bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2,
503, 503, 200));
+
+
assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+
assertThat(root.histograms.get("op=UploadPart/api_call_duration_ms")).isNotNull();
+ assertThat(root.count("op=UploadPart/throttle_count")).isEqualTo(2L);
+
assertThat(root.count("op=UploadPart/reason=throttled/retry_count")).isEqualTo(2L);
+ }
+
+ @Test
+ void emptyAllowlistFallsBackToDefaults() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ AwsSdkMetricBridge bridge =
+ new AwsSdkMetricBridge(
+ root, Collections.emptyList(),
S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+
+ bridge.publish(apiCall("PutObject", Duration.ofMillis(120), true, 0,
200));
+
+ // The five default metrics include api_call_count and
api_call_duration_ms.
+
assertThat(root.count("op=PutObject/status_class=2xx/api_call_count")).isEqualTo(1L);
+
assertThat(root.histograms.get("op=PutObject/api_call_duration_ms")).isNotNull();
+ }
+
+ @Test
+ void serverErrorRetryIsClassifiedAs5xx() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+ bridge.publish(apiCall("GetObject", Duration.ofMillis(50), true, 1,
500, 200));
+
+
assertThat(root.count("op=GetObject/reason=5xx/retry_count")).isEqualTo(1L);
+
assertThat(root.counters).doesNotContainKey("op=GetObject/throttle_count");
+ }
+
+ @Test
+ void accumulatesAcrossMultipleCalls() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+ bridge.publish(apiCall("GetObject", Duration.ofMillis(10), true, 0,
200));
+ bridge.publish(apiCall("GetObject", Duration.ofMillis(30), true, 0,
200));
+
+
assertThat(root.count("op=GetObject/status_class=2xx/api_call_count")).isEqualTo(2L);
+ Histogram histogram =
root.histograms.get("op=GetObject/api_call_duration_ms");
+ assertThat(histogram.getCount()).isEqualTo(2L);
+ assertThat(histogram.getStatistics().getMin()).isEqualTo(10L);
+ assertThat(histogram.getStatistics().getMax()).isEqualTo(30L);
+ }
+
+ @Test
+ void publishOfEmptyCollectionDoesNotThrowAndUsesUnknownOp() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+ bridge.publish(MetricCollector.create("ApiCall").collect());
+
+ assertThat(root.counters.keySet()).anyMatch(key ->
key.contains("op=Unknown"));
+ }
+
+ private static MetricCollection apiCall(
+ String op, Duration duration, boolean successful, int retries,
int... attemptStatuses) {
+ MetricCollector apiCall = MetricCollector.create("ApiCall");
+ apiCall.reportMetric(CoreMetric.OPERATION_NAME, op);
+ apiCall.reportMetric(CoreMetric.API_CALL_DURATION, duration);
+ apiCall.reportMetric(CoreMetric.API_CALL_SUCCESSFUL, successful);
+ apiCall.reportMetric(CoreMetric.RETRY_COUNT, retries);
+ for (int status : attemptStatuses) {
+ MetricCollector attempt = apiCall.createChild("ApiCallAttempt");
+ attempt.reportMetric(HttpMetric.HTTP_STATUS_CODE, status);
+ }
+ return apiCall.collect();
+ }
+
+ /** A {@link MetricGroup} that captures registered metrics keyed by their full label path. */
+ private static final class CapturingMetricGroup extends UnregisteredMetricsGroup {
Review Comment:
It feels like we have similar duplicated code in other places. Should this be
extracted into a lightweight dependency module for metric testing utilities?
@rkhachatryan, do you know what the common pattern is in the Flink codebase
for repetitive testing mocks?
##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -316,6 +318,53 @@ public static List<FileSystemFactory> getRegisteredFileSystemFactories() {
}
}
+ /**
+ * Hands a runtime-owned, process-level {@link MetricGroup} to every registered {@link
+ * FileSystemFactory} that opts into metrics via {@link MetricsAware}.
+ *
+ * <p>This is the second phase of file system initialization. {@link #initialize(Configuration,
+ * PluginManager)} runs at process startup, before the {@code MetricRegistry} exists; this
+ * method is therefore invoked separately, once the registry and a process-level {@link
+ * MetricGroup} are available. It is called from the TaskManager and JobManager entrypoints
+ * only. Contexts without a process-level {@link MetricGroup} (CLI, HistoryServer, YARN client)
+ * simply never call it, and their file system plugins continue to operate without emitting
+ * metrics.
+ *
+ * <p>The call is idempotent: factories receive a child group {@code <process>.filesystem}, and
+ * {@link MetricGroup#addGroup} returns the same child on repeated calls with the same parent,
+ * so re-invocation does not register duplicate metrics. Factories that do not implement {@link
+ * MetricsAware} are skipped.
+ *
+ * @param processMetricGroup the process-level metric group to register file system metrics
+ * under.
+ */
+ @Internal
+ public static void attachMetrics(MetricGroup processMetricGroup) {
+ checkNotNull(processMetricGroup, "processMetricGroup");
+ LOCK.lock();
+ try {
+ final MetricGroup fsGroup = processMetricGroup.addGroup("filesystem");
+ for (FileSystemFactory factory : FS_FACTORIES.values()) {
+ // Plugin-loaded factories are wrapped in a PluginFileSystemFactory, which is itself
+ // MetricsAware and forwards setMetricGroup to the inner factory under the plugin
+ // classloader, so this plain instanceof reaches both wrapped and direct factories.
+ if (factory instanceof MetricsAware) {
Review Comment:
Would this work for wrapped filesystems (e.g. for
`ConnectionLimitingFileSystemFactory`)? Will all wrappers have to implement
this interface too?
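
To make the concern concrete, here is a sketch with stand-in types only; `Factory`, `MetricsAware`, `Wrapping` and `LimitingWrapper` below are hypothetical and are not Flink's interfaces. A wrapper that does not implement the capability itself is invisible to a plain `instanceof`, so either every wrapper forwards the call or the attach step walks through wrappers, roughly like this:

```java
/** Stand-in types only; none of these are Flink's interfaces. */
final class WrapperForwardingSketch {

    interface Factory {}

    interface MetricsAware {
        void setMetricGroup(String group);
    }

    /** Hypothetical hook a wrapper could expose so callers can reach the inner factory. */
    interface Wrapping {
        Factory inner();
    }

    static final class S3LikeFactory implements Factory, MetricsAware {
        String received;

        @Override
        public void setMetricGroup(String group) {
            received = group;
        }
    }

    /** A wrapper that does not implement MetricsAware itself. */
    static final class LimitingWrapper implements Factory, Wrapping {
        private final Factory inner;

        LimitingWrapper(Factory inner) {
            this.inner = inner;
        }

        @Override
        public Factory inner() {
            return inner;
        }
    }

    /** Walks through wrappers until a MetricsAware factory is found; returns whether one was. */
    static boolean attach(Factory factory, String group) {
        Factory current = factory;
        while (current != null) {
            if (current instanceof MetricsAware) {
                ((MetricsAware) current).setMetricGroup(group);
                return true;
            }
            current = current instanceof Wrapping ? ((Wrapping) current).inner() : null;
        }
        return false;
    }

    public static void main(String[] args) {
        S3LikeFactory s3 = new S3LikeFactory();
        Factory wrapped = new LimitingWrapper(s3);

        System.out.println("plain instanceof sees it: " + (wrapped instanceof MetricsAware));
        System.out.println("unwrapping reaches it: " + attach(wrapped, "filesystem"));
    }
}
```

Whether unwrapping in one place or forwarding in each wrapper is preferable is a design choice; I mainly want to be sure one of the two is covered by a test.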
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridge.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricPublisher;
+import software.amazon.awssdk.metrics.SdkMetric;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Bridges AWS SDK v2's {@link MetricPublisher} into Flink metrics for {@code flink-s3-fs-native}.
+ *
+ * <p>The SDK invokes {@link #publish(MetricCollection)} asynchronously after every completed API
+ * call, on its internal completion executor. The bridge reads a small, fixed set of fields and
+ * emits the default metric surface of FLIP-576 against the {@code filesystem_type}-labelled scope
+ * it is handed at construction:
+ *
+ * <ul>
+ * <li>{@code api_call_count} (Counter) — labels {@code op}, {@code
status_class}
+ * <li>{@code api_call_duration_ms} (Histogram) — label {@code op}
+ * <li>{@code throttle_count} (Counter) — label {@code op}
+ * <li>{@code retry_count} (Counter) — labels {@code op}, {@code reason}
+ * </ul>
+ *
+ * <p>({@code iops} from the default allowlist is derived at reporter time as the rate of {@code
+ * api_call_count}, so it is not a separately registered metric.)
+ *
+ * <p><b>Allowlist.</b> Only metrics whose name is in the allowlist passed at construction are
+ * registered; the rest are skipped on the hot path. A {@code "*"} entry registers everything. The
+ * default set is the five FLIP-576 metrics ({@code api_call_count}, {@code api_call_duration_ms},
+ * {@code throttle_count}, {@code retry_count}, {@code iops}).
+ *
+ * <p><b>Cardinality.</b> {@code op} comes from the SDK operation name (a closed set of ~15 values
+ * for S3), {@code status_class} is a closed classifier ({@code 2xx}, {@code 4xx}, {@code 5xx},
+ * {@code throttled}, {@code other}, {@code error}, {@code unknown}), and {@code reason} is a closed
+ * enum ({@code throttled}, {@code 5xx}, {@code other}). Metric handles are cached in bounded maps,
+ * so {@link #publish} is a map lookup plus a counter increment with no per-record allocation.
+ *
+ * <p><b>Thread-safety.</b> Counters use {@link ThreadSafeSimpleCounter} and the histogram is
+ * synchronized, so concurrent publishes from the SDK completion executor are safe. The bridge is
+ * also safe to share across multiple S3 clients of the same plugin instance.
+ */
+@Internal
+public final class AwsSdkMetricBridge implements MetricPublisher {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AwsSdkMetricBridge.class);
+
+ static final String API_CALL_COUNT = "api_call_count";
+ static final String API_CALL_DURATION_MS = "api_call_duration_ms";
+ static final String THROTTLE_COUNT = "throttle_count";
+ static final String RETRY_COUNT = "retry_count";
+ static final String IOPS = "iops";
+
+ /** The default-on metric set from FLIP-576. {@code iops} is derived, not
registered. */
+ static final List<String> DEFAULT_ALLOWLIST =
+ Arrays.asList(API_CALL_COUNT, API_CALL_DURATION_MS,
THROTTLE_COUNT, RETRY_COUNT, IOPS);
+
+ private static final String WILDCARD = "*";
+
+ private static final String LABEL_OP = "op";
+ private static final String LABEL_STATUS_CLASS = "status_class";
+ private static final String LABEL_REASON = "reason";
+
+ private static final String UNKNOWN_OP = "Unknown";
+
+ private final MetricGroup fsScope;
+ private final int histogramWindowSize;
+
+ private final boolean allowAll;
+ private final Set<String> allowlist;
+
+ // op and label sets are closed, so these maps are bounded by construction.
+ private final ConcurrentHashMap<String, Counter> counters = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, S3MetricHistogram> histograms =
+ new ConcurrentHashMap<>();
+
+ public AwsSdkMetricBridge(MetricGroup fsScope) {
+ this(fsScope, DEFAULT_ALLOWLIST,
S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+ }
+
+ public AwsSdkMetricBridge(MetricGroup fsScope, int histogramWindowSize) {
+ this(fsScope, DEFAULT_ALLOWLIST, histogramWindowSize);
+ }
+
+ public AwsSdkMetricBridge(
+ MetricGroup fsScope, @Nullable Collection<String> allowlist, int
histogramWindowSize) {
+ this.fsScope = Preconditions.checkNotNull(fsScope, "fsScope must not
be null");
+ Preconditions.checkArgument(
+ histogramWindowSize > 0, "histogramWindowSize must be
positive");
+ this.histogramWindowSize = histogramWindowSize;
+
+ if (allowlist == null || allowlist.isEmpty()) {
+ LOG.warn(
+ "S3 metrics allowlist is empty; falling back to the
default metric set {}",
+ DEFAULT_ALLOWLIST);
+ this.allowAll = false;
+ this.allowlist = new HashSet<>(DEFAULT_ALLOWLIST);
+ } else if (allowlist.contains(WILDCARD)) {
+ this.allowAll = true;
+ this.allowlist = new HashSet<>();
+ } else {
+ this.allowAll = false;
+ this.allowlist = new HashSet<>(allowlist);
+ }
+ }
+
+ private boolean allowed(String metricName) {
+ return allowAll || allowlist.contains(metricName);
+ }
+
+ @Override
+ public void publish(MetricCollection apiCall) {
+ try {
+ translate(apiCall);
+ } catch (Throwable t) {
+ // Defence in depth: a metric failure must never affect S3 IO.
+ LOG.debug("Failed to publish S3 SDK metrics", t);
+ }
+ }
+
+ private void translate(MetricCollection apiCall) {
+ final String op = first(apiCall, CoreMetric.OPERATION_NAME,
UNKNOWN_OP);
+
+ final Duration duration = first(apiCall, CoreMetric.API_CALL_DURATION,
null);
+ if (duration != null && allowed(API_CALL_DURATION_MS)) {
+ histogram(op).update(duration.toMillis());
+ }
+
+ // HTTP_STATUS_CODE lives on the per-attempt children, not on the
top-level ApiCall record.
+ // status_class reflects the overall outcome (last attempt); the retry
reason reflects the
+ // failures that triggered the retries (any attempt), so they are
tracked separately.
+ int throttleResponses = 0;
+ boolean sawServerError = false;
+ Integer lastStatus = null;
+ for (MetricCollection attempt : apiCall.children()) {
+ for (Integer status :
attempt.metricValues(HttpMetric.HTTP_STATUS_CODE)) {
+ if (status != null) {
+ lastStatus = status;
+ if (isThrottle(status)) {
+ throttleResponses++;
+ } else if (status >= 500) {
+ sawServerError = true;
+ }
+ }
+ }
+ }
+
+ final Boolean successful = first(apiCall,
CoreMetric.API_CALL_SUCCESSFUL, null);
+ if (allowed(API_CALL_COUNT)) {
+ apiCallCount(op, statusClass(lastStatus, successful)).inc();
+ }
+
+ if (throttleResponses > 0 && allowed(THROTTLE_COUNT)) {
+ throttleCount(op).inc(throttleResponses);
+ }
+
+ final Integer retries = first(apiCall, CoreMetric.RETRY_COUNT, 0);
+ if (retries != null && retries > 0 && allowed(RETRY_COUNT)) {
+ retryCount(op, retryReason(throttleResponses > 0,
sawServerError)).inc(retries);
+ }
+ }
+
+ private static boolean isThrottle(int status) {
+ return status == 429 || status == 503;
+ }
+
+ private static String statusClass(Integer status, Boolean successful) {
Review Comment:
Should we (and can we) extract this into cloud-agnostic code?
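
A rough sketch of what a provider-neutral version might look like. The method body is not visible in this hunk, so the mapping below is my assumption rather than the PR's logic; only the label values (`2xx`, `4xx`, `5xx`, `throttled`, `other`, `error`, `unknown`) and the 429/503 throttle check come from the code above. `StatusClassSketch` and the `isThrottle` parameter are hypothetical.

```java
import java.util.function.IntPredicate;

/** Hypothetical provider-neutral classifier; the mapping is an assumption, not the PR's logic. */
final class StatusClassSketch {

    /**
     * Classifies an HTTP outcome. The provider-specific notion of "throttled" is passed in,
     * so nothing in this method depends on a particular cloud SDK.
     */
    static String statusClass(Integer status, Boolean successful, IntPredicate isThrottle) {
        if (status == null) {
            // No HTTP status observed: fall back to the success flag, if there is one.
            if (successful == null) {
                return "unknown";
            }
            return successful ? "other" : "error";
        }
        if (isThrottle.test(status)) {
            return "throttled";
        }
        if (status >= 200 && status < 300) {
            return "2xx";
        }
        if (status >= 400 && status < 500) {
            return "4xx";
        }
        if (status >= 500) {
            return "5xx";
        }
        return "other";
    }

    public static void main(String[] args) {
        // The S3 flavour of throttling, as in isThrottle above.
        IntPredicate s3Throttle = code -> code == 429 || code == 503;

        Integer[] statuses = {200, 404, 500, 503, null};
        for (Integer status : statuses) {
            System.out.println(status + " -> " + statusClass(status, null, s3Throttle));
        }
    }
}
```

The table-style loop in `main` is also roughly the shape a parametrised test over status classes could take.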
##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridgeTest.java:
##########
@@ -0,0 +1,231 @@
+ @Test
+ void successfulCallIncrementsApiCallCountAndRecordsDuration() {
Review Comment:
Can we parametrise these tests? For example, the tests for the different
`status_class` values are very repetitive.
##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridgeTest.java:
##########
@@ -0,0 +1,231 @@
+ @Test
+ void wildcardAllowlistRegistersEveryMetric() {
Review Comment:
I think this test can't tell the difference between `*` and the fallback to
the default allowlist. Are we checking any metrics here that are not in the
default allowlist?
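
To show what I mean, here is a small stand-alone sketch that mirrors the three allowlist branches from the `AwsSdkMetricBridge` constructor. `AllowlistSketch` is a hypothetical class, and `bytes_transferred` is a made-up metric name used only to stand for "something outside the default set".

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Mirrors the allowlist branches quoted in this PR; "bytes_transferred" is a made-up name. */
final class AllowlistSketch {

    static final List<String> DEFAULTS =
            Arrays.asList(
                    "api_call_count",
                    "api_call_duration_ms",
                    "throttle_count",
                    "retry_count",
                    "iops");

    private final boolean allowAll;
    private final Set<String> allowlist;

    AllowlistSketch(Collection<String> configured) {
        if (configured == null || configured.isEmpty()) {
            // Empty configuration falls back to the default metric set.
            allowAll = false;
            allowlist = new HashSet<>(DEFAULTS);
        } else if (configured.contains("*")) {
            allowAll = true;
            allowlist = new HashSet<>();
        } else {
            allowAll = false;
            allowlist = new HashSet<>(configured);
        }
    }

    boolean allowed(String metricName) {
        return allowAll || allowlist.contains(metricName);
    }

    public static void main(String[] args) {
        AllowlistSketch wildcard = new AllowlistSketch(Collections.singletonList("*"));
        AllowlistSketch fallback = new AllowlistSketch(Collections.emptyList());

        // Every default metric is allowed under both configurations, so assertions on
        // default metrics alone cannot separate the two code paths.
        for (String name : DEFAULTS) {
            System.out.println(name + ": " + wildcard.allowed(name) + " / " + fallback.allowed(name));
        }
        // Only a name outside the default set distinguishes them.
        System.out.println(
                "bytes_transferred: "
                        + wildcard.allowed("bytes_transferred")
                        + " / "
                        + fallback.allowed("bytes_transferred"));
    }
}
```

If the bridge currently emits no metric outside the default set, the wildcard test may need a different way to observe the `allowAll` path.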
##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridgeTest.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricCollector;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AwsSdkMetricBridge}'s translation of SDK metric records into Flink metrics. */
+class AwsSdkMetricBridgeTest {
+
+ @Test
+ void successfulCallIncrementsApiCallCountAndRecordsDuration() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+ bridge.publish(apiCall("PutObject", Duration.ofMillis(120), true, 0, 200));
+
+ assertThat(root.count("op=PutObject/status_class=2xx/api_call_count")).isEqualTo(1L);
+ Histogram histogram = root.histograms.get("op=PutObject/api_call_duration_ms");
+ assertThat(histogram).isNotNull();
+ assertThat(histogram.getCount()).isEqualTo(1L);
+ assertThat(histogram.getStatistics().getMax()).isEqualTo(120L);
+ assertThat(root.counters).doesNotContainKey("op=PutObject/throttle_count");
+ }
+
+ @Test
+ void throttledCallIncrementsThrottleAndRetryCounts() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+ // Two throttled attempts (503) followed by a successful one (200); RETRY_COUNT = 2.
+ bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2, 503, 503, 200));
+
+ assertThat(root.count("op=UploadPart/throttle_count")).isEqualTo(2L);
+ assertThat(root.count("op=UploadPart/reason=throttled/retry_count")).isEqualTo(2L);
+ // The final attempt succeeded, so the overall call is classified 2xx.
+ assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+ }
+
+ @Test
+ void clientErrorIsClassifiedAs4xx() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+ bridge.publish(apiCall("HeadObject", Duration.ofMillis(20), false, 0, 404));
+
+ assertThat(root.count("op=HeadObject/status_class=4xx/api_call_count")).isEqualTo(1L);
+ assertThat(root.counters).doesNotContainKey("op=HeadObject/throttle_count");
+ }
+
+ @Test
+ void allowlistRegistersOnlyTheListedMetrics() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ // Only api_call_count is allowed; duration, throttle and retry must be skipped.
+ AwsSdkMetricBridge bridge =
+ new AwsSdkMetricBridge(
+ root,
+ Collections.singletonList(AwsSdkMetricBridge.API_CALL_COUNT),
+ S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+
+ bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2, 503, 503, 200));
+
+ assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+ assertThat(root.histograms).doesNotContainKey("op=UploadPart/api_call_duration_ms");
+ assertThat(root.counters).doesNotContainKey("op=UploadPart/throttle_count");
+ assertThat(root.counters).doesNotContainKey("op=UploadPart/reason=throttled/retry_count");
+ }
+
+ @Test
+ void wildcardAllowlistRegistersEveryMetric() {
+ CapturingMetricGroup root = new CapturingMetricGroup();
+ AwsSdkMetricBridge bridge =
+ new AwsSdkMetricBridge(
+ root,
+ Collections.singletonList("*"),
+ S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+
+ bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2, 503, 503, 200));
+
+ assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+ assertThat(root.histograms.get("op=UploadPart/api_call_duration_ms")).isNotNull();
+ assertThat(root.count("op=UploadPart/throttle_count")).isEqualTo(2L);
+ assertThat(root.count("op=UploadPart/reason=throttled/retry_count")).isEqualTo(2L);
+ }
+
+ @Test
+ void emptyAllowlistFallsBackToDefaults() {
Review Comment:
I think we shouldn't fall back to the default. If the user for some reason
provided an empty allowlist, this should throw an error early. I don't see why
someone would provide an empty list unless they wanted to disable all metrics
(which should be done with the master switch).
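
A minimal sketch of the fail-fast alternative, assuming a hypothetical helper `AllowlistValidation.requireNonEmpty` (the name, signature, and error message are illustrative, not part of the PR):

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical validation helper; the name and error message are illustrative only. */
final class AllowlistValidation {
    private AllowlistValidation() {}

    /** Rejects a null or empty allowlist up front instead of silently substituting defaults. */
    static Set<String> requireNonEmpty(List<String> configured) {
        if (configured == null || configured.isEmpty()) {
            throw new IllegalArgumentException(
                    "Metric allowlist must not be empty; "
                            + "use the metrics master switch to disable metrics.");
        }
        // Preserve configuration order and hand back a read-only view.
        return Collections.unmodifiableSet(new LinkedHashSet<>(configured));
    }
}
```

Calling this once at construction would surface a misconfiguration at startup rather than on the first published metric.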
##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/NativeS3FileSystemFactoryMetricsTest.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.fs.s3native.NativeS3AFileSystemFactory;
+import org.apache.flink.fs.s3native.NativeS3FileSystemFactory;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests that the native S3 factories register metrics under a {@code filesystem_type} label whose
+ * value is the factory scheme, so {@code s3://} and {@code s3a://} traffic remain distinguishable.
+ */
+class NativeS3FileSystemFactoryMetricsTest {
+
+ @Test
+ void s3FactoryUsesSchemeAsFilesystemTypeLabel() {
Review Comment:
!nit, can be parametrised
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/S3MetricHistogram.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+
+import java.util.Arrays;
+
+/**
+ * Minimal sliding-window {@link Histogram} used by {@link AwsSdkMetricBridge} for {@code
+ * api_call_duration_ms}.
+ *
+ * <p>Backed by a fixed-size circular buffer holding the most recent {@code windowSize} samples
+ * (default {@value #DEFAULT_WINDOW_SIZE}). This bounds memory regardless of request volume and
+ * keeps {@link #update(long)} O(1); statistics are computed from a sorted snapshot of the window.
+ *
+ * <p>flink-s3-fs-native deliberately keeps a minimal dependency footprint and does not depend on
+ * flink-runtime, so {@code DescriptiveStatisticsHistogram} is not available; this is a small
Review Comment:
> DescriptiveStatisticsHistogram
I am not very comfortable with duplicating chunks of code that contain
non-trivial logic. If the dependency footprint here is a real concern, it would
make more sense to me to extract some lightweight parts of flink-runtime into a
separate module.
@rkhachatryan, WDYT on this one?
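
For a sense of how much logic is at stake, here is a rough sketch of the structure the Javadoc describes: a fixed-size circular buffer with O(1) updates and statistics taken from a sorted snapshot. `SlidingWindowSketch` is a hypothetical illustration and not the PR's `S3MetricHistogram`; the nearest-rank quantile is an assumption, since the quoted hunk does not show how the PR computes statistics.

```java
import java.util.Arrays;

/** Hypothetical sliding-window sketch; not the PR's S3MetricHistogram. */
final class SlidingWindowSketch {
    private final long[] window;
    private long count; // total number of samples ever recorded

    SlidingWindowSketch(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.window = new long[windowSize];
    }

    /** O(1): overwrites the oldest slot once the buffer is full. */
    synchronized void update(long value) {
        window[(int) (count % window.length)] = value;
        count++;
    }

    synchronized long getCount() {
        return count;
    }

    /** Sorted copy of the samples currently held in the window. */
    synchronized long[] sortedSnapshot() {
        int size = (int) Math.min(count, window.length);
        long[] copy = Arrays.copyOf(window, size);
        Arrays.sort(copy);
        return copy;
    }

    /** Nearest-rank quantile over the window, q in [0, 1]; returns 0 when empty. */
    long quantile(double q) {
        long[] sorted = sortedSnapshot();
        if (sorted.length == 0) {
            return 0L;
        }
        int rank = (int) Math.ceil(q * sorted.length);
        return sorted[Math.max(0, Math.min(sorted.length - 1, rank - 1))];
    }
}
```

Even in this reduced form, the wrap-around indexing, the partially filled window, and the quantile edge cases each need their own tests, which is the maintenance cost of keeping a private copy.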
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridge.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricPublisher;
+import software.amazon.awssdk.metrics.SdkMetric;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Bridges AWS SDK v2's {@link MetricPublisher} into Flink metrics for {@code
flink-s3-fs-native}.
+ *
+ * <p>The SDK invokes {@link #publish(MetricCollection)} asynchronously after
every completed API
+ * call, on its internal completion executor. The bridge reads a small, fixed
set of fields and
+ * emits the default metric surface of FLIP-576 against the {@code
filesystem_type}-labelled scope
+ * it is handed at construction:
+ *
+ * <ul>
+ * <li>{@code api_call_count} (Counter) — labels {@code op}, {@code
status_class}
+ * <li>{@code api_call_duration_ms} (Histogram) — label {@code op}
+ * <li>{@code throttle_count} (Counter) — label {@code op}
+ * <li>{@code retry_count} (Counter) — labels {@code op}, {@code reason}
+ * </ul>
+ *
+ * <p>({@code iops} from the default allowlist is derived at reporter time as
the rate of {@code
+ * api_call_count}, so it is not a separately registered metric.)
+ *
+ * <p><b>Allowlist.</b> Only metrics whose name is in the allowlist passed at
construction are
+ * registered; the rest are skipped on the hot path. A {@code "*"} entry
registers everything. The
+ * default set is the five FLIP-576 metrics ({@code api_call_count}, {@code
api_call_duration_ms},
+ * {@code throttle_count}, {@code retry_count}, {@code iops}).
+ *
+ * <p><b>Cardinality.</b> {@code op} comes from the SDK operation name (a
closed set of ~15 values
Review Comment:
> ~15 values
That number can easily go stale.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java:
##########
@@ -640,6 +640,11 @@ public static TaskExecutor startTaskManager(
resourceID,
taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
+ // Second-phase init for file system plugins that opt into metrics (e.g.
+ // flink-s3-fs-native): hand them the process-level metric group now that the
+ // MetricRegistry exists. See FileSystem#attachMetrics and MetricsAware.
+ FileSystem.attachMetrics(taskManagerMetricGroup.f0);
Review Comment:
+1
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java:
##########
@@ -341,6 +389,46 @@ public void configure(Configuration config) {
this.bucketConfigProvider = new BucketConfigProvider(config);
}
+ @Override
+ public synchronized void setMetricGroup(MetricGroup metricGroup) {
+ // filesystem_type label value is the scheme ("s3" / "s3a"). This is deliberate: s3:// and
+ // s3a:// are served by separate factory instances, so keeping the scheme as the label value
+ // lets their traffic be told apart, and sibling FS plugins register the same label key with
+ // their own scheme. May be called more than once (see MetricsAware); reset the cached
+ // bridge
+ // so a re-attach with a different group re-scopes metrics created afterwards.
+ this.pluginMetrics = metricGroup.addGroup("filesystem_type", getScheme());
Review Comment:
Is `type` here replaced by the scheme, or is the scheme just appended, as in "filesystem_type_s3"?
##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/NativeS3MetricsEmissionITCase.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.fs.s3native.NativeS3FileSystemFactory;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3Configuration;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+
+import java.io.FileNotFoundException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * End-to-end test proving that real S3 operations performed through {@code
NativeS3FileSystem} are
+ * translated into Flink metrics by {@link AwsSdkMetricBridge} and become
visible in a real Flink
+ * metric registry.
+ *
+ * <p>Unlike {@link AwsSdkMetricBridgeTest} (which drives the bridge with
synthesized SDK records
+ * and a fake {@code MetricGroup}), this test exercises the full chain: the
AWS SDK actually invokes
+ * the registered {@link software.amazon.awssdk.metrics.MetricPublisher} after
each completed API
+ * call, the bridge registers and updates {@link Counter}/{@link Histogram}
handles, and the
+ * assertions read those handles back through {@link MetricListener}'s real
{@code MetricRegistry}.
+ *
+ * <p>Assertions use only GET/HEAD/LIST round trips, which carry no request
body and are therefore
+ * unaffected by the request-checksum behaviour newer AWS SDK versions apply
to {@code PutObject}.
+ *
+ * <p>Requires Docker; auto-skipped when Docker is unavailable.
+ */
+@Testcontainers(disabledWithoutDocker = true)
+class NativeS3MetricsEmissionITCase {
+
+ private static final String MINIO_IMAGE = "minio/minio:RELEASE.2022-02-07T08-17-33Z";
+ private static final int MINIO_PORT = 9000;
+ private static final String ACCESS_KEY = "metricsAccessKey";
+ private static final String SECRET_KEY = "metricsSecretKey";
+ private static final String BUCKET = "flip576-metrics";
+
+ @Container
+ private static final GenericContainer<?> MINIO =
+ new GenericContainer<>(MINIO_IMAGE)
+ .withEnv("MINIO_ROOT_USER", ACCESS_KEY)
+ .withEnv("MINIO_ROOT_PASSWORD", SECRET_KEY)
+ .withCommand("server", "/data")
+ .withExposedPorts(MINIO_PORT)
+ .waitingFor(
+ Wait.forHttp("/minio/health/ready")
+ .forPort(MINIO_PORT)
+ .withStartupTimeout(Duration.ofMinutes(2)));
+
+ private static String endpoint() {
+ return String.format("http://%s:%d", MINIO.getHost(), MINIO.getMappedPort(MINIO_PORT));
+ }
+
+ @BeforeAll
+ static void createBucket() {
+ try (S3Client client =
+ S3Client.builder()
+ .endpointOverride(URI.create(endpoint()))
+ .region(Region.US_EAST_1)
+ .credentialsProvider(
+ StaticCredentialsProvider.create(
+ AwsBasicCredentials.create(ACCESS_KEY, SECRET_KEY)))
+ .serviceConfiguration(
+ S3Configuration.builder().pathStyleAccessEnabled(true).build())
+ .build()) {
+ client.createBucket(CreateBucketRequest.builder().bucket(BUCKET).build());
+ }
+ }
+
+ @Test
+ void realS3OperationsEmitFlinkMetrics() throws Exception {
+ Configuration config = new Configuration();
+ config.set(NativeS3FileSystemFactory.ENDPOINT, endpoint());
+ config.set(NativeS3FileSystemFactory.ACCESS_KEY, ACCESS_KEY);
+ config.set(NativeS3FileSystemFactory.SECRET_KEY, SECRET_KEY);
+ config.set(NativeS3FileSystemFactory.REGION, "us-east-1");
+ config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, true);
+ config.set(NativeS3FileSystemFactory.CHUNKED_ENCODING_ENABLED, false);
+ config.set(NativeS3FileSystemFactory.CHECKSUM_VALIDATION_ENABLED, false);
+ config.set(NativeS3FileSystemFactory.METRICS_ENABLED, true);
+
+ NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+ factory.configure(config);
+
+ MetricListener metricListener = new MetricListener();
+ // Mirror what FileSystem#attachMetrics hands to the factory: the "filesystem" child of the
+ // process-level group.
+ MetricGroup fsGroup = metricListener.getMetricGroup().addGroup("filesystem");
+ factory.setMetricGroup(fsGroup);
+
+ FileSystem fs = factory.create(URI.create("s3://" + BUCKET + "/"));
Review Comment:
Why only s3 and not s3a as well? (Maybe parametrise the test?)
##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/S3MetricHistogramTest.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.metrics.HistogramStatistics;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link S3MetricHistogram}. */
+class S3MetricHistogramTest {
Review Comment:
If we used an existing histogram implementation, we wouldn't need this test.
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridge.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricPublisher;
+import software.amazon.awssdk.metrics.SdkMetric;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Bridges AWS SDK v2's {@link MetricPublisher} into Flink metrics for {@code
flink-s3-fs-native}.
+ *
+ * <p>The SDK invokes {@link #publish(MetricCollection)} asynchronously after
every completed API
+ * call, on its internal completion executor. The bridge reads a small, fixed
set of fields and
+ * emits the default metric surface of FLIP-576 against the {@code
filesystem_type}-labelled scope
+ * it is handed at construction:
+ *
+ * <ul>
+ * <li>{@code api_call_count} (Counter) — labels {@code op}, {@code
status_class}
+ * <li>{@code api_call_duration_ms} (Histogram) — label {@code op}
+ * <li>{@code throttle_count} (Counter) — label {@code op}
+ * <li>{@code retry_count} (Counter) — labels {@code op}, {@code reason}
+ * </ul>
+ *
+ * <p>({@code iops} from the default allowlist is derived at reporter time as
the rate of {@code
+ * api_call_count}, so it is not a separately registered metric.)
+ *
+ * <p><b>Allowlist.</b> Only metrics whose name is in the allowlist passed at
construction are
+ * registered; the rest are skipped on the hot path. A {@code "*"} entry
registers everything. The
+ * default set is the five FLIP-576 metrics ({@code api_call_count}, {@code
api_call_duration_ms},
+ * {@code throttle_count}, {@code retry_count}, {@code iops}).
+ *
+ * <p><b>Cardinality.</b> {@code op} comes from the SDK operation name (a
closed set of ~15 values
+ * for S3), {@code status_class} is a closed classifier ({@code 2xx}, {@code
4xx}, {@code 5xx},
+ * {@code throttled}, {@code other}, {@code error}, {@code unknown}), and
{@code reason} is a closed
+ * enum ({@code throttled}, {@code 5xx}, {@code other}). Metric handles are
cached in bounded maps,
+ * so {@link #publish} is a map lookup plus a counter increment with no
per-record allocation.
+ *
+ * <p><b>Thread-safety.</b> Counters use {@link ThreadSafeSimpleCounter} and
the histogram is
+ * synchronized, so concurrent publishes from the SDK completion executor are
safe. The bridge is
+ * also safe to share across multiple S3 clients of the same plugin instance.
+ */
+@Internal
+public final class AwsSdkMetricBridge implements MetricPublisher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AwsSdkMetricBridge.class);
+
+ static final String API_CALL_COUNT = "api_call_count";
+ static final String API_CALL_DURATION_MS = "api_call_duration_ms";
+ static final String THROTTLE_COUNT = "throttle_count";
+ static final String RETRY_COUNT = "retry_count";
+ static final String IOPS = "iops";
+
+ /** The default-on metric set from FLIP-576. {@code iops} is derived, not registered. */
+ static final List<String> DEFAULT_ALLOWLIST =
+ Arrays.asList(API_CALL_COUNT, API_CALL_DURATION_MS, THROTTLE_COUNT, RETRY_COUNT, IOPS);
+
+ private static final String WILDCARD = "*";
+
+ private static final String LABEL_OP = "op";
+ private static final String LABEL_STATUS_CLASS = "status_class";
+ private static final String LABEL_REASON = "reason";
+
+ private static final String UNKNOWN_OP = "Unknown";
+
+ private final MetricGroup fsScope;
+ private final int histogramWindowSize;
+
+ private final boolean allowAll;
+ private final Set<String> allowlist;
+
+ // op and label sets are closed, so these maps are bounded by construction.
+ private final ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>();
Review Comment:
!nit Map?
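
A small sketch of what the nit suggests, namely declaring the field against the interface. `CounterCacheSketch` is hypothetical, and `LongAdder` stands in for Flink's `Counter` so the example stays self-contained:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical handle cache; LongAdder stands in for Flink's Counter. */
final class CounterCacheSketch {
    // Declared as Map; the concurrent implementation is chosen at construction.
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    void increment(String key) {
        // ConcurrentHashMap overrides computeIfAbsent atomically, so the
        // lookup-or-create step stays thread-safe behind the Map type.
        counters.computeIfAbsent(key, k -> new LongAdder()).increment();
    }

    long count(String key) {
        LongAdder adder = counters.get(key);
        return adder == null ? 0L : adder.sum();
    }
}
```

If the concurrency guarantee should remain visible in the declared type, `ConcurrentMap` would be a middle ground between `Map` and `ConcurrentHashMap`.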
##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/NativeS3MetricsEmissionITCase.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.fs.s3native.NativeS3FileSystemFactory;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3Configuration;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+
+import java.io.FileNotFoundException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * End-to-end test proving that real S3 operations performed through {@code
NativeS3FileSystem} are
+ * translated into Flink metrics by {@link AwsSdkMetricBridge} and become
visible in a real Flink
+ * metric registry.
+ *
+ * <p>Unlike {@link AwsSdkMetricBridgeTest} (which drives the bridge with
synthesized SDK records
+ * and a fake {@code MetricGroup}), this test exercises the full chain: the
AWS SDK actually invokes
+ * the registered {@link software.amazon.awssdk.metrics.MetricPublisher} after
each completed API
+ * call, the bridge registers and updates {@link Counter}/{@link Histogram}
handles, and the
+ * assertions read those handles back through {@link MetricListener}'s real
{@code MetricRegistry}.
+ *
+ * <p>Assertions use only GET/HEAD/LIST round trips, which carry no request
body and are therefore
+ * unaffected by the request-checksum behaviour newer AWS SDK versions apply
to {@code PutObject}.
+ *
+ * <p>Requires Docker; auto-skipped when Docker is unavailable.
+ */
+@Testcontainers(disabledWithoutDocker = true)
+class NativeS3MetricsEmissionITCase {
+
+ private static final String MINIO_IMAGE = "minio/minio:RELEASE.2022-02-07T08-17-33Z";
+ private static final int MINIO_PORT = 9000;
Review Comment:
I thought we had discussed before that we should avoid MinIO tests because of
the licence change and because it is in maintenance mode. Do we want to keep
this dependency?