This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fd267b900d [INLONG-11199][Sort] Integrate Grafana Loki for connectors 
(#11212)
fd267b900d is described below

commit fd267b900d988bda07a9425923e3ced36960d7be
Author: Haotian Ma <60374114+qy-liu...@users.noreply.github.com>
AuthorDate: Sat Oct 12 10:28:52 2024 +0800

    [INLONG-11199][Sort] Integrate Grafana Loki for connectors (#11212)
---
 docker/docker-compose/docker-compose.yml           |  56 +++++++++-
 docker/docker-compose/log-system/loki.yaml         |  58 ++++++++++
 docker/docker-compose/log-system/otel-config.yaml  |  39 +++++++
 .../inlong/sort/base/util/OpenTelemetryLogger.java | 120 +++++++++++++++++----
 4 files changed, 253 insertions(+), 20 deletions(-)

diff --git a/docker/docker-compose/docker-compose.yml 
b/docker/docker-compose/docker-compose.yml
index 1e894b00b8..a3573d799c 100644
--- a/docker/docker-compose/docker-compose.yml
+++ b/docker/docker-compose/docker-compose.yml
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-version: '2.4'
+version: '3.0'
 
 services:
   mysql:
@@ -129,6 +129,7 @@ services:
       - |
         FLINK_PROPERTIES=
         jobmanager.rpc.address: jobmanager
+      - OTEL_EXPORTER_ENDPOINT=logcollector:4317
     ports:
       - "8081:8081"
     command: jobmanager
@@ -142,4 +143,57 @@ services:
         FLINK_PROPERTIES=
         jobmanager.rpc.address: jobmanager
         taskmanager.numberOfTaskSlots: 2
+      - OTEL_EXPORTER_ENDPOINT=logcollector:4317
     command: taskmanager
+
+  # The following services are used to collect logs for InLong-sort, not 
effective by default, you can enable them by add `--profile sort-report up` to 
the `docker-compose` command
+  # opentelemetry collector
+  logcollector:
+    image: otel/opentelemetry-collector-contrib:0.110.0
+    container_name: logcollector
+    volumes:
+      - ./log-system/otel-config.yaml:/otel-config.yaml
+    command: [ "--config=/otel-config.yaml"]
+    profiles: [sort-report]
+    ports:
+      - "4317:4317"
+
+  # grafana loki
+  loki:
+    image: grafana/loki:3.0.0
+    ports:
+      - "3100:3100"
+    profiles: [sort-report]
+    volumes:
+      - ./log-system/loki.yaml:/etc/loki/local-config.yaml
+    command: -config.file=/etc/loki/local-config.yaml
+
+  # grafana
+  grafana:
+    environment:
+      - GF_PATHS_PROVISIONING=/etc/grafana/provisioning
+      - GF_AUTH_ANONYMOUS_ENABLED=true
+      - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
+    entrypoint:
+      - sh
+      - -euc
+      - |
+        mkdir -p /etc/grafana/provisioning/datasources
+        cat <<EOF > /etc/grafana/provisioning/datasources/ds.yaml
+        apiVersion: 1
+        datasources:
+        - name: Loki
+          type: loki
+          access: proxy 
+          orgId: 1
+          url: http://loki:3100
+          basicAuth: false
+          isDefault: true
+          version: 1
+          editable: false
+        EOF
+        /run.sh
+    image: grafana/grafana:latest
+    ports:
+      - "3000:3000"
+    profiles: [sort-report]
\ No newline at end of file
diff --git a/docker/docker-compose/log-system/loki.yaml 
b/docker/docker-compose/log-system/loki.yaml
new file mode 100644
index 0000000000..746f8baac5
--- /dev/null
+++ b/docker/docker-compose/log-system/loki.yaml
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+auth_enabled: false
+
+limits_config:
+  allow_structured_metadata: true
+  volume_enabled: true
+  otlp_config:
+    resource_attributes:
+      attributes_config:
+        - action: index_label
+          attributes:
+            - level
+server:
+  http_listen_port: 3100
+
+common:
+  ring:
+    instance_addr: 0.0.0.0
+    kvstore:
+      store: inmemory
+  replication_factor: 1
+  path_prefix: /tmp/loki
+
+schema_config:
+  configs:
+    - from: 2020-05-15
+      store: tsdb
+      object_store: filesystem
+      schema: v13
+      index:
+        prefix: index_
+        period: 24h
+
+storage_config:
+  tsdb_shipper:
+    active_index_directory: /tmp/loki/index
+    cache_location: /tmp/loki/index_cache
+  filesystem:
+    directory: /tmp/loki/chunks
+
+pattern_ingester:
+  enabled: true
diff --git a/docker/docker-compose/log-system/otel-config.yaml 
b/docker/docker-compose/log-system/otel-config.yaml
new file mode 100644
index 0000000000..6942f11e0f
--- /dev/null
+++ b/docker/docker-compose/log-system/otel-config.yaml
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+receivers:
+  otlp:
+    protocols:
+      grpc:
+        endpoint: logcollector:4317
+processors:
+  batch:
+
+exporters:
+  logging:
+    verbosity: detailed
+  otlphttp:
+    endpoint: http://loki:3100/otlp
+    tls:
+      insecure: true
+
+service:
+  pipelines:
+    logs:
+      receivers: [otlp]
+      processors: [batch]
+      exporters: [otlphttp, logging]
\ No newline at end of file
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/OpenTelemetryLogger.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/OpenTelemetryLogger.java
index 82c88cf5f7..54dfefcf11 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/OpenTelemetryLogger.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/OpenTelemetryLogger.java
@@ -38,6 +38,9 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
 
+/**
+ * OpenTelemetryLogger to collect logs and send to OpenTelemetry
+ */
 public class OpenTelemetryLogger {
 
     private OpenTelemetrySdk SDK; // OpenTelemetry SDK
@@ -50,33 +53,98 @@ public class OpenTelemetryLogger {
 
     private final Level logLevel; // Log4j Log Level
 
+    private final String localHostIp; // Local Host IP
+
     private static final Logger LOG = 
LoggerFactory.getLogger(OpenTelemetryLogger.class);
 
-    public OpenTelemetryLogger() {
-        // Default Service Name
-        serviceName = "inlong-sort-connector";
-        // Get OpenTelemetry Exporter Endpoint from Environment Variable
-        if (System.getenv("OTEL_EXPORTER_ENDPOINT") != null) {
-            endpoint = System.getenv("OTEL_EXPORTER_ENDPOINT");
-        } else {
-            endpoint = "localhost:4317";
-        }
-        // Default Log4j Layout
-        this.layout = PatternLayout.newBuilder()
-                .withPattern("%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - 
%msg%n")
-                .withCharset(StandardCharsets.UTF_8)
-                .build();
-        // Default Log4j Log Level
-        this.logLevel = Level.INFO;
+    public OpenTelemetryLogger(Builder builder) {
+        this.serviceName = builder.serviceName;
+        this.endpoint = builder.endpoint;
+        this.layout = builder.layout;
+        this.logLevel = builder.logLevel;
+        this.localHostIp = builder.localHostIp;
     }
 
-    public OpenTelemetryLogger(String serviceName, String endpoint, Layout<?> 
layout, Level logLevel) {
+    public OpenTelemetryLogger(String serviceName, String endpoint, Layout<?> 
layout, Level logLevel,
+            String localHostIp) {
         this.serviceName = serviceName;
         this.endpoint = endpoint;
         this.layout = layout;
         this.logLevel = logLevel;
+        this.localHostIp = localHostIp;
+    }
+
+    /**
+     * OpenTelemetryLogger Builder
+     */
+    public static final class Builder {
+
+        private String endpoint; // OpenTelemetry Exporter Endpoint
+
+        private String serviceName; // OpenTelemetry Service Name
+
+        private Layout<?> layout; // Log4j Layout
+
+        private Level logLevel; // Log4j Log Level
+
+        private String localHostIp;
+
+        public Builder() {
+        }
+
+        public Builder setServiceName(String serviceName) {
+            this.serviceName = serviceName;
+            return this;
+        }
+
+        public Builder setLayout(Layout<?> layout) {
+            this.layout = layout;
+            return this;
+        }
+
+        public Builder setEndpoint(String endpoint) {
+            this.endpoint = endpoint;
+            return this;
+        }
+
+        public Builder setLogLevel(Level logLevel) {
+            this.logLevel = logLevel;
+            return this;
+        }
+
+        public Builder setLocalHostIp(String localHostIp) {
+            this.localHostIp = localHostIp;
+            return this;
+        }
+
+        public OpenTelemetryLogger build() {
+            if (this.serviceName == null) {
+                this.serviceName = "unnamed_service";
+            }
+            if (this.endpoint == null) {
+                if (System.getenv("OTEL_EXPORTER_ENDPOINT") != null) {
+                    this.endpoint = System.getenv("OTEL_EXPORTER_ENDPOINT");
+                } else {
+                    this.endpoint = "localhost:4317";
+                }
+            }
+            if (this.layout == null) {
+                this.layout = PatternLayout.newBuilder()
+                        .withPattern("%d{HH:mm:ss.SSS} [%t] %-5level 
%logger{36} - %msg%n")
+                        .withCharset(StandardCharsets.UTF_8)
+                        .build();
+            }
+            if (this.logLevel == null) {
+                this.logLevel = Level.INFO;
+            }
+            return new OpenTelemetryLogger(this);
+        }
+
     }
 
+    /**
+     * Create OpenTelemetry SDK with OpenTelemetry Exporter
+     */
     private void createOpenTelemetrySdk() {
         // Create OpenTelemetry SDK
         OpenTelemetrySdkBuilder sdkBuilder = OpenTelemetrySdk.builder();
@@ -84,7 +152,9 @@ public class OpenTelemetryLogger {
         SdkLoggerProviderBuilder loggerProviderBuilder = 
SdkLoggerProvider.builder();
         // get Resource
         Resource resource = Resource.getDefault().toBuilder()
+                .put(ResourceAttributes.SERVICE_NAMESPACE, "inlong_sort")
                 .put(ResourceAttributes.SERVICE_NAME, this.serviceName)
+                .put(ResourceAttributes.HOST_NAME, this.localHostIp)
                 .build();
         // set Resource
         loggerProviderBuilder.setResource(resource);
@@ -102,7 +172,10 @@ public class OpenTelemetryLogger {
         SDK = sdkBuilder.build();
     }
 
-    public void addOpenTelemetryAppender() {
+    /**
+     * Add OpenTelemetryAppender to Log4j
+     */
+    private void addOpenTelemetryAppender() {
         org.apache.logging.log4j.spi.LoggerContext context = 
LogManager.getContext(false);
         LoggerContext loggerContext = (LoggerContext) context;
         Configuration config = loggerContext.getConfiguration();
@@ -122,7 +195,10 @@ public class OpenTelemetryLogger {
         loggerContext.updateLoggers();
     }
 
-    public void removeOpenTelemetryAppender() {
+    /**
+     * Remove OpenTelemetryAppender from Log4j
+     */
+    private void removeOpenTelemetryAppender() {
         org.apache.logging.log4j.spi.LoggerContext context = 
LogManager.getContext(false);
         LoggerContext loggerContext = (LoggerContext) context;
         Configuration config = loggerContext.getConfiguration();
@@ -137,6 +213,9 @@ public class OpenTelemetryLogger {
         loggerContext.updateLoggers();
     }
 
+    /**
+     * Install OpenTelemetryLogger for the application
+     */
     public void install() {
         addOpenTelemetryAppender();
         createOpenTelemetrySdk();
@@ -144,6 +223,9 @@ public class OpenTelemetryLogger {
         LOG.info("OpenTelemetryLogger installed");
     }
 
+    /**
+     * Uninstall OpenTelemetryLogger
+     */
     public void uninstall() {
         LOG.info("OpenTelemetryLogger uninstalled");
         SDK.close();

Reply via email to