     new 05c4494e68e [INLONG-1048][Doc] Add doc for integrating a logging system for Sort (#1049) 
05c4494e68e is described below

commit 05c4494e68e2eec9250d4ec27d598202e3b27b24
Author: Haotian Ma <>
AuthorDate: Sun Dec 1 21:25:32 2024 +0800

    [INLONG-1048][Doc] Add doc for integrating a logging system for Sort (#1049)
    Co-authored-by: AloysZhang <>
+title: OpenTelemetry Log Report
+sidebar_position: 6
+## Overview
+`InLong Sort` runs on multiple `Task Manager` nodes of `Apache Flink`, and each node stores its logs independently, so checking logs node by node is inefficient. To address this, InLong Sort provides a centralized log management solution based on OpenTelemetry that lets users manage Flink logs efficiently.
+InLong Sort can integrate log reporting into each `Connector`. The log processing flow is shown in the figure below: logs are reported through OpenTelemetry, collected and processed by the OpenTelemetry Collector, and then sent to Grafana Loki for centralized management.
+![log process](img/LogProcess.png)
+## Integrating Log Reporting for Connector
+InLong Sort wraps the `OpenTelemetryLogger` class, which provides a `Builder` to help users quickly configure an `OpenTelemetryLogger`; log reporting is enabled or disabled by calling its `install` or `uninstall` method. With the help of `OpenTelemetryLogger`, the connector can report logs more eas [...]
+1. Construct an `OpenTelemetryLogger` object with `OpenTelemetryLogger.Builder()` in the constructor of the connector's `SourceReader` class.
+2. Call the `install()` method of the `OpenTelemetryLogger` object in the `start()` method of the `SourceReader`.
+3. Call the `uninstall()` method of the `OpenTelemetryLogger` object in the `close()` method of the `SourceReader`.
+**Note**: If the `maven-shade-plugin` is used, the `opentelemetry` and `okhttp` related packages need to be included:
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${plugin.shade.version}</version>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <artifactSet>
+                                <includes>
+                                    <include>io.opentelemetry*</include>
+                                    <include>com.squareup.*</include>
+                                </includes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+For example:
+import org.apache.inlong.sort.base.util.OpenTelemetryLogger;
+public class XXXSourceReader<T>
+    private static final Logger LOG = 
+    private final OpenTelemetryLogger openTelemetryLogger;
+    public XXXSourceReader() {
+        ...
+        // Initialize the OpenTelemetryLogger
+        this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
+                .setServiceName(this.getClass().getSimpleName())
+                .setLocalHostIp(this.context.getLocalHostName()).build();
+    }
+    @Override
+    public void start() {
+        openTelemetryLogger.install(); // start log reporting
+        ...
+    }
+    @Override
+    public void close() throws Exception {
+        super.close();
+        openTelemetryLogger.uninstall(); // stop log reporting
+    }
+    ...
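The lifecycle in the example above can also be sketched without Flink or a collector. In the following minimal sketch, `LogReporter`, `RecordingReporter`, and `SketchReader` are hypothetical stand-ins (they are not InLong classes); the sketch only illustrates one possible design choice, placing `uninstall()` in a `finally` block so that log reporting is stopped even if the reader's own cleanup throws.

```java
import java.util.ArrayList;
import java.util.List;

final class LoggerLifecycleSketch {

    /** Hypothetical stand-in exposing only the two lifecycle calls used above. */
    interface LogReporter {
        void install();
        void uninstall();
    }

    /** Records each call so the ordering can be checked without a collector. */
    static final class RecordingReporter implements LogReporter {
        final List<String> calls = new ArrayList<>();

        @Override
        public void install() {
            calls.add("install");
        }

        @Override
        public void uninstall() {
            calls.add("uninstall");
        }
    }

    /** Hypothetical reader: installs in start(), always uninstalls in close(). */
    static final class SketchReader implements AutoCloseable {
        private final LogReporter reporter;
        private final boolean failOnClose;

        SketchReader(LogReporter reporter, boolean failOnClose) {
            this.reporter = reporter;
            this.failOnClose = failOnClose;
        }

        void start() {
            reporter.install(); // start log reporting
        }

        @Override
        public void close() {
            try {
                if (failOnClose) {
                    // Simulates a failure in the reader's own cleanup.
                    throw new IllegalStateException("simulated cleanup failure");
                }
            } finally {
                reporter.uninstall(); // runs even when cleanup throws
            }
        }
    }

    public static void main(String[] args) {
        RecordingReporter reporter = new RecordingReporter();
        SketchReader reader = new SketchReader(reporter, true);
        reader.start();
        try {
            reader.close();
        } catch (IllegalStateException expected) {
            // The simulated failure propagates, but uninstall() has already run.
        }
        System.out.println(reporter.calls);
    }
}
```

Whether a real connector needs this guard depends on whether its `super.close()` can throw; the sketch makes no claim about how `OpenTelemetryLogger` itself behaves.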
+The `OpenTelemetryLogger` currently provides the following configuration items:
+| Configuration | Description | Default value |
+| ------------- | ----------- | ------------- |
+| `endpoint` | Address of the `OpenTelemetry Collector`. If not specified, the value of the `OTEL_EXPORTER_ENDPOINT` environment variable is used; if that variable is not set either, the default value is used. | `localhost:4317` |
+| `serviceName` | `OpenTelemetry` service name, which can be used to distinguish between different connectors. | `unnamed_service` |
+| `layout` | `Log4j2` log format, given as an instance of the `PatternLayout` class. | `%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n` |
+| `logLevel` | Level of the logs to report. | `Level.INFO` |
+| `localHostIp` | IP of the `Flink` node, available in `SourceReader` via `this.context.getLocalHostName()`. | `null` |
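The fallback order described for `endpoint` (explicit value, then the `OTEL_EXPORTER_ENDPOINT` environment variable, then `localhost:4317`) can be sketched as below. This is an illustration only, not the code inside `OpenTelemetryLogger`: the class `EndpointFallbackSketch` and its `resolve` method are hypothetical, and treating an empty string as "not specified" is an assumption.

```java
import java.util.Map;

/** Hypothetical helper illustrating the documented endpoint fallback order. */
final class EndpointFallbackSketch {

    // Default value listed in the configuration table.
    static final String DEFAULT_ENDPOINT = "localhost:4317";
    // Environment variable named in the configuration table.
    static final String ENV_KEY = "OTEL_EXPORTER_ENDPOINT";

    /**
     * Returns the explicit endpoint if set, otherwise the environment value,
     * otherwise the default. Empty strings count as unset (an assumption).
     */
    static String resolve(String explicitEndpoint, Map<String, String> env) {
        if (explicitEndpoint != null && !explicitEndpoint.isEmpty()) {
            return explicitEndpoint;
        }
        String fromEnv = env.get(ENV_KEY);
        if (fromEnv != null && !fromEnv.isEmpty()) {
            return fromEnv;
        }
        return DEFAULT_ENDPOINT;
    }

    public static void main(String[] args) {
        // Uses the real process environment; an optional first argument acts as the explicit value.
        String explicit = args.length > 0 ? args[0] : null;
        System.out.println(resolve(explicit, System.getenv()));
    }
}
```

Passing the environment as a `Map` keeps the lookup easy to exercise in isolation; in the Docker setup described in this document, the map would contain `OTEL_EXPORTER_ENDPOINT=logcollector:4317`.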
+## Docker Configuration
+In addition to integrating log reporting into the Connector, you also need to add three Docker containers (`opentelemetry-collector`, `grafana loki`, and `grafana`) and configure the `OTEL_EXPORTER_ENDPOINT` environment variable for the `Flink` container.
+> This configuration is already provided in `/inlong/docker/docker-compose/docker-compose.yml`. Just add the `--profile sort-report` option when starting `docker compose` to enable it. The full command is `docker compose --profile sort-report up -d`.
+You can also use the following content as a reference for configuring your own application. The `docker-compose.yml` file is shown below:
+# flink jobmanager
+  image: apache/flink:1.15-scala_2.12
+  container_name: jobmanager
+  environment:
+    - |
+      jobmanager.rpc.address: jobmanager
+    - OTEL_EXPORTER_ENDPOINT=logcollector:4317
+  ports:
+    - "8081:8081"
+  command: jobmanager
+# flink taskmanager
+  image: apache/flink:1.15-scala_2.12
+  container_name: taskmanager
+  environment:
+    - |
+      jobmanager.rpc.address: jobmanager
+      taskmanager.numberOfTaskSlots: 2
+    - OTEL_EXPORTER_ENDPOINT=logcollector:4317
+  command: taskmanager
+# opentelemetry collector
+  image: otel/opentelemetry-collector-contrib:0.110.0
+  container_name: logcollector
+  volumes:
+    - ./log-system/otel-config.yaml:/otel-config.yaml
+  command: [ "--config=/otel-config.yaml"]
+  ports:
+    - "4317:4317"
+# grafana loki
+  image: grafana/loki:3.0.0
+  ports:
+    - "3100:3100"
+  volumes:
+    - ./log-system/loki.yaml:/etc/loki/local-config.yaml
+  command: -config.file=/etc/loki/local-config.yaml
+# grafana
+  environment:
+    - GF_PATHS_PROVISIONING=/etc/grafana/provisioning
+  entrypoint:
+    - sh
+    - -euc
+    - |
+      mkdir -p /etc/grafana/provisioning/datasources
+      cat <<EOF > /etc/grafana/provisioning/datasources/ds.yaml
+      apiVersion: 1
+      datasources:
+      - name: Loki
+        type: loki
+        access: proxy 
+        orgId: 1
+        url: http://loki:3100
+        basicAuth: false
+        isDefault: true
+        version: 1
+        editable: false
+      EOF
+      /
+  image: grafana/grafana:latest
+  ports:
+    - "3000:3000"
+You also need to provide two configuration files: `otel-config.yaml` for `logcollector` and `loki.yaml` for `Loki`. The content of the `otel-config.yaml` file is:
+receivers:
+  otlp:
+    protocols:
+      grpc:
+        endpoint: logcollector:4317
+processors:
+  batch:
+exporters:
+  logging:
+    verbosity: detailed
+  otlphttp:
+    endpoint: http://loki:3100/otlp
+    tls:
+      insecure: true
+service:
+  pipelines:
+    logs:
+      receivers: [otlp]
+      processors: [batch]
+      exporters: [otlphttp, logging]
+And the content of the `loki.yaml` file is:
+auth_enabled: false
+  allow_structured_metadata: true
+  volume_enabled: true
+  otlp_config:
+    resource_attributes:
+      attributes_config:
+        - action: index_label
+          attributes:
+            - level
+  http_listen_port: 3100
+  ring:
+    instance_addr:
+    kvstore:
+      store: inmemory
+  replication_factor: 1
+  path_prefix: /tmp/loki
+  configs:
+    - from: 2020-05-15
+      store: tsdb
+      object_store: filesystem
+      schema: v13
+      index:
+        prefix: index_
+        period: 24h
+  tsdb_shipper:
+    active_index_directory: /tmp/loki/index
+    cache_location: /tmp/loki/index_cache
+  filesystem:
+    directory: /tmp/loki/chunks
+  enabled: true
+## Usage
+Execute `docker compose --profile sort-report up -d` in the `inlong/docker/` 
path to start the relevant containers, then create and start a task process 
according to [Data 
Ingestion](quick_start/data_ingestion/ (the 
involved connectors need to be integrated with OpenTelemetryAppender).
+After that you can enter the `Grafana Loki` system by 
``, and query the logs by the `service_name` field:
+Click on the log item to view the log details (**Note:** The default log 
reporting level is `ERROR`.):
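The `service_name` query described above can also be expressed against Loki's HTTP API. The sketch below only builds the request URL for the `query_range` endpoint with a LogQL selector on the `service_name` label; the class `LokiQuerySketch` is hypothetical, and the base URL `http://localhost:3100` assumes Loki's port is published on the local host as in the compose file above.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that builds a Loki query URL for one service_name value. */
final class LokiQuerySketch {

    /** Builds a LogQL stream selector matching a single service_name label value. */
    static String selector(String serviceName) {
        // Escape backslashes and double quotes so the value stays a valid LogQL string.
        String escaped = serviceName.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{service_name=\"" + escaped + "\"}";
    }

    /** Builds a URL for Loki's /loki/api/v1/query_range endpoint. */
    static String queryRangeUrl(String lokiBaseUrl, String serviceName, int limit) {
        String query = URLEncoder.encode(selector(serviceName), StandardCharsets.UTF_8);
        return lokiBaseUrl + "/loki/api/v1/query_range?query=" + query + "&limit=" + limit;
    }

    public static void main(String[] args) {
        // The service name matches the value passed to setServiceName() in the reader example.
        System.out.println(queryRangeUrl("http://localhost:3100", "XXXSourceReader", 100));
    }
}
```

The resulting URL can be requested with any HTTP client; the same selector, `{service_name="XXXSourceReader"}`, can be entered directly in Grafana's query editor.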
+title: OpenTelemetry Log Report
+sidebar_position: 6
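+## Overview
+`InLong Sort` runs on multiple `Task Manager` nodes of `Apache Flink`, and each node stores the logs it produces independently, so logs have to be checked node by node, which is inefficient to maintain. To address this, `InLong Sort` provides a centralized log management solution based on OpenTelemetry that lets users manage Flink logs efficiently.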
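+`InLong Sort` can integrate log reporting into each `Connector`. The log processing flow is shown in the figure below: logs are reported through OpenTelemetry, collected and processed by the OpenTelemetry Collector, and then sent to Grafana Loki for centralized management.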
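+## Integrating Log Reporting for Connector
+`InLong Sort` wraps the `OpenTelemetryLogger` class, which provides a `Builder` to help users quickly configure an `OpenTelemetryLogger`; log reporting is enabled or disabled by calling the `install` and `uninstall` methods of `OpenTelemetryLogger`. With `OpenTelemetryLogger`, log reporting can be added to a `Connector` easily. The following describes how to use the `OpenTelemetryLogger` class to add log reporting to a `Connector` that conforms to [FLIP-27]( [...]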
+1. Construct an `openTelemetryLogger` object with `OpenTelemetryLogger.Builder()` in the constructor of the connector's `SourceReader` class.
+2. Call the `install()` method of the `openTelemetryLogger` object in the `start()` method of the `SourceReader`.
+3. Call the `uninstall()` method of the `openTelemetryLogger` object in the `close()` method of the `SourceReader`.
+**Note**: If the `maven-shade-plugin` is used, the `opentelemetry` and `okhttp` related packages need to be included:
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${plugin.shade.version}</version>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <artifactSet>
+                                <includes>
+                                    <include>io.opentelemetry*</include>
+                                    <include>com.squareup.*</include>
+                                </includes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+import org.apache.inlong.sort.base.util.OpenTelemetryLogger;
+public class XXXSourceReader<T>
+    private static final Logger LOG = 
+    private final OpenTelemetryLogger openTelemetryLogger;
+    public XXXSourceReader() {
+        ...
+        // Initialize the OpenTelemetryLogger
+        this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
+                .setServiceName(this.getClass().getSimpleName())
+                .setLocalHostIp(this.context.getLocalHostName()).build();
+    }
+    @Override
+    public void start() {
+        openTelemetryLogger.install(); // start log reporting
+        ...
+    }
+    @Override
+    public void close() throws Exception {
+        super.close();
+        openTelemetryLogger.uninstall(); // stop log reporting
+    }
+    ...
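+The `OpenTelemetryLogger` currently provides the following configuration items:
+| Configuration | Description | Default value |
+| ------------- | ----------- | ------------- |
+| `endpoint` | Address of the `OpenTelemetry Collector`. If not specified, the value of the `OTEL_EXPORTER_ENDPOINT` environment variable is used; if that variable is not set either, the default value is used. | `localhost:4317` |
+| `serviceName` | `OpenTelemetry` service name, which can be used to distinguish between different connectors. | `unnamed_service` |
+| `layout` | `Log4j2` log format, which should be set to a `PatternLayout` object. | `%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n` |
+| `logLevel` | Level of the logs to report. | `Level.INFO` |
+| `localHostIp` | IP of the `Flink` node, available in `SourceReader` via `this.context.getLocalHostName()`. | `null` |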
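+## Docker Configuration
+In addition to integrating log reporting into the `Connector`, you also need to add three Docker containers (`opentelemetry-collector`, `grafana loki`, and `grafana`) and configure the `OTEL_EXPORTER_ENDPOINT` environment variable for the `Flink` container.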
+> This configuration is already provided in `/inlong/docker/docker-compose/docker-compose.yml`. Just add the `--profile sort-report` option when starting `docker compose` to enable it. The full command is `docker compose --profile sort-report up -d`.
+You can also use the following content as a reference. The `docker-compose.yml` file is shown below:
+# flink jobmanager
+  image: apache/flink:1.15-scala_2.12
+  container_name: jobmanager
+  environment:
+    - |
+      jobmanager.rpc.address: jobmanager
+    - OTEL_EXPORTER_ENDPOINT=logcollector:4317
+  ports:
+    - "8081:8081"
+  command: jobmanager
+# flink taskmanager
+  image: apache/flink:1.15-scala_2.12
+  container_name: taskmanager
+  environment:
+    - |
+      jobmanager.rpc.address: jobmanager
+      taskmanager.numberOfTaskSlots: 2
+    - OTEL_EXPORTER_ENDPOINT=logcollector:4317
+  command: taskmanager
+# opentelemetry collector
+  image: otel/opentelemetry-collector-contrib:0.110.0
+  container_name: logcollector
+  volumes:
+    - ./log-system/otel-config.yaml:/otel-config.yaml
+  command: [ "--config=/otel-config.yaml"]
+  ports:
+    - "4317:4317"
+# grafana loki
+  image: grafana/loki:3.0.0
+  ports:
+    - "3100:3100"
+  volumes:
+    - ./log-system/loki.yaml:/etc/loki/local-config.yaml
+  command: -config.file=/etc/loki/local-config.yaml
+# grafana
+  environment:
+    - GF_PATHS_PROVISIONING=/etc/grafana/provisioning
+  entrypoint:
+    - sh
+    - -euc
+    - |
+      mkdir -p /etc/grafana/provisioning/datasources
+      cat <<EOF > /etc/grafana/provisioning/datasources/ds.yaml
+      apiVersion: 1
+      datasources:
+      - name: Loki
+        type: loki
+        access: proxy 
+        orgId: 1
+        url: http://loki:3100
+        basicAuth: false
+        isDefault: true
+        version: 1
+        editable: false
+      EOF
+      /
+  image: grafana/grafana:latest
+  ports:
+    - "3000:3000"
+You also need to provide two configuration files: `otel-config.yaml` for `logcollector` and `loki.yaml` for `Loki`. The content of the `otel-config.yaml` file is:
+receivers:
+  otlp:
+    protocols:
+      grpc:
+        endpoint: logcollector:4317
+processors:
+  batch:
+exporters:
+  logging:
+    verbosity: detailed
+  otlphttp:
+    endpoint: http://loki:3100/otlp
+    tls:
+      insecure: true
+service:
+  pipelines:
+    logs:
+      receivers: [otlp]
+      processors: [batch]
+      exporters: [otlphttp, logging]
+The content of the `loki.yaml` file is:
+auth_enabled: false
+  allow_structured_metadata: true
+  volume_enabled: true
+  otlp_config:
+    resource_attributes:
+      attributes_config:
+        - action: index_label
+          attributes:
+            - level
+  http_listen_port: 3100
+  ring:
+    instance_addr:
+    kvstore:
+      store: inmemory
+  replication_factor: 1
+  path_prefix: /tmp/loki
+  configs:
+    - from: 2020-05-15
+      store: tsdb
+      object_store: filesystem
+      schema: v13
+      index:
+        prefix: index_
+        period: 24h
+  tsdb_shipper:
+    active_index_directory: /tmp/loki/index
+    cache_location: /tmp/loki/index_cache
+  filesystem:
+    directory: /tmp/loki/chunks
+  enabled: true
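+## Usage
+Execute `docker compose --profile sort-report up -d` in the `inlong/docker/` path to start the relevant containers, then create and start a task process (the connectors involved need to be integrated with `OpenTelemetryAppender`). After that, open the `Grafana Loki` interface at `` and query the logs by the `service_name` field: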
+Click on a log item to view its details (**Note:** the default log reporting level is `ERROR`):

