Thanks for your comments, Baodi.

> This proposal looks good to me.
>
> > Only Functions returning a Record will be authorized to ensure that the
> Function sets the Schema explicitly.
>
> Does this mean that the function return type is fixed Record?


Yes


> Can the interface declaration of the function be displayed in the API
> changes?
>
>
Can you explain in more detail what you mean?
This PIP doesn't change the API of a Function, and it's already possible to
write a Function<?, Record<?>>.
When a Sink is declared with a Function, we'll check that the Function
returns a Record.
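
To illustrate the kind of check meant here, below is a minimal sketch using
plain Java reflection. It is not the actual Pulsar code: `Record` and
`Function` are simplified local stand-ins for the interfaces in
org.apache.pulsar.functions.api (the real `process` also takes a Context),
and `PreprocessFunctionCheck`, `Uppercase`, `Length` and `returnsRecord` are
hypothetical names used only for this example.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class PreprocessFunctionCheck {

    // Simplified stand-ins for the Pulsar Functions API types (assumption:
    // reduced to the minimum this sketch needs).
    interface Record<T> { T getValue(); }
    interface Function<I, O> { O process(I input) throws Exception; }

    // Would be accepted: the output type is a Record.
    static class Uppercase implements Function<String, Record<String>> {
        public Record<String> process(String input) {
            return () -> input.toUpperCase();
        }
    }

    // Would be rejected: the output type is a plain Integer.
    static class Length implements Function<String, Integer> {
        public Integer process(String input) {
            return input.length();
        }
    }

    /** Returns true if the class directly implements Function<?, Record<?>>. */
    static boolean returnsRecord(Class<?> functionClass) {
        for (Type t : functionClass.getGenericInterfaces()) {
            if (!(t instanceof ParameterizedType)) {
                continue;
            }
            ParameterizedType pt = (ParameterizedType) t;
            if (pt.getRawType() != Function.class) {
                continue;
            }
            // Second type argument of Function<I, O> is the output type.
            Type out = pt.getActualTypeArguments()[1];
            Type raw = out instanceof ParameterizedType
                    ? ((ParameterizedType) out).getRawType()
                    : out;
            return raw instanceof Class
                    && Record.class.isAssignableFrom((Class<?>) raw);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(returnsRecord(Uppercase.class)); // true
        System.out.println(returnsRecord(Length.class));    // false
    }
}
```

This sketch only looks at interfaces implemented directly by the class; a
real check would probably also need to walk superclasses.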



>
> Thanks,
> Baodi Shi
>
> > On Jul 22, 2022, at 17:4828, Christophe Bornet <bornet.ch...@gmail.com>
> wrote:
> >
> > Dear Pulsar dev community,
> >
> > I would like to open a discussion here about PIP 193: Sink preprocessing
> > Function <https://github.com/apache/pulsar/issues/16739>.
> >
> > Best regards
> >
> > Christophe
> >
> > ## Motivation
> >
> > Pulsar IO connectors make it possible to connect Pulsar to an external
> > system:
> > * A Source reads continuously from an external system and writes to a
> > Pulsar topic
> > * A Sink reads continuously from a Pulsar topic and writes to an external
> > system.
> > Sources and Sinks are written in Java.
> >
> > Pulsar also has a lightweight computing system named Pulsar Functions. A
> > Pulsar Function reads from one or more topics, applies user logic written
> > in Java, Python or Go and writes to an output topic.
> >
> > When using Pulsar IO connectors, the format of what is read/written from/to
> > the source/sink is defined by the connector code. But there are a lot of
> > situations where a user wants to transform this data before using it.
> > Currently the solution is to either:
> > * write a custom connector that transforms the data the way we want, but
> > that means writing a lot of code without reuse, packaging and managing
> > custom connectors, and so on.
> > * write a Function to transform the data after it was written to a topic by
> > a Source or before it is read from a topic by a Sink. This is not very
> > efficient as we have to use an intermediate topic, which means additional
> > storage, IO, and latency.
> >
> > Considering all this, it would be handy to be able to apply a Function
> > on-the-fly to a connector without going through an intermediary topic.
> >
> > ## Goal
> >
> > This PIP defines the changes needed to be able to apply a preprocessing
> > Function on-the-fly to a Sink.
> > The preprocessing function can be a built-in function, a package function,
> > or loaded through an HTTP URL or a file path.
> > Sources, Sinks and Functions are based on the same runtime process that:
> > * reads from a Source. For Sinks and Functions this Source is a
> > PulsarSource consuming from a Pulsar topic
> > * applies a Function. For Sources and Sinks, this Function is
> > IdentityFunction which returns the data it gets without modification.
> > * writes to a Sink. For Sources and Functions, this Sink is a PulsarSink
> > writing to a Pulsar topic.
> >
> > This PIP reuses this and allows configuring a Function different from
> > IdentityFunction to Sinks.
> > Only Functions returning a Record will be authorized to ensure that the
> > Function sets the Schema explicitly.
> >
> > Out of the scope of this PIP, for future work:
> > * Applying a post-processing Function to a Source
> > * Loading the Function jar through the Sink CLI
> >
> > ## API Changes
> >
> > ### Admin CLI
> >
> > The following options will be added to the `pulsar-admin sinks` CLI
> > `create`, `update` and `localrun`:
> > * `preprocess-function`: the preprocess function applied before the Sink.
> > Starts with `builtin://` for built-in functions, `function://` for package
> > functions, `http://` or `file://`.
> > * `preprocess-function-classname`: the preprocess function class name
> > (optional if the function is a NAR)
> > * `preprocess-function-config`: the configuration of the preprocess
> > function in the same format as the `user-config` parameter of the
> > `functions create` CLI command.
> >
> > The corresponding fields will be added to `SinkConfig`:
> >
> > ```java
> >    private String preprocessFunction;
> >    private String preprocessFunctionClassName;
> >    private String preprocessFunctionConfig;
> > ```
> >
> > ### Function definition
> >
> > The field `extraFunctionPackageLocation` will be added to the protobuf
> > structure `FunctionMetaData`. This field will be filled with the location
> > of the extra function to apply when registering a sink and used in the
> > Runtime to load the function code.
> >
> > ```protobuf
> > message FunctionMetaData {
> >    ...
> >    PackageLocationMetaData extraFunctionPackageLocation = 7;
> > }
> > ```
> >
> > ### Runtime
> >
> > The parameters `extraFunctionFile` and `originalExtraFunctionFileName` will
> > be added to `RuntimeFactory::createContainer`.
> >
> >
> > ```java
> >    Runtime createContainer(
> >            InstanceConfig instanceConfig, String codeFile,
> >            String originalCodeFileName,
> >            String extraFunctionFile, String originalExtraFunctionFileName,
> >            Long expectedHealthCheckInterval) throws Exception;
> > ```
> >
> > ### Instance function cache
> >
> > A field `extraFunctionId` will be added to `InstanceConfig`. It will hold
> > the UUID cache key of the extra function.
> >
> > ```java
> > public class InstanceConfig {
> >    private int instanceId;
> >    private String functionId;
> >    private String extraFunctionId;
> >    ...
> > }
> > ```
> >
> > ### JavaInstanceStarter
> >
> >
> > The following parameters will be added to JavaInstanceStarter:
> > * `--extra_function_jar`: the path to the extra function jar
> > * `--extra_function_id`: the extra function UUID cache key
> >
> > These parameters are then used by the `ThreadRuntime` to load the function
> > from the `FunctionCacheManager` or create it there if needed.
> >
> > ### Download the extra function
> >
> > The statefulset spawned in `KubernetesRuntime` needs to be able to download
> > the extra function's code via API.
> > An `extra-function` query param will be added to the download function HTTP
> > endpoint:
> >
> > ```java
> >    @Path("/{tenant}/{namespace}/{functionName}/download")
> >    public StreamingOutput downloadFunction(
> >            @ApiParam(value = "The tenant of functions")
> >            final @PathParam("tenant") String tenant,
> >            @ApiParam(value = "The namespace of functions")
> >            final @PathParam("namespace") String namespace,
> >            @ApiParam(value = "The name of functions")
> >            final @PathParam("functionName") String functionName,
> >            @ApiParam(value = "Whether to download the extra-function")
> >            final @QueryParam("extra-function") boolean extraFunction) {
> > ```
> >
> > If `extraFunction` is `true` then the extra function will be returned
> > instead of the sink.
> >
> > The Java admin SDK will have the following methods added:
> >
> >
> > ```java
> >    /**
> >     * Download Function Code.
> >     *
> >     * @param destinationFile
> >     *            file where data should be downloaded to
> >     * @param tenant
> >     *            Tenant name
> >     * @param namespace
> >     *            Namespace name
> >     * @param function
> >     *            Function name
> >     * @param extraFunction
> >     *            Whether to download the extra-function (for sources and
> >     *            sinks)
> >     * @throws PulsarAdminException
> >     */
> >    void downloadFunction(String destinationFile, String tenant,
> >                          String namespace, String function,
> >                          boolean extraFunction) throws PulsarAdminException;
> >
> >    /**
> >     * Download Function Code asynchronously.
> >     *
> >     * @param destinationFile
> >     *           file where data should be downloaded to
> >     * @param tenant
> >     *            Tenant name
> >     * @param namespace
> >     *            Namespace name
> >     * @param function
> >     *            Function name
> >     * @param extraFunction
> >     *            Whether to download the extra-function (for sources and
> >     *            sinks)
> >     */
> >    CompletableFuture<Void> downloadFunctionAsync(
> >            String destinationFile, String tenant, String namespace,
> >            String function, boolean extraFunction);
> > ```
> >
> > The parameter `--extra-function` will be added to the admin CLI command
> > `functions download`.
> >
> > ## Implementation
> >
> > ### Pulsar-admin
> >
> > * Add the admin CLI options when creating/updating/localrunning the sink
> > (see API changes)
> >
> > ### Pulsar broker
> >
> > * On the broker API, in registerSink/updateSink, if a preprocessing
> > function is present in the Sink config, we:
> >  * validate the function
> >    * get the function classloader (from builtin or download a package file)
> >    * load the function
> >    * inspect the function types and set the first arg as Sink type. Also
> > verify that the second arg is of type Record.
> >  * use the function classloader instead of the sink classloader to verify
> > that custom schemas, serdes, and crypto key readers can be loaded and are
> > conformant.
> >  * get the function package location and fill the protobuf
> > extraFunctionPackageLocation field with it. A name for this preprocessing
> > function is generated from the sink name so it can be referenced when
> > stored in BookKeeper or in package management. The name of the
> > preprocessing function is `{sink name}__sink-function`.
> >  * set the `functionDetails` with the preprocessing function config
> > (function class name and function userConfig)
> >
> > * The `--extra-function` query parameter is added to the `functions
> > download` CLI command, admin SDK and HTTP API (see API changes).
> >
> > ### Function worker
> >
> > * When the `InstanceConfig` is created, a UUID is assigned to the
> > `extraFunctionId` field. This field will serve as a cache key for the extra
> > function (see API changes).
> > * When the `FunctionActioner` starts the function, if
> > `extraFunctionPackageLocation` is present, the same is done for the extra
> > function as what is done for the connector:
> >  * if the runtime is not externally managed, the extra function code is
> > downloaded from the `extraFunctionPackageLocation` and the `Runtime` is
> > created with the extra package file path and original name (see API changes
> > to `RuntimeFactory::createContainer`)
> >  * if the runtime is externally managed, the `Runtime` is created with the
> > `extraFunctionPackageLocation` and original name.
> >
> > * Depending on the configured runtime, if there’s an extra function file:
> >  * For the `ThreadRuntime`, the extra function classloader is obtained
> > with the instance `extraFunctionId` cache key, then this classloader is
> > passed to the `JavaInstanceRunnable`. The `JavaInstanceRunnable` then
> > switches between the connector classloader and the extra function
> > classloader accordingly.
> >  * For the `ProcessRuntime`, the path to the extra function jar is added
> > to the `--extra_function_jar` parameter in the `JavaInstanceStarter`
> > command. The `JavaInstanceStarter` then uses it when creating its
> > `ThreadRuntime`.
> >  * For the `KubernetesRuntime`, a command is added in the statefulset exec
> > command to download the extra function using the `--extra-function` flag of
> > the `functions download` command. The path to this downloaded jar is then
> > added to the `--extra_function_jar` parameter of the `JavaInstanceStarter`
> > command.
> >
> > ### LocalRunner
> >
> > If `sinkConfig` has a `preprocessFunction`, the `LocalRunner` will use the
> > same methods as in the broker to get the function file and
> > `functionDetails` and use them when spawning the `Runtime`.
> >
> > ## Rejected Alternatives
> >
> > N/A
>
>
