hackergin commented on code in PR #23130: URL: https://github.com/apache/flink/pull/23130#discussion_r1294251979
########## docs/content.zh/docs/dev/table/procedures.md: ########## @@ -0,0 +1,541 @@ +--- +title: "存储过程" +is_beta: true +weight: 50 +type: docs +aliases: + - /zh/dev/table/procedures.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# 存储过程 + + +Flink 允许用户在 Table API 和 SQL 中调用存储过程来完成一些特定任务,比如处理数据,数据管理类任务等。存储过程可以通过 `StreamExecutionEnvironment` 来运行 Flink 作业,这使得存储过程更加强大和灵活。 + +## 开发指南 +------------------ + +为了调用一个存储过程,需要确保一个 Catalog 可以提供这个存储过程。为了让一个 Catalog 提供存储过程,你首先需要实现一个存储过程,然后在方法 `Catalog.getProcedure(ObjectPath procedurePath)` 返回这个存储过程。 +下面的步骤将展示如何实现一个存储过程并让一个 Catalog 提供这个存储过程。 + +### 存储过程类 + +存储过程的实现类必须实现接口 `org.apache.flink.table.procedures.Procedure`。 + +该实现类必须声明为 `public`, 而不是 `abstract`, 并且可以被全局访问。不允许使用非静态内部类或匿名类。 + +### Call 方法 + +存储过程的接口不提供任何方法,存储过程的实现类必须有名为 `call` 的方法,在该方法里面可以实现存储过程实际的逻辑。`call` 方法必须被声明为 `public`, 并且带有一组定义明确的参数。 + +请注意: + +* `call` 方法的第一个参数总是应该为 `ProcedureContext`,该参数提供了方法 `getExecutionEnvironment()` 来得到当前的 `StreamExecutionEnvironment`。通过 `StreamExecutionEnvironment` 可以运行一个 Flink 作业; +* `call` 方法的返回类型应该永远都是一个数组类型,比如 `int[]`,`String[]`,等等; + +更多的细节请参考类 `org.apache.flink.table.procedures.Procedure` 的 Java 文档。 + +常规的 JVM 方法调用语义是适用的,因此可以: +- 实现重载的方法,例如 `call(ProcedureContext, Integer)` and `call(ProcedureContext, LocalDateTime)`; +- 使用变长参数,例如 `call(ProcedureContext, Integer...)`; +- 使用对象继承,例如 `call(ProcedureContext, Object)` 可接受 `LocalDateTime` 和 `Integer` 作为参数; +- 也可组合使用,例如 `call(ProcedureContext, Object...)` 可接受所有类型的参数; + +如果你希望用 Scala 来实现一个存储过程,对应可变长参数的情况,请添加 `scala.annotation.varargs`。另外,推荐使用装箱的基本类型(比如,使用 `java.lang.Integer` 而不是 `Int`)来支持 `NULL`。 + +下面的代码片段展示来一个重载存储过程的例子: + +{{< tabs "7c5a5392-30d7-11ee-be56-0242ac120002" >}} +{{< tab "Java" >}} + +```java +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.table.procedures.Procedure; + +// 有多个重载 call 方法的存储过程 +public class GenerateSequenceProcedure implements Procedure { + + public long[] call(ProcedureContext context, int n) { + return generate(context.getExecutionEnvironment(), n); + } + + public long[] call(ProcedureContext context, String n) { + return generate(context.getExecutionEnvironment(), Integer.parseInt(n)); + } + + private long[] generate(StreamExecutionEnvironment env, int n) throws Exception { + long[] sequenceN = new long[n]; + int i = 0; + try (CloseableIterator<Long> result = env.fromSequence(0, n - 1).executeAndCollect()) { + while (result.hasNext()) { + sequenceN[i++] = result.next(); + } + } + return sequenceN; + } +} + +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala +import org.apache.flink.table.procedure.ProcedureContext +import org.apache.flink.table.procedures.Procedure +import scala.annotation.varargs + +// 有多个重载 call 方法的存储过程 +class GenerateSequenceProcedure extends Procedure { Review Comment: ```suggestion class GenerateSequenceProcedure implements Procedure { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org