hackergin commented on code in PR #23130: URL: https://github.com/apache/flink/pull/23130#discussion_r1290479488
########## docs/content/docs/dev/table/procedures.md: ########## @@ -0,0 +1,544 @@ +--- +title: "Procedures" +is_beta: true +weight: 50 +type: docs +aliases: + - /dev/table/procedures.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Procedures + +Flink Table API & SQL empowers users to perform data manipulation or administrative tasks with procedures. Procedures can run Flink jobs with the provided `StreamExecutionEnvironment`, which makes procedures more powerful and flexible. + +## Implementation Guide + +To call a procedure, it must be provided by a catalog. To provide procedures in a catalog, you must implement a procedure and then return it +from the method `Catalog.getProcedure(ObjectPath procedurePath)`. The following steps show how to implement a procedure and provide it in a catalog. + +### Procedure Class + +An implementation class must implement the interface `org.apache.flink.table.procedures.Procedure`. + +The class must be declared `public`, not `abstract`, and should be globally accessible. Thus, non-static inner or anonymous classes are not allowed. 
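The class-level rules above can be checked with plain Java reflection. The following is a minimal sketch, not Flink's actual validation code: the `Procedure` interface here is a hypothetical stand-in so the snippet compiles without Flink, and `ProcedureClassCheck`, `Examples`, and the nested class names are invented for illustration. It only inspects the modifiers of the class itself, which simplifies "globally accessible".

```java
import java.lang.reflect.Modifier;

// Hypothetical stand-in for org.apache.flink.table.procedures.Procedure.
interface Procedure {}

// Hypothetical holder for candidate classes used in the demo.
class Examples {
    public static class Good implements Procedure {}

    // non-static inner class: needs an enclosing instance, so it is rejected
    public class NonStaticInner implements Procedure {}

    // abstract class: cannot be instantiated, so it is rejected
    public abstract static class AbstractOne implements Procedure {}
}

class ProcedureClassCheck {

    // Returns true if the class is public, not abstract, and neither an
    // anonymous, local, nor non-static inner class.
    static boolean isInstantiableProcedureClass(Class<?> clazz) {
        int mods = clazz.getModifiers();
        boolean nonStaticInner = clazz.isMemberClass() && !Modifier.isStatic(mods);
        return Procedure.class.isAssignableFrom(clazz)
                && Modifier.isPublic(mods)
                && !Modifier.isAbstract(mods)
                && !clazz.isAnonymousClass()
                && !clazz.isLocalClass()
                && !nonStaticInner;
    }

    public static void main(String[] args) {
        Procedure anonymous = new Procedure() {};
        System.out.println(isInstantiableProcedureClass(Examples.Good.class));
        System.out.println(isInstantiableProcedureClass(Examples.NonStaticInner.class));
        System.out.println(isInstantiableProcedureClass(Examples.AbstractOne.class));
        System.out.println(isInstantiableProcedureClass(anonymous.getClass()));
    }
}
```

Only `Examples.Good` passes the check; the other three candidates each violate one of the rules.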
+ +### Call Methods + +The interface doesn't declare any methods, so you have to define a method named `call` that implements the logic of the procedure. +The methods must be declared `public` and take a well-defined set of arguments. + +Please note: + +* The first parameter of the method `call` should always be `ProcedureContext`, which provides the method `getExecutionEnvironment` to get a `StreamExecutionEnvironment` for running a Flink job +* The return type should always be an array, like `int[]`, `String[]`, etc + +More details can be found in the Javadoc of the class `org.apache.flink.table.procedures.Procedure`. + +Regular JVM method calling semantics apply. Therefore, it is possible to: +- implement overloaded methods such as `call(ProcedureContext, Integer)` and `call(ProcedureContext, LocalDateTime)` +- use var-args such as `call(ProcedureContext, Integer...)` +- use object inheritance such as `call(ProcedureContext, Object)` that takes both `LocalDateTime` and `Integer` +- and combinations of the above such as `call(ProcedureContext, Object...)` that takes all kinds of arguments + +If you intend to implement procedures in Scala, please add the `scala.annotation.varargs` annotation in +case of variable arguments. Furthermore, it is recommended to use boxed primitives (e.g. `java.lang.Integer` +instead of `Int`) to support `NULL`. 
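Because the interface declares no methods, `call` methods can only be discovered by name. The following plain-Java sketch illustrates that idea with reflection. It is not Flink's extraction logic: `Procedure` and `ProcedureContext` are hypothetical stand-ins so the snippet compiles without Flink, and `EchoProcedure` and `CallMethodLookup` are invented names.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for the Flink types, so the sketch compiles without Flink.
interface Procedure {}

class ProcedureContext {}

// A procedure with two overloaded call methods that follow the rules above.
class EchoProcedure implements Procedure {

    public String[] call(ProcedureContext context, Integer n) {
        return new String[] {"int:" + n};
    }

    public String[] call(ProcedureContext context, String s) {
        return new String[] {"string:" + s};
    }
}

class CallMethodLookup {

    // Collects methods named "call" whose first parameter is ProcedureContext
    // and whose return type is an array. getMethods() only returns public methods.
    static List<Method> findCallMethods(Class<? extends Procedure> clazz) {
        List<Method> result = new ArrayList<>();
        for (Method m : clazz.getMethods()) {
            Class<?>[] params = m.getParameterTypes();
            if (m.getName().equals("call")
                    && params.length > 0
                    && params[0] == ProcedureContext.class
                    && m.getReturnType().isArray()) {
                result.add(m);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // prints each overload with its parameter types; the order is unspecified
        for (Method m : findCallMethods(EchoProcedure.class)) {
            System.out.println(m.getName() + Arrays.toString(m.getParameterTypes()));
        }
    }
}
```

Both overloads of `EchoProcedure.call` are found, which is why overloading, var-args, and inheritance all work the way they do for any other JVM method.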
+ +The following snippets show an example of an overloaded procedure: + +{{< tabs "0819d780-3052-11ee-be56-0242ac120002" >}} +{{< tab "Java" >}} + +```java +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.table.procedures.Procedure; +import org.apache.flink.util.CloseableIterator; + +// procedure with overloaded call methods +public class GenerateSequenceProcedure implements Procedure { + + public long[] call(ProcedureContext context, int n) throws Exception { + return generate(context.getExecutionEnvironment(), n); + } + + public long[] call(ProcedureContext context, String n) throws Exception { + return generate(context.getExecutionEnvironment(), Integer.parseInt(n)); + } + + // runs a Flink job that emits 0..n-1 and collects the result + private long[] generate(StreamExecutionEnvironment env, int n) throws Exception { + long[] sequenceN = new long[n]; + int i = 0; + try (CloseableIterator<Long> result = env.fromSequence(0, n - 1).executeAndCollect()) { + while (result.hasNext()) { + sequenceN[i++] = result.next(); + } + } + return sequenceN; + } +} + +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala +import org.apache.flink.table.procedure.ProcedureContext +import org.apache.flink.table.procedures.Procedure +import scala.annotation.varargs + +// procedures with overloaded call methods +class GenerateSequenceProcedure extends Procedure { + + def call(context: ProcedureContext, a: Integer, b: Integer): Array[Integer] = { + Array(a + b) + } + + def call(context: ProcedureContext, a: String, b: String): Array[Integer] = { + Array(Integer.valueOf(a) + Integer.valueOf(b)) + } + + @varargs // generate var-args like Java + def call(context: ProcedureContext, d: Double*): Array[Integer] = { + Array(d.sum.toInt) + } +} + +``` +{{< /tab >}} +{{< /tabs >}} + +### Type Inference +The table ecosystem (similar to the SQL standard) is a strongly typed API. Therefore, both procedure parameters and return types must be mapped to a [data type]({{< ref "docs/dev/table/types" >}}). 
+ +From a logical perspective, the planner needs information about expected types, precision, and scale. From a JVM perspective, the planner needs information about how internal data structures are represented as JVM objects when calling a procedure. + +The logic for validating input arguments and deriving data types for both the parameters and the result of a procedure is summarized under the term _type inference_. + +Flink's procedures implement an automatic type inference extraction that derives data types from the procedure's class and its `call` methods via reflection. If this implicit reflective extraction approach is not successful, the extraction process can be supported by annotating affected parameters, classes, or methods with `@DataTypeHint` and `@ProcedureHint`. More examples on how to annotate procedures are shown below. + +Note: although the return type of the `call` method must be an array type `T[]`, a `@DataTypeHint` that annotates the return type is expected to describe the component type `T` of the array, not the array type itself. + +#### Automatic Type Inference + +The automatic type inference inspects the procedure's class and `call` methods to derive data types for the arguments and result of a procedure. `@DataTypeHint` and `@ProcedureHint` annotations support the automatic extraction. + +For a full list of classes that can be implicitly mapped to a data type, see the [data type extraction section]({{< ref "docs/dev/table/types" >}}#data-type-extraction). + +**`@DataTypeHint`** + +In many scenarios, it is required to support the automatic extraction _inline_ for parameters and return types of a procedure. + +The following example shows how to use data type hints. More information can be found in the documentation of the annotation class. 
+ +{{< tabs "9414057c-3051-11ee-be56-0242ac120002" >}} +{{< tab "Java" >}} +```java +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.InputGroup; +import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.table.procedures.Procedure; +import org.apache.flink.types.Row; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Instant; + +// procedure with overloaded call methods +public static class OverloadedProcedure implements Procedure { + + // no hint required + public Long[] call(ProcedureContext context, long a, long b) { + return new Long[] {a + b}; + } + + // define the precision and scale of a decimal + public @DataTypeHint("DECIMAL(12, 3)") BigDecimal[] call(ProcedureContext context, double a, double b) { + return new BigDecimal[] {BigDecimal.valueOf(a + b)}; + } + + // define a nested data type + @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>") + public Row[] call(ProcedureContext context, int i) { + return new Row[] {Row.of(String.valueOf(i), Instant.ofEpochSecond(i))}; + } + + // allow wildcard input and custom serialized output + @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class) + public ByteBuffer[] call(ProcedureContext context, @DataTypeHint(inputGroup = InputGroup.ANY) Object o) { + return new ByteBuffer[] {MyUtils.serializeToByteBuffer(o)}; + } +} + +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala +import org.apache.flink.table.annotation.DataTypeHint +import org.apache.flink.table.annotation.InputGroup +import org.apache.flink.table.procedure.ProcedureContext +import org.apache.flink.table.procedures.Procedure +import org.apache.flink.types.Row + +// procedure with overloaded call methods +class OverloadedProcedure extends Procedure { + + // no hint required + def call(context: ProcedureContext, a: Long, b: Long): Array[Long] = { + Array(a + b) + } + + // define the precision and scale of a decimal + @DataTypeHint("DECIMAL(12, 3)") + def call(context: ProcedureContext, a: Double, b: Double): Array[BigDecimal] = { + 
Array(BigDecimal.valueOf(a + b)) + } + + // define a nested data type + @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>") + def call(context: ProcedureContext, i: Integer): Array[Row] = { + Array(Row.of(java.lang.String.valueOf(i), java.time.Instant.ofEpochSecond(i))) + } + + // allow wildcard input and custom serialized output + @DataTypeHint(value = "RAW", bridgedTo = classOf[java.nio.ByteBuffer]) + def call(context: ProcedureContext, @DataTypeHint(inputGroup = InputGroup.ANY) o: Object): Array[java.nio.ByteBuffer] = { + Array(MyUtils.serializeToByteBuffer(o)) + } +} + +``` +{{< /tab >}} +{{< /tabs >}} + +**`@ProcedureHint`** + +In some scenarios, it is desirable that one `call` method handles multiple different data types at the same time. Furthermore, in some scenarios, overloaded `call` methods have a common result type that should be declared only once. + +The `@ProcedureHint` annotation can provide a mapping from argument data types to a result data type. It enables annotating entire procedure classes or `call` methods for input and result data types. One or more annotations can be declared on top of a class or individually for each `call` method for overloading procedure signatures. All hint parameters are optional. If a parameter is not defined, the default reflection-based extraction is used. Hint parameters defined on top of a procedure class are inherited by all `call` methods. + +The following example shows how to use procedure hints. More information can be found in the documentation of the annotation class. 
+ +{{< tabs "16d50628-3052-11ee-be56-0242ac120002" >}} +{{< tab "Java" >}} +```java +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.table.procedures.Procedure; +import org.apache.flink.types.Row; + +// procedure with overloaded call methods +// but globally defined output type +@ProcedureHint(output = @DataTypeHint("ROW<s STRING, i INT>")) +public static class OverloadedProcedure implements Procedure { + + public Row[] call(ProcedureContext context, int a, int b) { + return new Row[] {Row.of("Sum", a + b)}; + } + + // overloading of arguments is still possible + public Row[] call(ProcedureContext context) { + return new Row[] {Row.of("Empty args", -1)}; + } +} + +// decouples the type inference from call methods, +// the type inference is entirely determined by the procedure hints +@ProcedureHint( + input = {@DataTypeHint("INT"), @DataTypeHint("INT")}, + output = @DataTypeHint("INT") +) +@ProcedureHint( + input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")}, + output = @DataTypeHint("BIGINT") +) +@ProcedureHint( + input = {}, + output = @DataTypeHint("BOOLEAN") +) +public static class OverloadedProcedure implements Procedure { + + // an implementer just needs to make sure that a method exists + // that can be called by the JVM + public Object[] call(ProcedureContext context, Object... 
o) { + if (o.length == 0) { + return new Object[] {false}; + } + return new Object[] {o[0]}; + } +} + +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala + +import org.apache.flink.table.annotation.DataTypeHint +import org.apache.flink.table.annotation.ProcedureHint +import org.apache.flink.table.procedure.ProcedureContext +import org.apache.flink.table.procedures.Procedure +import org.apache.flink.types.Row +import scala.annotation.varargs + +// procedure with overloaded call methods +// but globally defined output type +@ProcedureHint(output = new DataTypeHint("ROW<s STRING, i INT>")) +class OverloadedProcedure extends Procedure { + + def call(context: ProcedureContext, a: Int, b: Int): Array[Row] = { + Array(Row.of("Sum", Int.box(a + b))) + } + + // overloading of arguments is still possible + def call(context: ProcedureContext): Array[Row] = { + Array(Row.of("Empty args", Int.box(-1))) + } +} + +// decouples the type inference from call methods, +// the type inference is entirely determined by the procedure hints +@ProcedureHint( + input = Array(new DataTypeHint("INT"), new DataTypeHint("INT")), + output = new DataTypeHint("INT") +) +@ProcedureHint( + input = Array(new DataTypeHint("BIGINT"), new DataTypeHint("BIGINT")), + output = new DataTypeHint("BIGINT") +) +@ProcedureHint( + input = Array(), + output = new DataTypeHint("BOOLEAN") +) +class OverloadedProcedure extends Procedure { + + // an implementer just needs to make sure that a method exists + // that can be called by the JVM + @varargs + def call(context: ProcedureContext, o: AnyRef*): Array[AnyRef] = { + if (o.length == 0) { + Array(Boolean.box(false)) + } else { + Array(o(0)) + } + } +} + +``` +{{< /tab >}} +{{< /tabs >}} + + +### Return Procedure in Catalog +After implement a procedure, the catalog can then return the procedure in method `Catalog.getProcedure(ObjectPath procedurePath)`. The following example shows how to return it in a catalog. 
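The quoted hunk ends before the catalog example itself. As a rough illustration of the lookup it describes, here is a minimal plain-Java sketch. It does not use the real `Catalog` interface: `Procedure` and `GenerateSequenceProcedure` are empty stand-ins, `InMemoryProcedureCatalog` is a hypothetical class, the path `system.generate_n` is an invented name, and a plain `String` takes the place of `ObjectPath`. The exception type is also an assumption; the real catalog API is not modeled here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins so the sketch compiles without Flink.
interface Procedure {}

class GenerateSequenceProcedure implements Procedure {}

// Hypothetical catalog that keeps its procedures in a map keyed by "database.name".
class InMemoryProcedureCatalog {

    private final Map<String, Procedure> procedures = new HashMap<>();

    InMemoryProcedureCatalog() {
        // register the procedure under an invented path
        procedures.put("system.generate_n", new GenerateSequenceProcedure());
    }

    // Plays the role of Catalog.getProcedure(ObjectPath procedurePath).
    Procedure getProcedure(String procedurePath) {
        Procedure procedure = procedures.get(procedurePath);
        if (procedure == null) {
            // assumption: a real catalog would signal a missing procedure with its own exception type
            throw new IllegalArgumentException("Procedure not found: " + procedurePath);
        }
        return procedure;
    }

    public static void main(String[] args) {
        InMemoryProcedureCatalog catalog = new InMemoryProcedureCatalog();
        System.out.println(catalog.getProcedure("system.generate_n").getClass().getSimpleName());
    }
}
```

The design point is that the catalog owns the mapping from a procedure path to a procedure instance, so a caller only needs the path.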
Review Comment: ```suggestion After implementing a procedure, the catalog can then return the procedure in method `Catalog.getProcedure(ObjectPath procedurePath)`. The following example shows how to return it in a catalog. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org