Thanks for the feedback, Asaf.

> > - preprocess-function: the preprocess function applied before the
> > Sink. Starts by builtin:// for built-in functions, function:// for
> > package function, http:// or file://
> >
>
> 1. While this function is applied only before sink? I thought it replaces
> the identity function, so why a source can't have a function that reads
> from the source (say S3), runs the function and only then writes to a
> pulsar topic?
>

Yes, that's totally possible to implement and will be done in future work, as written in the PIP.

> 2. Can you clarify more about built in and function for package function?
> Is this an existing functionality ?
>

Yes, those are existing functionalities. Built-in functions are not documented (and we should do something about that). Package management of functions is described in https://pulsar.apache.org/docs/functions-deploy#use-package-management-service

> 3. Regarding http - Are you loading a class through that URL? Aren't we
> exposed to same problem Log4Shell security issue had? If so, what measures
> are you taking to protect ?
>

Yes, we are loading code via URL. This feature already exists for Sources/Sinks/Functions. I guess you need to place a lot of trust in the source you download from. This PIP has the same security level as what already exists for this functionality.

> > The field extraFunctionPackageLocation to the protobuf structure
> > FunctionMetaData will be added. This field will be filled with the
> > location of the extra function to apply when registering a sink and used in
> > the Runtime to load the function code.
>
> Can you please expand on that? You mean the JAR location, which you will
> search that class name and function specified in the 3 fields you've added
> to the config?
>

Not exactly. It's the location where the JAR is stored. It can be BookKeeper, package management, a built-in NAR, etc.
In KubernetesRuntime, there are cases where the builtin or package function you provide in the preprocess-function param could be copied to BK. That's the same as for a regular Sink/Source, and if we need to copy to BK, we append `__sink-function` to the storage path to prevent a conflict with the sink code.
The class name is indeed looked up in this JAR.

> > The parameters extraFunctionFile and originalExtraFunctionFileName will be
> > added to RuntimeFactory::createContainer
>
> 1. File and fileName containing what? How does this related to
> extraFunctionPackageLocation?
>

That part mimics what is already done for the main code of the Source/Sink (with codeFile, originalCodeFileName and packageLocation respectively).
Before starting the ThreadRuntime, we download the JAR locally from the extraFunctionPackageLocation into the extraFunctionFile so we can load the code from it.

> In here you use the terminology "Extra Function" and in fields of config and
> admin you used the term Pre-Process Function. I would stick to Pre-Process
> Function and stick with it all over.
>

This terminology needs to be applicable to a Function that would be applied after a Source, so we can't use "preprocess" here. I haven't found anything better than "extra function". Don't hesitate to propose something!

> > The following parameters will be added to JavaInstanceStarter:
> >
> > - --extra_function_jar: the path to the extra function jar
> > - --extra_function_id: the extra function UUID cache key
> >
> > These parameters are then used by the ThreadRuntime to load the function
> > from the FunctionCacheManager or create it there if needed.
>
> Can you elaborate on that? JavaInstanceStarter is used to start a single
> Function? It's used from command line?

The JavaInstanceStarter is indeed a CLI to start a JavaInstance. The JavaInstance is the process that will execute the code to read from a Source, execute a Function and write to a Sink.
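
To make the read/execute/write loop concrete, here is a deliberately simplified model of it. This is only a sketch and not the actual JavaInstance code: the class `InstancePipelineSketch` and its `run` method are names made up for illustration, lists stand in for the Source and the Sink, and `java.util.function.Function` stands in for a Pulsar Function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Illustrative model of the read -> apply -> write loop (hypothetical, not Pulsar code). */
final class InstancePipelineSketch {

    /** Reads every input from the source, applies the function, and collects what the sink would receive. */
    static <I, O> List<O> run(List<I> source, Function<I, O> function) {
        List<O> sink = new ArrayList<>();
        for (I input : source) {
            sink.add(function.apply(input));
        }
        return sink;
    }

    public static void main(String[] args) {
        List<String> topic = List.of("a", "b");

        // Today a Sink behaves as if the function were the identity function.
        List<String> unchanged = run(topic, Function.identity());

        // With a preprocessing function, the same loop applies user logic before the sink.
        List<String> transformed = run(topic, s -> s.toUpperCase());

        System.out.println(unchanged);
        System.out.println(transformed);
    }
}
```

The point is only that the loop itself does not change: the PIP swaps the function that sits in the middle.
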
Generally, Pulsar users don't use the JavaInstanceStarter directly. The command line is built by the ProcessRuntime and KubernetesRuntime.

> In general, you will essentially have two class loaders - one for the sink
> and one for the pre-process function?
>

Yes, exactly. Three to be more accurate, since there's also the instance class loader.

> On Fri, Jul 22, 2022 at 12:48 PM Christophe Bornet <bornet.ch...@gmail.com>
> wrote:
>
> > Dear Pulsar dev community,
> >
> > I would like to open a discussion here about PIP 193 : Sink preprocessing
> > Function <https://github.com/apache/pulsar/issues/16739>.
> >
> > Best regards
> >
> > Christophe
> >
> > ## Motivation
> >
> > Pulsar IO connectors make it possible to connect Pulsar to an external
> > system:
> > * A Source reads continuously from an external system and writes to a
> >   Pulsar topic
> > * A Sink reads continuously from a Pulsar topic and writes to an external
> >   system.
> > Sources and Sinks are written in Java.
> >
> > Pulsar also has a lightweight computing system named Pulsar Functions. A
> > Pulsar Function reads from one or more topics, applies user logic written
> > in Java, Python or Go and writes to an output topic.
> >
> > When using Pulsar IO connectors, the format of what is read/written from/to
> > the source/sink is defined by the connector code. But there are a lot of
> > situations where a user wants to transform this data before using it.
> > Currently the solution is to either:
> > * write a custom connector that transforms the data the way we want, but
> >   that means writing a lot of code without reuse, packaging and managing
> >   custom connectors and so on.
> > * write a Function to transform the data after it was written to a topic by
> >   a Source or before it is read from a topic by a Sink. This is not very
> >   efficient as we have to use an intermediate topic, which means additional
> >   storage, IO, and latency.
> >
> > Considering all this, it would be handy to be able to apply a Function
> > on-the-fly to a connector without going through an intermediary topic.
> >
> > ## Goal
> >
> > This PIP defines the changes needed to be able to apply a preprocessing
> > Function on-the-fly to a Sink.
> > The preprocessing function can be a built-in function, a package function,
> > or loaded through an http URL or a file path.
> > Sources, Sinks and Functions are based on the same runtime process that:
> > * reads from a Source. For Sinks and Functions this Source is a
> >   PulsarSource consuming from a Pulsar topic.
> > * applies a Function. For Sources and Sinks, this Function is
> >   IdentityFunction which returns the data it gets without modification.
> > * writes to a Sink. For Sources and Functions, this Sink is a PulsarSink
> >   writing to a Pulsar topic.
> >
> > This PIP reuses this and allows Sinks to be configured with a Function
> > other than IdentityFunction.
> > Only Functions returning a Record will be authorized to ensure that the
> > Function sets the Schema explicitly.
> >
> > Out of the scope of this PIP, for future work:
> > * Applying a post-processing Function to a Source
> > * Loading the Function jar through the Sink CLI
> >
> > ## API Changes
> >
> > ### Admin CLI
> >
> > The following options will be added to the `pulsar-admin sinks` CLI
> > `create`, `update` and `localrun`:
> > * `preprocess-function`: the preprocess function applied before the Sink.
> >   Starts with `builtin://` for built-in functions, `function://` for package
> >   functions, `http://` or `file://`
> > * `preprocess-function-classname`: the preprocess function class name
> >   (optional if the function is a NAR)
> > * `preprocess-function-config`: the configuration of the preprocess
> >   function in the same format as the `user-config` parameter of the
> >   `functions create` CLI command.
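
To illustrate how the `preprocess-function` value above selects where the function comes from, here is a minimal sketch that classifies a reference by its prefix. It is not taken from the Pulsar code base: the class `PreprocessFunctionRef`, the `Kind` enum and the example references are hypothetical, and only the four prefixes listed above are recognized.

```java
import java.util.Locale;

/** Illustrative only: classifies a preprocess-function reference by its prefix. */
final class PreprocessFunctionRef {

    /** Hypothetical enum, one value per prefix listed in the CLI option description. */
    enum Kind { BUILTIN, PACKAGE, HTTP, FILE }

    /** Returns the kind of reference, or throws if the prefix is not one of the four listed. */
    static Kind kindOf(String ref) {
        String r = ref.toLowerCase(Locale.ROOT);
        if (r.startsWith("builtin://")) {
            return Kind.BUILTIN;
        }
        if (r.startsWith("function://")) {
            return Kind.PACKAGE;
        }
        if (r.startsWith("http://")) {
            return Kind.HTTP;
        }
        if (r.startsWith("file://")) {
            return Kind.FILE;
        }
        throw new IllegalArgumentException("Unsupported preprocess-function reference: " + ref);
    }

    public static void main(String[] args) {
        // Made-up references, for illustration only.
        System.out.println(kindOf("builtin://some-function"));
        System.out.println(kindOf("file:///tmp/my-function.jar"));
    }
}
```
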
> >
> > The corresponding fields will be added to `SinkConfig`:
> >
> > ```java
> >     private String preprocessFunction;
> >     private String preprocessFunctionClassName;
> >     private String preprocessFunctionConfig;
> > ```
> >
> > ### Function definition
> >
> > The field `extraFunctionPackageLocation` will be added to the protobuf
> > structure `FunctionMetaData`. This field will be filled with the
> > location of the extra function to apply when registering a sink and used in
> > the Runtime to load the function code.
> >
> > ```protobuf
> > message FunctionMetaData {
> >     ...
> >     PackageLocationMetaData extraFunctionPackageLocation = 7;
> > }
> > ```
> >
> > ### Runtime
> >
> > The parameters `extraFunctionFile` and `originalExtraFunctionFileName` will
> > be added to `RuntimeFactory::createContainer`
> >
> > ```java
> >     Runtime createContainer(
> >             InstanceConfig instanceConfig, String codeFile, String originalCodeFileName,
> >             String extraFunctionFile, String originalExtraFunctionFileName,
> >             Long expectedHealthCheckInterval) throws Exception;
> > ```
> >
> > ### Instance function cache
> >
> > A field `extraFunctionId` that will hold the UUID cache key of the extra
> > function will be added to `InstanceConfig`.
> >
> > ```java
> > public class InstanceConfig {
> >     private int instanceId;
> >     private String functionId;
> >     private String extraFunctionId;
> > ```
> >
> > ### JavaInstanceStarter
> >
> > The following parameters will be added to JavaInstanceStarter:
> > * `--extra_function_jar`: the path to the extra function jar
> > * `--extra_function_id`: the extra function UUID cache key
> >
> > These parameters are then used by the `ThreadRuntime` to load the function
> > from the `FunctionCacheManager` or create it there if needed.
> >
> > ### Download the extra function
> >
> > The statefulset spawned in `KubernetesRuntime` needs to be able to download
> > the extra function's code via API.
> > An `extra-function` query param will be added to the download function HTTP
> > endpoint
> >
> > ```java
> >     @Path("/{tenant}/{namespace}/{functionName}/download")
> >     public StreamingOutput downloadFunction(
> >             @ApiParam(value = "The tenant of functions")
> >             final @PathParam("tenant") String tenant,
> >             @ApiParam(value = "The namespace of functions")
> >             final @PathParam("namespace") String namespace,
> >             @ApiParam(value = "The name of functions")
> >             final @PathParam("functionName") String functionName,
> >             @ApiParam(value = "Whether to download the extra-function")
> >             final @QueryParam("extra-function") boolean extraFunction) {
> > ```
> >
> > If `extraFunction` is `true` then the extra function will be returned
> > instead of the sink.
> >
> > The Java admin SDK will have the following methods added:
> >
> > ```java
> >     /**
> >      * Download Function Code.
> >      *
> >      * @param destinationFile
> >      *            file where data should be downloaded to
> >      * @param tenant
> >      *            Tenant name
> >      * @param namespace
> >      *            Namespace name
> >      * @param function
> >      *            Function name
> >      * @param extraFunction
> >      *            Whether to download the extra-function (for sources and sinks)
> >      * @throws PulsarAdminException
> >      */
> >     void downloadFunction(String destinationFile, String tenant, String namespace, String function,
> >                           boolean extraFunction) throws PulsarAdminException;
> >
> >     /**
> >      * Download Function Code asynchronously.
> >      *
> >      * @param destinationFile
> >      *            file where data should be downloaded to
> >      * @param tenant
> >      *            Tenant name
> >      * @param namespace
> >      *            Namespace name
> >      * @param function
> >      *            Function name
> >      * @param extraFunction
> >      *            Whether to download the extra-function (for sources and sinks)
> >      */
> >     CompletableFuture<Void> downloadFunctionAsync(
> >             String destinationFile, String tenant, String namespace, String function,
> >             boolean extraFunction);
> > ```
> >
> > The parameter `--extra-function` will be added to the admin CLI command
> > `functions download`
> >
> > ## Implementation
> >
> > ### Pulsar-admin
> >
> > * Add the admin CLI options when creating/updating/localrunning the sink
> >   (see API changes)
> >
> > ### Pulsar broker
> >
> > * On the broker API, in registerSink/updateSink, if a preprocessing
> >   function is present in the Sink config, we:
> >   * validate the function
> >   * get the function classloader (from builtin or download a package file)
> >   * load the function
> >   * inspect the function types and set the first arg as Sink type. Also
> >     verify that the second arg is of type Record.
> >   * use the function classloader instead of the sink classloader to verify
> >     that custom schemas, serdes and crypto key readers can be loaded and
> >     conform.
> >   * get the function package location and fill the protobuf
> >     extraFunctionPackageLocation field with it. A name for this preprocessing
> >     function is generated from the sink name so it can be referenced when
> >     stored in BookKeeper or in package management. The name of the
> >     preprocessing function is `{sink name}__sink-function`.
> >   * set the `functionDetails` with the preprocessing function config
> >     (function class name and function userConfig)
> >
> > * The `--extra-function` query parameter is added to the `functions
> >   download` CLI command, admin SDK and HTTP API (see API changes).
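
To give an idea of the "inspect the function types" step above, here is a minimal sketch of such a check using plain Java reflection. It rests on several assumptions: `Fn` and `Rec` are hypothetical stand-ins for the Pulsar Function and Record interfaces, the class names are made up, and the check only handles a class that directly implements the interface. The real validation may well work differently.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Illustrative only: checks that a function's declared output type is a record type. */
final class FunctionTypeCheckSketch {

    /** Hypothetical stand-ins for the Pulsar Function and Record interfaces. */
    interface Fn<I, O> { O process(I input); }
    interface Rec<T> { T getValue(); }

    /** Returns the two type arguments declared by a class that directly implements Fn. */
    static Type[] typeArgs(Class<?> fnClass) {
        for (Type t : fnClass.getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType pt = (ParameterizedType) t;
                if (pt.getRawType() == Fn.class) {
                    return pt.getActualTypeArguments();
                }
            }
        }
        throw new IllegalArgumentException(fnClass + " does not directly implement Fn<I, O>");
    }

    /** True when the second type argument is Rec or Rec<...>. */
    static boolean returnsRecord(Class<?> fnClass) {
        Type out = typeArgs(fnClass)[1];
        Type raw = (out instanceof ParameterizedType) ? ((ParameterizedType) out).getRawType() : out;
        return raw == Rec.class;
    }

    /** Would be accepted: the output type is a record. */
    static final class Good implements Fn<String, Rec<String>> {
        public Rec<String> process(String input) { return () -> input; }
    }

    /** Would be rejected: the output type is a plain String. */
    static final class Bad implements Fn<String, String> {
        public String process(String input) { return input; }
    }

    public static void main(String[] args) {
        // The first type argument is what would become the Sink input type.
        System.out.println(typeArgs(Good.class)[0]);
        System.out.println(returnsRecord(Good.class));
        System.out.println(returnsRecord(Bad.class));
    }
}
```
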
> >
> > ### Function worker
> >
> > * When the `InstanceConfig` is created, a UUID is set to the
> >   `extraFunctionId` field. This field will serve as a cache key for the extra
> >   function (see API changes).
> > * When the `FunctionActioner` starts the function, if
> >   `extraFunctionPackageLocation` is present, the same is done for the extra
> >   function as what is done for the connector:
> >   * if the runtime is not externally managed, the extra function code is
> >     downloaded from the `extraFunctionPackageLocation` and the `Runtime` is
> >     created with the extra package file path and original name (see API changes
> >     to `RuntimeFactory::createContainer`)
> >   * if the runtime is externally managed, the `Runtime` is created with the
> >     `extraFunctionPackageLocation` and original name.
> >
> > * Depending on the configured runtime, if there's an extra function file:
> >   * For the `ThreadRuntime`, the extra function classloader is obtained
> >     with the instance `extraFunctionId` cache key, then this classloader is
> >     passed to the `JavaInstanceRunnable`. The `JavaInstanceRunnable` then
> >     switches between the connector classloader and the extra function
> >     classloader accordingly.
> >   * For the `ProcessRuntime`, the path to the extra function jar is added
> >     to the `--extra_function_jar` parameter in the `JavaInstanceStarter`
> >     command. The `JavaInstanceStarter` then uses it when creating its
> >     `ThreadRuntime`.
> >   * For the `KubernetesRuntime`, a command is added in the statefulset exec
> >     command to download the extra function using the `--extra-function` flag of
> >     the `functions download` command. The path to this downloaded jar is then
> >     added to the `--extra_function_jar` parameter of the `JavaInstanceStarter`
> >     command.
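
The classloader switching mentioned for the `ThreadRuntime` could look roughly like the sketch below. This assumes the switch is done through the thread context classloader, which the text above does not spell out. `ClassLoaderSwitchSketch` and `withContextClassLoader` are hypothetical names, and two empty `URLClassLoader` instances stand in for the extra function and connector classloaders.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

/** Illustrative only: runs each step under its own context classloader. */
final class ClassLoaderSwitchSketch {

    /** Runs the action with the given context classloader, then restores the previous one. */
    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            // Always restore, even if the action throws.
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // Empty classloaders used as placeholders for the real ones.
        ClassLoader functionLoader = new URLClassLoader(new URL[0]);
        ClassLoader sinkLoader = new URLClassLoader(new URL[0]);

        // Step 1: apply the extra function under the function classloader.
        String transformed = withContextClassLoader(functionLoader, () -> "record".toUpperCase());

        // Step 2: write to the sink under the connector classloader.
        withContextClassLoader(sinkLoader, () -> {
            System.out.println("writing " + transformed);
            return null;
        });
    }
}
```

Restoring the previous classloader in a `finally` block keeps a failing function from leaving the thread in the wrong classloader for the sink.
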
> >
> > ### LocalRunner
> >
> > If `sinkConfig` has a `preprocessFunction`, the `LocalRunner` will use the
> > same methods as in the broker to get the function file and
> > `functionDetails` and use them when spawning the `Runtime`.
> >
> > ## Reject Alternatives
> >
> > N/A
> >
>