gaborgsomogyi commented on code in PR #28427:
URL: https://github.com/apache/flink/pull/28427#discussion_r3451265101


##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java:
##########
@@ -321,9 +325,53 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
                                     + "When not set, the default chain is used: delegation tokens -> "
                                     + "static credentials (if configured) -> DefaultCredentialsProvider.");
 
+    public static final ConfigOption<Boolean> METRICS_ENABLED =
+            ConfigOptions.key("s3.metrics.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Master switch for publishing S3 operation metrics to Flink's metric "
+                                    + "system. When false, no metric publisher is attached to the SDK "
+                                    + "and no metric is registered. Metrics are only emitted under the "
+                                    + "TaskManager and JobManager entrypoints, which provide a "
+                                    + "process-level metric group; other contexts (CLI, etc.) emit "
+                                    + "none regardless of this setting.");
+
+    public static final ConfigOption<List<String>> METRICS_ALLOWLIST =
+            ConfigOptions.key("s3.metrics.allowlist")
+                    .stringType()
+                    .asList()
+                    .defaultValues(
+                            "api_call_count",
+                            "api_call_duration_ms",
+                            "throttle_count",
+                            "retry_count",
+                            "iops")
+                    .withDescription(
+                            "Names of S3 metrics to register. Replaces (does not merge with) the "
+                                    + "default list. Use \"*\" to register every metric the plugin "
+                                    + "emits. An empty list with s3.metrics.enabled=true is treated "
+                                    + "as misconfiguration: a warning is logged and the defaults are "
+                                    + "used. ('iops' is derived at reporter time from api_call_count "
+                                    + "and is not a separately registered metric.)");
+
+    public static final ConfigOption<Integer> METRICS_HISTOGRAM_WINDOW_SIZE =
+            ConfigOptions.key("s3.metrics.histogram.window-size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "Reservoir size for S3 latency histograms (api_call_duration_ms). "
+                                    + "Bounds memory regardless of request volume. Must be positive.");

Review Comment:
   Is this the number of entries kept in memory?
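
   For reference, a minimal sketch of what "reservoir size" would mean if it is
   a count of retained samples. This is an assumption on my side, not the PR's
   actual implementation; `SlidingWindowReservoir` and its methods are
   hypothetical names used only for illustration.

```java
/**
 * Hypothetical sketch (not code from this PR): a fixed-capacity ring buffer that
 * keeps only the most recent {@code windowSize} samples, so memory stays bounded.
 */
final class SlidingWindowReservoir {
    private final long[] samples;
    private long count; // total number of samples recorded so far

    SlidingWindowReservoir(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        this.samples = new long[windowSize];
    }

    /** Records a sample, overwriting the oldest one once the window is full. */
    synchronized void update(long value) {
        samples[(int) (count % samples.length)] = value;
        count++;
    }

    /** Number of samples currently retained; never exceeds the window size. */
    synchronized int size() {
        return (int) Math.min(count, samples.length);
    }

    /** Largest retained sample, or Long.MIN_VALUE when the reservoir is empty. */
    synchronized long max() {
        long result = Long.MIN_VALUE;
        for (int i = 0; i < size(); i++) {
            result = Math.max(result, samples[i]);
        }
        return result;
    }

    /** Smallest retained sample, or Long.MAX_VALUE when the reservoir is empty. */
    synchronized long min() {
        long result = Long.MAX_VALUE;
        for (int i = 0; i < size(); i++) {
            result = Math.min(result, samples[i]);
        }
        return result;
    }
}
```

   Under this reading, the default of 1024 would mean at most 1024 `long`
   samples (about 8 KiB) per histogram, however many requests are made.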



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java:
##########
@@ -341,6 +389,46 @@ public void configure(Configuration config) {
         this.bucketConfigProvider = new BucketConfigProvider(config);
     }
 
+    @Override
+    public synchronized void setMetricGroup(MetricGroup metricGroup) {
+        // filesystem_type label value is the scheme ("s3" / "s3a"). This is deliberate: s3:// and
+        // s3a:// are served by separate factory instances, so keeping the scheme as the label value
+        // lets their traffic be told apart, and sibling FS plugins register the same label key with
+        // their own scheme. May be called more than once (see MetricsAware); reset the cached
+        // bridge
+        // so a re-attach with a different group re-scopes metrics created afterwards.
+        this.pluginMetrics = metricGroup.addGroup("filesystem_type", getScheme());
+        this.metricBridge = null;
+    }
+
+    /**
+     * Returns the SDK metric publisher to attach to clients, or {@code null} when metrics are
+     * disabled or no metric group has been attached yet (e.g. CLI / embedded usage). The bridge is
+     * built lazily and cached so all clients of this factory share one set of metric handles.
+     */
+    @Nullable
+    private AwsSdkMetricBridge resolveMetricBridge(Configuration config) {
+        final MetricGroup metrics = this.pluginMetrics;
+        if (metrics == null || !config.get(METRICS_ENABLED)) {
+            return null;
+        }
+        AwsSdkMetricBridge bridge = this.metricBridge;
+        if (bridge == null) {
+            synchronized (this) {
+                bridge = this.metricBridge;
+                if (bridge == null) {
+                    bridge =
+                            new AwsSdkMetricBridge(
+                                    metrics,
+                                    config.get(METRICS_ALLOWLIST),
+                                    config.get(METRICS_HISTOGRAM_WINDOW_SIZE));
+                    this.metricBridge = bridge;
+                }
+            }
+        }
+        return bridge;

Review Comment:
   The write to `metricBridge` is inside the `synchronized` block, but the
   first read is outside it, and the field is not `volatile`. I have a feeling
   this is an unguarded read without a well-founded justification. A
   `@GuardedBy` annotation would help, as always. I'm not sure whether it fits
   `synchronized` here, but an end-to-end description of what protects what,
   and when, would be good.
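
   To make the concern concrete, here is a minimal sketch of the pattern I have
   in mind, assuming the intent is classic double-checked locking.
   `LazyBridgeHolder` and its members are hypothetical names, not code from this
   PR. In Java this idiom is only safe when the field is `volatile`, so that the
   unsynchronized read cannot observe a partially constructed object; the
   comments spell out what guards what.

```java
import java.util.function.Supplier;

/** Hypothetical sketch: lazily creates and caches one shared instance. */
final class LazyBridgeHolder<T> {
    // Writes happen only while holding the monitor of `this` (i.e. guarded by "this").
    // The field is volatile so the lock-free fast-path read in get() is safely published.
    private volatile T bridge;

    private final Supplier<T> factory;

    LazyBridgeHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T local = bridge; // fast path: one volatile read, no lock
        if (local == null) {
            synchronized (this) {
                local = bridge; // re-check under the lock
                if (local == null) {
                    local = factory.get();
                    bridge = local; // publish while holding the lock
                }
            }
        }
        return local;
    }

    /** Drops the cached instance; the next get() builds a fresh one. */
    synchronized void reset() {
        bridge = null;
    }
}
```

   The same reasoning would apply to any other field that is read outside the
   lock, so documenting the guard per field would settle the question.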



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java:
##########
@@ -640,6 +640,11 @@ public static TaskExecutor startTaskManager(
                         resourceID,
                         taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
 
+        // Second-phase init for file system plugins that opt into metrics (e.g.
+        // flink-s3-fs-native): hand them the process-level metric group now that the
+        // MetricRegistry exists. See FileSystem#attachMetrics and MetricsAware.
+        FileSystem.attachMetrics(taskManagerMetricGroup.f0);

Review Comment:
   Now we're solving this for file systems, which is good. How do we expect a
   non-FS plugin to hook into the metrics system? We don't need a full-featured
   plan, but knowing this is a good measure of how well this solution fits.



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java:
##########
@@ -321,9 +325,53 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
                                     + "When not set, the default chain is used: delegation tokens -> "
                                     + "static credentials (if configured) -> DefaultCredentialsProvider.");
 
+    public static final ConfigOption<Boolean> METRICS_ENABLED =
+            ConfigOptions.key("s3.metrics.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Master switch for publishing S3 operation metrics to Flink's metric "
+                                    + "system. When false, no metric publisher is attached to the SDK "
+                                    + "and no metric is registered. Metrics are only emitted under the "
+                                    + "TaskManager and JobManager entrypoints, which provide a "
+                                    + "process-level metric group; other contexts (CLI, etc.) emit "
+                                    + "none regardless of this setting.");
+
+    public static final ConfigOption<List<String>> METRICS_ALLOWLIST =
+            ConfigOptions.key("s3.metrics.allowlist")
+                    .stringType()
+                    .asList()
+                    .defaultValues(
+                            "api_call_count",
+                            "api_call_duration_ms",
+                            "throttle_count",
+                            "retry_count",
+                            "iops")

Review Comment:
   Just for my own understanding: say I want to use a new metric called `foo`.
   Do I need to go to the `s3.metrics.allowlist` definition, check the default
   value, and then set `s3.metrics.allowlist=ORIGINAL_DEFAULT,foo`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java:
##########
@@ -377,6 +377,33 @@ protected void initializeServices(Configuration configuration, PluginManager plu
             configuration.set(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
             configuration.set(JobManagerOptions.PORT, commonRpcService.getPort());
 
+            metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);
+
+            final RpcService metricQueryServiceRpcService =
+                    MetricUtils.startRemoteMetricsRpcService(
+                            configuration,
+                            commonRpcService.getAddress(),
+                            configuration.get(JobManagerOptions.BIND_HOST),
+                            rpcSystem);
+            metricRegistry.startQueryService(metricQueryServiceRpcService, null);
+
+            final String hostname = RpcUtils.getHostname(commonRpcService);
+
+            processMetricGroup =
+                    MetricUtils.instantiateProcessMetricGroup(
+                            metricRegistry,
+                            hostname,
+                            ConfigurationUtils.getSystemResourceMetricsProbingInterval(
+                                    configuration));
+
+            // Second-phase init for file system plugins that opt into metrics (e.g.
+            // flink-s3-fs-native): hand them the process-level metric group before any file system
+            // is used. This must run ahead of the HA services and BlobServer below, because those
+            // may open external file systems (e.g. S3 HA/blob storage), creating them first would
+            // cache metric-less file system clients for the rest of the process lifetime. See
+            // FileSystem#attachMetrics and MetricsAware.

Review Comment:
   I haven't checked this in depth yet. Is this just a move as-is, or was it
   changed too?


