Thanks for your comments Baodi
> This proposal looks good to me.
>
> > Only Functions returning a Record will be authorized to ensure that the
> > Function sets the Schema explicitly.
>
> Does this mean that the function return type is fixed Record?

Yes

> Can the interface declaration of the function be displayed in the API
> changes?

Can you explain more what you mean? This PIP doesn't change the API of a
Function and it's already possible to write a Function<?, Record<?>>. When
declaring a Sink with a Function, we'll check that this is the case.

> Thanks,
> Baodi Shi
>
> > On Jul 22, 2022, at 17:4828, Christophe Bornet <bornet.ch...@gmail.com>
> > wrote:
> >
> > Dear Pulsar dev community,
> >
> > I would like to open a discussion here about PIP 193: Sink preprocessing
> > Function <https://github.com/apache/pulsar/issues/16739>.
> >
> > Best regards
> >
> > Christophe
> >
> > ## Motivation
> >
> > Pulsar IO connectors make it possible to connect Pulsar to an external
> > system:
> > * A Source reads continuously from an external system and writes to a
> > Pulsar topic
> > * A Sink reads continuously from a Pulsar topic and writes to an external
> > system.
> > Sources and Sinks are written in Java.
> >
> > Pulsar also has a lightweight computing system named Pulsar Functions. A
> > Pulsar Function reads from one or more topics, applies user logic written
> > in Java, Python or Go and writes to an output topic.
> >
> > When using Pulsar IO connectors, the format of what is read/written from/to
> > the source/sink is defined by the connector code. But there are a lot of
> > situations where a user wants to transform this data before using it.
> > Currently the solution is to either:
> > * write a custom connector that transforms the data the way we want but
> > that means writing a lot of code without reuse, packaging and managing
> > custom connectors and so on.
> > * write a Function to transform the data after it was written to a topic by
> > a Source or before it is read from a topic by a Sink. This is not very
> > efficient as we have to use an intermediate topic, which means additional
> > storage, IO, and latency.
> >
> > Considering all this, it would be handy to be able to apply a Function
> > on-the-fly to a connector without going through an intermediary topic.
> >
> > ## Goal
> >
> > This PIP defines the changes needed to be able to apply a preprocessing
> > Function on-the-fly to a Sink.
> > The preprocessing function can be a built-in function, a package function,
> > or loaded through an http URL or a file path.
> > Sources, Sinks and Functions are based on the same runtime process that:
> > * reads from a Source. For Sinks and Functions this Source is a
> > PulsarSource consuming from a Pulsar topic
> > * applies a Function. For Sources and Sinks, this Function is
> > IdentityFunction which returns the data it gets without modification.
> > * writes to a Sink. For Sources and Functions, this Sink is a PulsarSink
> > writing to a Pulsar topic.
> >
> > This PIP reuses this runtime and allows configuring a Function other than
> > IdentityFunction for Sinks.
> > Only Functions returning a Record will be authorized to ensure that the
> > Function sets the Schema explicitly.
> >
> > Out of the scope of this PIP, for future work:
> > * Applying a post-processing Function to a Source
> > * Loading the Function jar through the Sink CLI
> >
> > ## API Changes
> >
> > ### Admin CLI
> >
> > The following options will be added to the `pulsar-admin sinks` CLI
> > `create`, `update` and `localrun`:
> > * `preprocess-function`: the preprocess function applied before the Sink.
> > Starts with `builtin://` for built-in functions, `function://` for package
> > functions, `http://` or `file://`
> > * `preprocess-function-classname`: the preprocess function class name
> > (optional if the function is a NAR)
> > * `preprocess-function-config`: the configuration of the preprocess
> > function in the same format as the `user-config` parameter of the
> > `functions create` CLI command.
> >
> > The corresponding fields will be added to `SinkConfig`:
> >
> > ```java
> > private String preprocessFunction;
> > private String preprocessFunctionClassName;
> > private String preprocessFunctionConfig;
> > ```
> >
> > ### Function definition
> >
> > The field `extraFunctionPackageLocation` will be added to the protobuf
> > structure `FunctionMetaData`. This field will be filled with the location
> > of the extra function to apply when registering a sink and used in the
> > Runtime to load the function code.
> >
> > ```protobuf
> > message FunctionMetaData {
> >   ...
> >   PackageLocationMetaData extraFunctionPackageLocation = 7;
> > }
> > ```
> >
> > ### Runtime
> >
> > The parameters `extraFunctionFile` and `originalExtraFunctionFileName` will
> > be added to `RuntimeFactory::createContainer`
> >
> > ```java
> > Runtime createContainer(
> >         InstanceConfig instanceConfig, String codeFile, String originalCodeFileName,
> >         String extraFunctionFile, String originalExtraFunctionFileName,
> >         Long expectedHealthCheckInterval) throws Exception;
> > ```
> >
> > ### Instance function cache
> >
> > A field `extraFunctionId` that will hold the UUID cache key of the extra
> > function will be added to `InstanceConfig`.
> >
> > ```java
> > public class InstanceConfig {
> >     private int instanceId;
> >     private String functionId;
> >     private String extraFunctionId;
> > ```
> >
> > ### JavaInstanceStarter
> >
> > The following parameters will be added to JavaInstanceStarter:
> > * `--extra_function_jar`: the path to the extra function jar
> > * `--extra_function_id`: the extra function UUID cache key
> >
> > These parameters are then used by the `ThreadRuntime` to load the function
> > from the `FunctionCacheManager` or create it there if needed.
> >
> > ### Download the extra function
> >
> > The statefulset spawned in `KubernetesRuntime` needs to be able to download
> > the extra function's code via API.
> > An `extra-function` query param will be added to the download function HTTP
> > endpoint
> >
> > ```java
> > @Path("/{tenant}/{namespace}/{functionName}/download")
> > public StreamingOutput downloadFunction(
> >         @ApiParam(value = "The tenant of functions")
> >         final @PathParam("tenant") String tenant,
> >         @ApiParam(value = "The namespace of functions")
> >         final @PathParam("namespace") String namespace,
> >         @ApiParam(value = "The name of functions")
> >         final @PathParam("functionName") String functionName,
> >         @ApiParam(value = "Whether to download the extra-function")
> >         final @QueryParam("extra-function") boolean extraFunction) {
> > ```
> >
> > If `extraFunction` is `true` then the extra function will be returned
> > instead of the sink.
> >
> > The Java admin SDK will have the following methods added:
> >
> > ```java
> > /**
> >  * Download Function Code.
> >  *
> >  * @param destinationFile
> >  *           file where data should be downloaded to
> >  * @param tenant
> >  *           Tenant name
> >  * @param namespace
> >  *           Namespace name
> >  * @param function
> >  *           Function name
> >  * @param extraFunction
> >  *           Whether to download the extra-function (for sources and sinks)
> >  * @throws PulsarAdminException
> >  */
> > void downloadFunction(String destinationFile, String tenant, String namespace, String function,
> >         boolean extraFunction) throws PulsarAdminException;
> >
> > /**
> >  * Download Function Code asynchronously.
> >  *
> >  * @param destinationFile
> >  *           file where data should be downloaded to
> >  * @param tenant
> >  *           Tenant name
> >  * @param namespace
> >  *           Namespace name
> >  * @param function
> >  *           Function name
> >  * @param extraFunction
> >  *           Whether to download the extra-function (for sources and sinks)
> >  */
> > CompletableFuture<Void> downloadFunctionAsync(
> >         String destinationFile, String tenant, String namespace, String function, boolean extraFunction);
> > ```
> >
> > The parameter `--extra-function` will be added to the admin CLI command
> > `functions download`
> >
> > ## Implementation
> >
> > ### Pulsar-admin
> >
> > * Add the admin CLI options when creating/updating/localrunning the sink
> > (see API changes)
> >
> > ### Pulsar broker
> >
> > * On the broker API, in registerSink/updateSink, if a preprocessing
> > function is present in the Sink config, we:
> >   * validate the function
> >   * get the function classloader (from builtin or download a package file)
> >   * load the function
> >   * inspect the function types and set the first arg as Sink type. Also
> >   verify that the second arg is of type Record.
> >   * use the function classloader instead of the sink classloader to verify
> >   that custom schemas, serdes and crypto key readers can be loaded and are
> >   conformant.
> >   * get the function package location and fill the protobuf
> >   extraFunctionPackageLocation field with it.
> >   A name for this preprocessing function is generated from the sink name
> >   so it can be referenced when stored in BookKeeper or in package
> >   management. The name of the preprocessing function is
> >   `{sink name}__sink-function`.
> >   * set the `functionDetails` with the preprocessing function config
> >   (function class name and function userConfig)
> >
> > * The `--extra-function` query parameter is added to the
> > `functions download` CLI command, admin SDK and HTTP API (see API changes).
> >
> > ### Function worker
> >
> > * When the `InstanceConfig` is created, a UUID is set to the
> > `extraFunctionId` field. This field will serve as a cache key for the extra
> > function (see API changes).
> > * When the `FunctionActioner` starts the function, if
> > `extraFunctionPackageLocation` is present, the same is done for the extra
> > function as what is done for the connector:
> >   * if the runtime is not externally managed, the extra function code is
> >   downloaded from the `extraFunctionPackageLocation` and the `Runtime` is
> >   created with the extra package file path and original name (see API
> >   changes to `RuntimeFactory::createContainer`)
> >   * if the runtime is externally managed, the `Runtime` is created with the
> >   `extraFunctionPackageLocation` and original name.
> >
> > * Depending on the configured runtime, if there’s an extra function file:
> >   * For the `ThreadRuntime`, the extra function classloader is obtained
> >   with the instance `extraFunctionId` cache key, then this classloader is
> >   passed to the `JavaInstanceRunnable`. The `JavaInstanceRunnable` then
> >   switches between the connector classloader and the extra function
> >   classloader accordingly.
> >   * For the `ProcessRuntime`, the path to the extra function jar is added
> >   to the `--extra_function_jar` parameter in the `JavaInstanceStarter`
> >   command. The `JavaInstanceStarter` then uses it when creating its
> >   `ThreadRuntime`.
> >   * For the `KubernetesRuntime`, a command is added in the statefulset exec
> >   command to download the extra function using the `--extra-function` flag
> >   of the `functions download` command. The path to this downloaded jar is
> >   added to the `--extra_function_jar` parameter of the
> >   `JavaInstanceStarter` command.
> >
> > ### LocalRunner
> >
> > If `sinkConfig` has a `preprocessFunction`, the `LocalRunner` will use the
> > same methods as in the broker to get the function file and
> > `functionDetails` and use them when spawning the `Runtime`.
> >
> > ## Reject Alternatives
> >
> > N/A
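
To make the Record check discussed above more concrete, here is a rough sketch of how the output type of a function class could be inspected by reflection. It is only an illustration, not the actual broker code: the `Function` and `Record` interfaces below are simplified stand-ins so that the snippet compiles without Pulsar on the classpath, and `PreprocessFunctionCheck`, `returnsRecord`, `Uppercase` and `Plain` are hypothetical names. It also only looks at interfaces implemented directly by the class.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class PreprocessFunctionCheck {
    // Simplified stand-ins for the Pulsar Function and Record interfaces
    // (assumption: reduced to one method each for this sketch).
    interface Record<T> { T getValue(); }
    interface Function<I, O> { O process(I input) throws Exception; }

    // Would be accepted: the output type is a Record.
    static class Uppercase implements Function<String, Record<String>> {
        public Record<String> process(String input) {
            return () -> input.toUpperCase();
        }
    }

    // Would be rejected: the output type is a plain String.
    static class Plain implements Function<String, String> {
        public String process(String input) { return input; }
    }

    // Returns true if the class directly implements Function with a
    // second type argument that is Record (raw or parameterized).
    static boolean returnsRecord(Class<?> functionClass) {
        for (Type t : functionClass.getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType pt = (ParameterizedType) t;
                if (pt.getRawType() == Function.class) {
                    Type out = pt.getActualTypeArguments()[1];
                    Type raw = out instanceof ParameterizedType
                            ? ((ParameterizedType) out).getRawType() : out;
                    return raw instanceof Class
                            && Record.class.isAssignableFrom((Class<?>) raw);
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(returnsRecord(Uppercase.class)); // true
        System.out.println(returnsRecord(Plain.class));     // false
    }
}
```

A real implementation would also need to handle functions that inherit the interface through a superclass, which this sketch leaves out.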