Zakelly commented on a change in pull request #16522: URL: https://github.com/apache/flink/pull/16522#discussion_r686444883
########## File path: docs/content.zh/docs/ops/state/state_backends.md ########## @@ -42,92 +42,61 @@ under the License. Flink 内置了以下这些开箱即用的 state backends : - - *MemoryStateBackend* - - *FsStateBackend* - - *RocksDBStateBackend* + - *HashMapStateBackend* + - *EmbeddedRocksDBStateBackend* -如果不设置,默认使用 MemoryStateBackend。 +如果不设置,默认使用 HashMapStateBackend -### MemoryStateBackend - -在 *MemoryStateBackend* 内部,数据以 Java 对象的形式存储在堆中。 Key/value 形式的状态和窗口算子持有存储着状态值、触发器的 hash table。 - -在 CheckPoint 时,State Backend 对状态进行快照,并将快照信息作为 CheckPoint 应答消息的一部分发送给 JobManager(master),同时 JobManager 也将快照信息存储在堆内存中。 - -MemoryStateBackend 的限制: - - - 默认情况下,每个独立的状态大小限制是 5 MB。在 MemoryStateBackend 的构造器中可以增加其大小。 - - 无论配置的最大状态内存大小(MAX_MEM_STATE_SIZE)有多大,都不能大于 akka frame 大小(看[配置参数]({{< ref "docs/deployment/config" >}}))。 - - 聚合后的状态必须能够放进 JobManager 的内存中。 - -MemoryStateBackend 适用场景: - - - 本地开发和调试。 - - 状态很小的 Job,例如:由每次只处理一条记录的函数(Map、FlatMap、Filter 等)构成的 Job。Kafka Consumer 仅仅需要非常小的状态。 - -建议同时将 [managed memory]({{< ref "docs/deployment/memory/mem_setup_tm" >}}#managed-memory) 设为0,以保证将最大限度的内存分配给 JVM 上的用户代码。 +### HashMapStateBackend -### FsStateBackend +在 *HashMapStateBackend* 内部,数据以 Java 对象的形式存储在堆中。 Key/value 形式的状态和窗口算子持有存储着状态值、触发器的 hash table。 -*FsStateBackend* 需要配置一个文件系统的 URL(类型、地址、路径),例如:"hdfs://namenode:40010/flink/checkpoints" 或 "file:///data/flink/checkpoints"。 +HashMapStateBackend 的适用场景: -FsStateBackend 将正在运行中的状态数据保存在 TaskManager 的内存中。CheckPoint 时,将状态快照写入到配置的文件系统目录中。 -少量的元数据信息存储到 JobManager 的内存中(高可用模式下,将其写入到 CheckPoint 的元数据文件中)。 - -FsStateBackend 适用场景: - - - 状态比较大、窗口比较长、key/value 状态比较大的 Job。 - - 所有高可用的场景。 + - 有较大state,较长window和较大 key/value 状态的 jobs。 Review comment: ```suggestion - 有较大 state,较长 window 和较大 key/value 状态的 Job。 ``` ########## File path: docs/content.zh/docs/ops/state/state_backends.md ########## @@ -165,9 +134,9 @@ env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints" </dependency> ``` -{{< hint info >}} -**注意:** 由于 RocksDB 是 Flink 默认分发包的一部分,所以如果你没在代码中使用 RocksDB,则不需要添加此依赖。而且可以在 `flink-conf.yaml` 文件中通过 `state.backend` 配置 State Backend,以及更多的 [checkpointing]({{< ref "docs/deployment/config" >}}#checkpointing) 和 [RocksDB 特定的]({{< ref "docs/deployment/config" >}}#rocksdb-state-backend) 参数。 -{{< /hint >}} +<div class="alert alert-info" markdown="span"> Review comment: 这类格式信息就和英文文档保持一致吧,不需要额外的了? ########## File path: docs/content.zh/docs/ops/state/state_backends.md ########## @@ -256,9 +225,9 @@ Flink还提供了两个参数来控制*写路径*(MemTable)和*读路径*( 您可以选择使用 Flink 的监控指标系统来汇报 RocksDB 的原生指标,并且可以选择性的指定特定指标进行汇报。 请参阅 [configuration docs]({{< ref "docs/deployment/config" >}}#rocksdb-native-metrics) 了解更多详情。 -{{< hint warning >}} -**注意:** 启用 RocksDB 的原生指标可能会对应用程序的性能产生负面影响。 -{{< /hint >}} +<div class="alert alert-warning"> Review comment: 同上 ########## File path: docs/content.zh/docs/ops/state/state_backends.md ########## @@ -42,92 +42,61 @@ under the License. Flink 内置了以下这些开箱即用的 state backends : - - *MemoryStateBackend* - - *FsStateBackend* - - *RocksDBStateBackend* + - *HashMapStateBackend* + - *EmbeddedRocksDBStateBackend* -如果不设置,默认使用 MemoryStateBackend。 +如果不设置,默认使用 HashMapStateBackend -### MemoryStateBackend - -在 *MemoryStateBackend* 内部,数据以 Java 对象的形式存储在堆中。 Key/value 形式的状态和窗口算子持有存储着状态值、触发器的 hash table。 - -在 CheckPoint 时,State Backend 对状态进行快照,并将快照信息作为 CheckPoint 应答消息的一部分发送给 JobManager(master),同时 JobManager 也将快照信息存储在堆内存中。 - -MemoryStateBackend 的限制: - - - 默认情况下,每个独立的状态大小限制是 5 MB。在 MemoryStateBackend 的构造器中可以增加其大小。 - - 无论配置的最大状态内存大小(MAX_MEM_STATE_SIZE)有多大,都不能大于 akka frame 大小(看[配置参数]({{< ref "docs/deployment/config" >}}))。 - - 聚合后的状态必须能够放进 JobManager 的内存中。 - -MemoryStateBackend 适用场景: - - - 本地开发和调试。 - - 状态很小的 Job,例如:由每次只处理一条记录的函数(Map、FlatMap、Filter 等)构成的 Job。Kafka Consumer 仅仅需要非常小的状态。 - -建议同时将 [managed memory]({{< ref "docs/deployment/memory/mem_setup_tm" >}}#managed-memory) 设为0,以保证将最大限度的内存分配给 JVM 上的用户代码。 +### HashMapStateBackend -### FsStateBackend +在 *HashMapStateBackend* 内部,数据以 Java 对象的形式存储在堆中。 Key/value 形式的状态和窗口算子持有存储着状态值、触发器的 hash table。 Review comment: 这个后半句不是特别的通顺,之前的翻译也不理想。这样如何: ```suggestion 在 *HashMapStateBackend* 内部,数据以 Java 对象的形式存储在堆中。 Key-value 形式的状态和窗口算子会持有一个 hash table,其中存储着状态值、触发器。 ``` ########## File path: docs/content.zh/docs/ops/state/state_backends.md ########## @@ -42,92 +42,61 @@ under the License. Flink 内置了以下这些开箱即用的 state backends : - - *MemoryStateBackend* - - *FsStateBackend* - - *RocksDBStateBackend* + - *HashMapStateBackend* + - *EmbeddedRocksDBStateBackend* -如果不设置,默认使用 MemoryStateBackend。 +如果不设置,默认使用 HashMapStateBackend Review comment: ```suggestion 如果不设置,默认使用 HashMapStateBackend。 ``` ########## File path: docs/content.zh/docs/ops/state/state_backends.md ########## @@ -42,92 +42,61 @@ under the License. Flink 内置了以下这些开箱即用的 state backends : - - *MemoryStateBackend* - - *FsStateBackend* - - *RocksDBStateBackend* + - *HashMapStateBackend* + - *EmbeddedRocksDBStateBackend* -如果不设置,默认使用 MemoryStateBackend。 +如果不设置,默认使用 HashMapStateBackend -### MemoryStateBackend - -在 *MemoryStateBackend* 内部,数据以 Java 对象的形式存储在堆中。 Key/value 形式的状态和窗口算子持有存储着状态值、触发器的 hash table。 - -在 CheckPoint 时,State Backend 对状态进行快照,并将快照信息作为 CheckPoint 应答消息的一部分发送给 JobManager(master),同时 JobManager 也将快照信息存储在堆内存中。 - -MemoryStateBackend 的限制: - - - 默认情况下,每个独立的状态大小限制是 5 MB。在 MemoryStateBackend 的构造器中可以增加其大小。 - - 无论配置的最大状态内存大小(MAX_MEM_STATE_SIZE)有多大,都不能大于 akka frame 大小(看[配置参数]({{< ref "docs/deployment/config" >}}))。 - - 聚合后的状态必须能够放进 JobManager 的内存中。 - -MemoryStateBackend 适用场景: - - - 本地开发和调试。 - - 状态很小的 Job,例如:由每次只处理一条记录的函数(Map、FlatMap、Filter 等)构成的 Job。Kafka Consumer 仅仅需要非常小的状态。 - -建议同时将 [managed memory]({{< ref "docs/deployment/memory/mem_setup_tm" >}}#managed-memory) 设为0,以保证将最大限度的内存分配给 JVM 上的用户代码。 +### HashMapStateBackend -### FsStateBackend +在 *HashMapStateBackend* 内部,数据以 Java 对象的形式存储在堆中。 Key/value 形式的状态和窗口算子持有存储着状态值、触发器的 hash table。 -*FsStateBackend* 需要配置一个文件系统的 URL(类型、地址、路径),例如:"hdfs://namenode:40010/flink/checkpoints" 或 "file:///data/flink/checkpoints"。 +HashMapStateBackend 的适用场景: -FsStateBackend 将正在运行中的状态数据保存在 TaskManager 的内存中。CheckPoint 时,将状态快照写入到配置的文件系统目录中。 -少量的元数据信息存储到 JobManager 的内存中(高可用模式下,将其写入到 CheckPoint 的元数据文件中)。 - -FsStateBackend 适用场景: - - - 状态比较大、窗口比较长、key/value 状态比较大的 Job。 - - 所有高可用的场景。 + - 有较大state,较长window和较大 key/value 状态的 jobs。 + - 所有的高可用场景。 建议同时将 [managed memory]({{< ref "docs/deployment/memory/mem_setup_tm" >}}#managed-memory) 设为0,以保证将最大限度的内存分配给 JVM 上的用户代码。 -<a name="the-rocksdbstatebackend" /> - -### RocksDBStateBackend - -*RocksDBStateBackend* 需要配置一个文件系统的 URL (类型、地址、路径),例如:"hdfs://namenode:40010/flink/checkpoints" 或 "file:///data/flink/checkpoints"。 +### EmbeddedRocksDBStateBackend -RocksDBStateBackend 将正在运行中的状态数据保存在 [RocksDB](http://rocksdb.org) 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 的数据目录。 -Unlike storing java objects in `HashMapStateBackend`, data is stored as serialized byte arrays, which are mainly defined by the type serializer, resulting in key comparisons being byte-wise instead of using Java's `hashCode()` and `equals()` methods. +EmbeddedRocksDBStateBackend 将正在运行中的状态数据保存在 [RocksDB](http://rocksdb.org) 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 的数据目录。 +不同于 `HashMapStateBackend` 中的 java objects,数据被以 serialized byte array 的方式存储,这种方式由 type serializer 决定,因此 key 之间的比较是以 byte-wise的形式进行而不是使用 Java 的 `hashCode` 或 `equals()` 方法。 Review comment: 这几个单词还是尽量翻译一下吧,比如 ```suggestion 不同于 `HashMapStateBackend` 中的 java 对象,数据被以序列化字节数组的方式存储,这种方式由序列化器决定,因此 key 之间的比较是以字节序的形式进行而不是使用 Java 的 `hashCode` 或 `equals()` 方法。 ``` ########## File path: docs/content.zh/docs/ops/state/state_backends.md ########## @@ -332,17 +301,18 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory { {{< top >}} -## Migrating from Legacy Backends +## 自旧版本迁移 + +从 **Flink 1.13** 版本开始,社区改进了 state backend 的公开类,进而帮助用户更好理解本地状态存储和 checkpoint 存储的区分。 +这个变化并不会影响 state backend 和 checkpointing 过程的运行时实现和机制,仅仅是为了更好地传达设计意图。 +用户可以在没有任何 state 或者 consistency 失效的情况下使用新的 API。 Review comment: ```suggestion 用户可以将现有作业迁移到新的API,同时不会损失原有 state。 ``` ########## File path: docs/content.zh/docs/ops/state/state_backends.md ########## @@ -42,92 +42,61 @@ under the License. Flink 内置了以下这些开箱即用的 state backends : - - *MemoryStateBackend* - - *FsStateBackend* - - *RocksDBStateBackend* + - *HashMapStateBackend* + - *EmbeddedRocksDBStateBackend* -如果不设置,默认使用 MemoryStateBackend。 +如果不设置,默认使用 HashMapStateBackend -### MemoryStateBackend - -在 *MemoryStateBackend* 内部,数据以 Java 对象的形式存储在堆中。 Key/value 形式的状态和窗口算子持有存储着状态值、触发器的 hash table。 - -在 CheckPoint 时,State Backend 对状态进行快照,并将快照信息作为 CheckPoint 应答消息的一部分发送给 JobManager(master),同时 JobManager 也将快照信息存储在堆内存中。 - -MemoryStateBackend 的限制: - - - 默认情况下,每个独立的状态大小限制是 5 MB。在 MemoryStateBackend 的构造器中可以增加其大小。 - - 无论配置的最大状态内存大小(MAX_MEM_STATE_SIZE)有多大,都不能大于 akka frame 大小(看[配置参数]({{< ref "docs/deployment/config" >}}))。 - - 聚合后的状态必须能够放进 JobManager 的内存中。 - -MemoryStateBackend 适用场景: - - - 本地开发和调试。 - - 状态很小的 Job,例如:由每次只处理一条记录的函数(Map、FlatMap、Filter 等)构成的 Job。Kafka Consumer 仅仅需要非常小的状态。 - -建议同时将 [managed memory]({{< ref "docs/deployment/memory/mem_setup_tm" >}}#managed-memory) 设为0,以保证将最大限度的内存分配给 JVM 上的用户代码。 +### HashMapStateBackend -### FsStateBackend +在 *HashMapStateBackend* 内部,数据以 Java 对象的形式存储在堆中。 Key/value 形式的状态和窗口算子持有存储着状态值、触发器的 hash table。 -*FsStateBackend* 需要配置一个文件系统的 URL(类型、地址、路径),例如:"hdfs://namenode:40010/flink/checkpoints" 或 "file:///data/flink/checkpoints"。 +HashMapStateBackend 的适用场景: -FsStateBackend 将正在运行中的状态数据保存在 TaskManager 的内存中。CheckPoint 时,将状态快照写入到配置的文件系统目录中。 -少量的元数据信息存储到 JobManager 的内存中(高可用模式下,将其写入到 CheckPoint 的元数据文件中)。 - -FsStateBackend 适用场景: - - - 状态比较大、窗口比较长、key/value 状态比较大的 Job。 - - 所有高可用的场景。 + - 有较大state,较长window和较大 key/value 状态的 jobs。 + - 所有的高可用场景。 建议同时将 [managed memory]({{< ref "docs/deployment/memory/mem_setup_tm" >}}#managed-memory) 设为0,以保证将最大限度的内存分配给 JVM 上的用户代码。 -<a name="the-rocksdbstatebackend" /> - -### RocksDBStateBackend - -*RocksDBStateBackend* 需要配置一个文件系统的 URL (类型、地址、路径),例如:"hdfs://namenode:40010/flink/checkpoints" 或 "file:///data/flink/checkpoints"。 +### EmbeddedRocksDBStateBackend -RocksDBStateBackend 将正在运行中的状态数据保存在 [RocksDB](http://rocksdb.org) 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 的数据目录。 -Unlike storing java objects in `HashMapStateBackend`, data is stored as serialized byte arrays, which are mainly defined by the type serializer, resulting in key comparisons being byte-wise instead of using Java's `hashCode()` and `equals()` methods. +EmbeddedRocksDBStateBackend 将正在运行中的状态数据保存在 [RocksDB](http://rocksdb.org) 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 的数据目录。 +不同于 `HashMapStateBackend` 中的 java objects,数据被以 serialized byte array 的方式存储,这种方式由 type serializer 决定,因此 key 之间的比较是以 byte-wise的形式进行而不是使用 Java 的 `hashCode` 或 `equals()` 方法。 -CheckPoint 时,整个 RocksDB 数据库被 checkpoint 到配置的文件系统目录中。 -少量的元数据信息存储到 JobManager 的内存中(高可用模式下,将其存储到 CheckPoint 的元数据文件中)。 +EmbeddedRocksDBStateBackend 会使用异步的方式生成 snapshots。 -RocksDBStateBackend 的限制: +EmbeddedRocksDBStateBackend 会使用异步的方式生成的限制: Review comment: ```suggestion EmbeddedRocksDBStateBackend 的局限: ``` ########## File path: docs/content.zh/docs/ops/state/state_backends.md ########## @@ -42,92 +42,61 @@ under the License. Flink 内置了以下这些开箱即用的 state backends : - - *MemoryStateBackend* - - *FsStateBackend* - - *RocksDBStateBackend* + - *HashMapStateBackend* + - *EmbeddedRocksDBStateBackend* -如果不设置,默认使用 MemoryStateBackend。 +如果不设置,默认使用 HashMapStateBackend -### MemoryStateBackend - -在 *MemoryStateBackend* 内部,数据以 Java 对象的形式存储在堆中。 Key/value 形式的状态和窗口算子持有存储着状态值、触发器的 hash table。 - -在 CheckPoint 时,State Backend 对状态进行快照,并将快照信息作为 CheckPoint 应答消息的一部分发送给 JobManager(master),同时 JobManager 也将快照信息存储在堆内存中。 - -MemoryStateBackend 的限制: - - - 默认情况下,每个独立的状态大小限制是 5 MB。在 MemoryStateBackend 的构造器中可以增加其大小。 - - 无论配置的最大状态内存大小(MAX_MEM_STATE_SIZE)有多大,都不能大于 akka frame 大小(看[配置参数]({{< ref "docs/deployment/config" >}}))。 - - 聚合后的状态必须能够放进 JobManager 的内存中。 - -MemoryStateBackend 适用场景: - - - 本地开发和调试。 - - 状态很小的 Job,例如:由每次只处理一条记录的函数(Map、FlatMap、Filter 等)构成的 Job。Kafka Consumer 仅仅需要非常小的状态。 - -建议同时将 [managed memory]({{< ref "docs/deployment/memory/mem_setup_tm" >}}#managed-memory) 设为0,以保证将最大限度的内存分配给 JVM 上的用户代码。 +### HashMapStateBackend -### FsStateBackend +在 *HashMapStateBackend* 内部,数据以 Java 对象的形式存储在堆中。 Key/value 形式的状态和窗口算子持有存储着状态值、触发器的 hash table。 -*FsStateBackend* 需要配置一个文件系统的 URL(类型、地址、路径),例如:"hdfs://namenode:40010/flink/checkpoints" 或 "file:///data/flink/checkpoints"。 +HashMapStateBackend 的适用场景: -FsStateBackend 将正在运行中的状态数据保存在 TaskManager 的内存中。CheckPoint 时,将状态快照写入到配置的文件系统目录中。 -少量的元数据信息存储到 JobManager 的内存中(高可用模式下,将其写入到 CheckPoint 的元数据文件中)。 - -FsStateBackend 适用场景: - - - 状态比较大、窗口比较长、key/value 状态比较大的 Job。 - - 所有高可用的场景。 + - 有较大state,较长window和较大 key/value 状态的 jobs。 + - 所有的高可用场景。 建议同时将 [managed memory]({{< ref "docs/deployment/memory/mem_setup_tm" >}}#managed-memory) 设为0,以保证将最大限度的内存分配给 JVM 上的用户代码。 -<a name="the-rocksdbstatebackend" /> - -### RocksDBStateBackend - -*RocksDBStateBackend* 需要配置一个文件系统的 URL (类型、地址、路径),例如:"hdfs://namenode:40010/flink/checkpoints" 或 "file:///data/flink/checkpoints"。 +### EmbeddedRocksDBStateBackend -RocksDBStateBackend 将正在运行中的状态数据保存在 [RocksDB](http://rocksdb.org) 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 的数据目录。 -Unlike storing java objects in `HashMapStateBackend`, data is stored as serialized byte arrays, which are mainly defined by the type serializer, resulting in key comparisons being byte-wise instead of using Java's `hashCode()` and `equals()` methods. +EmbeddedRocksDBStateBackend 将正在运行中的状态数据保存在 [RocksDB](http://rocksdb.org) 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 的数据目录。 +不同于 `HashMapStateBackend` 中的 java objects,数据被以 serialized byte array 的方式存储,这种方式由 type serializer 决定,因此 key 之间的比较是以 byte-wise的形式进行而不是使用 Java 的 `hashCode` 或 `equals()` 方法。 -CheckPoint 时,整个 RocksDB 数据库被 checkpoint 到配置的文件系统目录中。 -少量的元数据信息存储到 JobManager 的内存中(高可用模式下,将其存储到 CheckPoint 的元数据文件中)。 +EmbeddedRocksDBStateBackend 会使用异步的方式生成 snapshots。 -RocksDBStateBackend 的限制: +EmbeddedRocksDBStateBackend 会使用异步的方式生成的限制: - 由于 RocksDB 的 JNI API 构建在 byte[] 数据结构之上, 所以每个 key 和 value 最大支持 2^31 字节。 - 重要信息: RocksDB 合并操作的状态(例如:ListState)累积数据量大小可以超过 2^31 字节,但是会在下一次获取数据时失败。这是当前 RocksDB JNI 的限制。 + RocksDB 合并操作的状态(例如:ListState)累积数据量大小可以超过 2^31 字节,但是会在下一次获取数据时失败。这是当前 RocksDB JNI 的限制。 -RocksDBStateBackend 的适用场景: +EmbeddedRocksDBStateBackend 的适用场景: - 状态非常大、窗口非常长、key/value 状态非常大的 Job。 - 所有高可用的场景。 -注意,你可以保留的状态大小仅受磁盘空间的限制。与状态存储在内存中的 FsStateBackend 相比,RocksDBStateBackend 允许存储非常大的状态。 -然而,这也意味着使用 RocksDBStateBackend 将会使应用程序的最大吞吐量降低。 +注意,你可以保留的状态大小仅受磁盘空间的限制。与状态存储在内存中的 HashMapStateBackend 相比,EmbeddedRocksDBStateBackend 允许存储非常大的状态。 +然而,这也意味着使用 EmbeddedRocksDBStateBackend 将会使应用程序的最大吞吐量降低。 所有的读写都必须序列化、反序列化操作,这个比基于堆内存的 state backend 的效率要低很多。 -请同时参考 [Task Executor 内存配置]({{< ref "docs/deployment/memory/mem_tuning" >}}#rocksdb-state-backend) 中关于 RocksDBStateBackend 的建议。 +请同时参考 [Task Executor 内存配置]({{< ref "docs/deployment/memory/mem_tuning" >}}#rocksdb-state-backend) 中关于 EmbeddedRocksDBStateBackend 的建议。 -RocksDBStateBackend 是目前唯一支持增量 CheckPoint 的 State Backend (见 [这里]({{< ref "docs/ops/state/large_state_tuning" >}}))。 +EmbeddedRocksDBStateBackend 是目前唯一支持增量 CheckPoint 的 State Backend (见 [这里]({{< ref "docs/ops/state/large_state_tuning" >}}))。 可以使用一些 RocksDB 的本地指标(metrics),但默认是关闭的。你能在 [这里]({{< ref "docs/deployment/config" >}}#rocksdb-native-metrics) 找到关于 RocksDB 本地指标的文档。 -The total memory amount of RocksDB instance(s) per slot can also be bounded, please refer to documentation [here]({{< ref "docs/ops/state/large_state_tuning" >}}#bounding-rocksdb-memory-usage) for details. +RocksDB instance 中每个 slot 的内存大小是有限制的,请参考 [这里]({{< ref "docs/ops/state/large_state_tuning" >}})。 Review comment: ```suggestion 每个 slot 中的 RocksDB instance 的内存大小是有限制的,详情请见 [这里]({{< ref "docs/ops/state/large_state_tuning" >}})。 ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org