My feedback is to make this change as self-contained as possible. Can we just have a special implementation of a sink that runs the logic of the "preprocess" function? There are many places in the code where we figure out whether something is a source, a sink, or a function based on the fields in the Function metadata. Changing that may have unintended consequences.
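A rough sketch of the kind of wrapper sink suggested above, under simplifying assumptions. The `SimpleRecord`, `SimpleFunction` and `SimpleSink` interfaces are hypothetical stand-ins written for this example and are not the Pulsar API; `PreprocessingSink` and `PreprocessingSinkDemo` are likewise illustrative names only. The point is just that the function call can live inside the sink, so the rest of the runtime still sees a plain sink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for illustration only; NOT the Pulsar API.
interface SimpleRecord<T> {
    T getValue();
}

interface SimpleFunction<I, O> {
    // Returning null means "drop this input".
    SimpleRecord<O> process(I input);
}

interface SimpleSink<T> {
    void write(SimpleRecord<T> record) throws Exception;
}

// A sink that runs a "preprocess" function, then forwards the result to the wrapped sink.
final class PreprocessingSink<I, O> implements SimpleSink<I> {
    private final SimpleFunction<I, O> preprocess;
    private final SimpleSink<O> delegate;

    PreprocessingSink(SimpleFunction<I, O> preprocess, SimpleSink<O> delegate) {
        this.preprocess = preprocess;
        this.delegate = delegate;
    }

    @Override
    public void write(SimpleRecord<I> record) throws Exception {
        SimpleRecord<O> transformed = preprocess.process(record.getValue());
        if (transformed != null) { // null result: nothing is written
            delegate.write(transformed);
        }
    }
}

final class PreprocessingSinkDemo {
    static List<String> run() throws Exception {
        List<String> written = new ArrayList<>();
        // The "real" sink: here it only collects the values it receives.
        SimpleSink<String> target = r -> written.add(r.getValue());
        // The preprocess function: upper-cases values and drops empty ones.
        SimpleFunction<String, String> upper = in -> {
            if (in.isEmpty()) {
                return null;
            }
            return () -> in.toUpperCase();
        };
        SimpleSink<String> sink = new PreprocessingSink<>(upper, target);
        sink.write(() -> "hello");
        sink.write(() -> "");
        sink.write(() -> "world");
        return written;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints [HELLO, WORLD]
    }
}
```

With this shape, only the wrapper knows about the function, so code that decides between source, sink and function from the metadata would not need to change. Class loading for the function jar is deliberately left out of the sketch.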
On Mon, Jul 25, 2022 at 5:55 AM Baodi Shi <baodi....@icloud.com.invalid> wrote:
> > Can you explain more what you mean ?
> This PIP doesn't change the API of a Function and it's already possible to
> write a Function<?, Record<?>>.
> And when declaring a Sink with a Function we'll check that it's the case.
>
> I mean: we should constrain the function interface, otherwise, the user
> may return a structure that is not a record.
>
> Thanks,
> Baodi Shi
>
> > On Jul 25, 2022, at 01:0233, Christophe Bornet <bornet.ch...@gmail.com> wrote:
> >
> > Thanks for the feedback Asaf
> >
> >>> - preprocess-function: the preprocess function applied before the
> >>> Sink. Starts by builtin:// for built-in functions, function:// for
> >>> package function, http:// or file://
> >>>
> >>> 1. While this function is applied only before sink? I thought it replaces
> >> the identity function, so why a source can't have a function that reads
> >> from the source (say S3), runs the function and only then writes to a
> >> pulsar topic?
> >>
> >
> > Yes that's totally possible to implement and will be done in future work
> > like written in the PIP.
> >
> >> 2. Can you clarify more about built in and function for package function?
> >> Is this an existing functionality ?
> >>
> > Yes those are existing functionalities.
> > Built-in functions are not documented (and we should do something about
> > that).
> > Package management of functions is described in
> > https://pulsar.apache.org/docs/functions-deploy#use-package-management-service
> >
> >> 3. Regarding http - Are you loading a class through that URL? Aren't we
> >> exposed to same problem Log4Shell security issue had? If so, what measures
> >> are you taking to protect ?
> >>
> > Yes we are loading code via URL. This feature already exists for
> > Sources/Sinks/Functions.
> > I guess you need to have a huge trust of the source from where you
> > download.
> > This PIP has the same security level as what already exists for this
> > functionality.
> >
> >>
> >> The field extraFunctionPackageLocation to the protobuf structure
> >>> FunctionMetaData will be added. This field will be filled with the
> >>> location of the extra function to apply when registering a sink and used in
> >>> the Runtime to load the function code.
> >>
> >> Can you please expand on that? You mean the JAR location, which you will
> >> search that class name and function specified in the 3 fields you've added
> >> to the config?
> >>
> > Not exactly. It's the location of where the JAR is stored. It can be
> > BookKeeper, package management, built-in NAR, etc...
> > In KubernetesRuntime, there are cases where the builtin or package function
> > you provide in the preprocess-function param could be copied to BK.
> > That's the same as for a regular Sink/Source and if we need to copy to BK,
> > we append `__sink-function` to the storage path to prevent conflict with
> > the sink code.
> > The class name is indeed looked up in this JAR.
> >
> >> The parameters extraFunctionFile and originalExtraFunctionFileName will be
> >>> added to RuntimeFactory::createContainer
> >>
> >> 1. File and fileName containing what? How does this relate to
> >> extraFunctionPackageLocation?
> >>
> > That part mimics what is already done for the main code of the Source/Sink
> > code (with respectively codeFile, originalCodeFileName and packageLocation)
> > Before starting the ThreadRuntime, we download locally the JAR from the
> > extraFunctionPackageLocation in the extraFunctionFile so we can load the
> > code from it.
> >
> >>
> >> In here you use the terminology "Extra Function" and in fields of config and
> >> admin you used the term Pre-Process Function. I would stick to Pre-Process
> >> Function and stick with it all over.
> >>
> > This terminology needs to be applicable to a Function that would be applied
> > after a Source so we can't use "preprocess" here.
> > I haven't found better than "extra function". Don't hesitate to propose
> > something !
> >
> >>
> >>> The following parameters will be added to JavaInstanceStarter:
> >>>
> >>> - --extra_function_jar: the path to the extra function jar
> >>>
> >>> - --extra_function_id: the extra function UUID cache key
> >>>
> >>> These parameters are then used by the ThreadRuntime to load the function
> >>> from the FunctionCacheManager or create it there if needed.
> >>
> >> Can you elaborate on that? JavaInstanceStarter is used to start a single
> >> Function? It's used from command line?
> >
> > The JavaInstanceStarter is indeed a CLI to start a JavaInstance.
> > The JavaInstance is the process that will execute the code to read from a
> > Source, execute a Function and write to a Sink.
> > Generally Pulsar users don't use the JavaInstanceStarter directly. The
> > command line is forged by the ProcessRuntime and KubernetesRuntime.
> >
> >>
> >> In general, you will essentially have two class loaders - one for the sink
> >> and one for the pre-process function?
> >>
> > Yes, exactly.
> > 3 to be more accurate since there's also the instance class loader.
> >
> >>
> >> On Fri, Jul 22, 2022 at 12:48 PM Christophe Bornet <bornet.ch...@gmail.com>
> >> wrote:
> >>
> >>> Dear Pulsar dev community,
> >>>
> >>> I would like to open a discussion here about PIP 193 : Sink preprocessing
> >>> Function <https://github.com/apache/pulsar/issues/16739>.
> >>>
> >>> Best regards
> >>>
> >>> Christophe
> >>>
> >>> ## Motivation
> >>>
> >>> Pulsar IO connectors make it possible to connect Pulsar to an external
> >>> system:
> >>> * A Source reads continuously from an external system and writes to a
> >>> Pulsar topic
> >>> * A Sink reads continuously from a Pulsar topic and writes to an external
> >>> system.
> >>> Sources and Sinks are written in Java.
> >>>
> >>> Pulsar also has a lightweight computing system named Pulsar Functions. A
> >>> Pulsar Function reads from one or more topics, applies user logic written
> >>> in Java, Python or Go and writes to an output topic.
> >>>
> >>> When using Pulsar IO connectors, the format of what is read/written from/to
> >>> the source/sink is defined by the connector code. But there are a lot of
> >>> situations where a user wants to transform this data before using it.
> >>> Currently the solution is to either:
> >>> * write a custom connector that transforms the data the way we want but
> >>> that means writing a lot of code without reuse, packaging and managing
> >>> custom connectors and so on.
> >>> * write a Function to transform the data after it was written to a topic by
> >>> a Source or before it is read from a topic by a Sink. This is not very
> >>> efficient as we have to use an intermediate topic, which means additional
> >>> storage, IO, and latency.
> >>>
> >>> Considering all this, it would be handy to be able to apply a Function
> >>> on-the-fly to a connector without going through an intermediary topic.
> >>>
> >>> ## Goal
> >>>
> >>> This PIP defines the changes needed to be able to apply a preprocessing
> >>> Function on-the-fly to a Sink.
> >>> The preprocessing function can be a built-in function, a package function,
> >>> or loaded through an http URL or a file path.
> >>> Sources, Sinks and Functions are based on the same runtime process that:
> >>> * reads from a Source.
> >>> For Sinks and Functions this Source is a
> >>> PulsarSource consuming from a Pulsar topic
> >>> * applies a Function. For Sources and Sinks, this Function is
> >>> IdentityFunction which returns the data it gets without modification.
> >>> * writes to a Sink. For Sources and Functions, this Sink is a PulsarSink
> >>> writing to a Pulsar topic.
> >>>
> >>> This PIP reuses this and allows configuring a Function different from
> >>> IdentityFunction to Sinks.
> >>> Only Functions returning a Record will be authorized to ensure that the
> >>> Function sets the Schema explicitly.
> >>>
> >>> Out of the scope of this PIP, for future work:
> >>> * Applying a post-processing Function to a Source
> >>> * Loading the Function jar through the Sink CLI
> >>>
> >>> ## API Changes
> >>>
> >>> ### Admin CLI
> >>>
> >>> The following options will be added to the `pulsar-admin sinks` CLI
> >>> `create`, `update` and `localrun`:
> >>> * `preprocess-function`: the preprocess function applied before the Sink.
> >>> Starts with `builtin://` for built-in functions, `function://` for package
> >>> function, `http://` or `file://`
> >>> * `preprocess-function-classname`: the preprocess function class name
> >>> (optional if the function is a NAR)
> >>> * `preprocess-function-config`: the configuration of the preprocess
> >>> function in the same format as the `user-config` parameter of the
> >>> `functions create` CLI command.
> >>>
> >>> The corresponding fields will be added to `SinkConfig`:
> >>>
> >>> ```java
> >>> private String preprocessFunction;
> >>> private String preprocessFunctionClassName;
> >>> private String preprocessFunctionConfig;
> >>> ```
> >>>
> >>> ### Function definition
> >>>
> >>> The field `extraFunctionPackageLocation` will be added to the protobuf
> >>> structure `FunctionMetaData`. This field will be filled with the
> >>> location of the extra function to apply when registering a sink and used in
> >>> the Runtime to load the function code.
> >>>
> >>> ```protobuf
> >>> message FunctionMetaData {
> >>>   ...
> >>>   PackageLocationMetaData extraFunctionPackageLocation = 7;
> >>> }
> >>> ```
> >>>
> >>> ### Runtime
> >>>
> >>> The parameters `extraFunctionFile` and `originalExtraFunctionFileName` will
> >>> be added to `RuntimeFactory::createContainer`
> >>>
> >>> ```java
> >>> Runtime createContainer(
> >>>         InstanceConfig instanceConfig, String codeFile, String originalCodeFileName,
> >>>         String extraFunctionFile, String originalExtraFunctionFileName,
> >>>         Long expectedHealthCheckInterval) throws Exception;
> >>> ```
> >>>
> >>> ### Instance function cache
> >>>
> >>> A field `extraFunctionId` that will hold the UUID cache key of the extra
> >>> function will be added to `InstanceConfig`.
> >>>
> >>> ```java
> >>> public class InstanceConfig {
> >>>     private int instanceId;
> >>>     private String functionId;
> >>>     private String extraFunctionId;
> >>> ```
> >>>
> >>> ### JavaInstanceStarter
> >>>
> >>> The following parameters will be added to JavaInstanceStarter:
> >>> * `--extra_function_jar`: the path to the extra function jar
> >>> * `--extra_function_id`: the extra function UUID cache key
> >>>
> >>> These parameters are then used by the `ThreadRuntime` to load the function
> >>> from the `FunctionCacheManager` or create it there if needed.
> >>>
> >>> ### Download the extra function
> >>>
> >>> The statefulset spawned in `KubernetesRuntime` needs to be able to download
> >>> the extra function's code via API.
> >>> An `extra-function` query param will be added to the download function HTTP
> >>> endpoint
> >>>
> >>> ```java
> >>> @Path("/{tenant}/{namespace}/{functionName}/download")
> >>> public StreamingOutput downloadFunction(
> >>>         @ApiParam(value = "The tenant of functions")
> >>>         final @PathParam("tenant") String tenant,
> >>>         @ApiParam(value = "The namespace of functions")
> >>>         final @PathParam("namespace") String namespace,
> >>>         @ApiParam(value = "The name of functions")
> >>>         final @PathParam("functionName") String functionName,
> >>>         @ApiParam(value = "Whether to download the extra-function")
> >>>         final @QueryParam("extra-function") boolean extraFunction) {
> >>> ```
> >>>
> >>> If `extraFunction` is `true` then the extra function will be returned
> >>> instead of the sink.
> >>>
> >>> The Java admin SDK will have the following methods added:
> >>>
> >>> ```java
> >>> /**
> >>>  * Download Function Code.
> >>>  *
> >>>  * @param destinationFile
> >>>  *            file where data should be downloaded to
> >>>  * @param tenant
> >>>  *            Tenant name
> >>>  * @param namespace
> >>>  *            Namespace name
> >>>  * @param function
> >>>  *            Function name
> >>>  * @param extraFunction
> >>>  *            Whether to download the extra-function (for sources and sinks)
> >>>  * @throws PulsarAdminException
> >>>  */
> >>> void downloadFunction(String destinationFile, String tenant, String namespace,
> >>>         String function, boolean extraFunction) throws PulsarAdminException;
> >>>
> >>> /**
> >>>  * Download Function Code asynchronously.
> >>>  *
> >>>  * @param destinationFile
> >>>  *            file where data should be downloaded to
> >>>  * @param tenant
> >>>  *            Tenant name
> >>>  * @param namespace
> >>>  *            Namespace name
> >>>  * @param function
> >>>  *            Function name
> >>>  * @param extraFunction
> >>>  *            Whether to download the extra-function (for sources and sinks)
> >>>  */
> >>> CompletableFuture<Void> downloadFunctionAsync(
> >>>         String destinationFile, String tenant, String namespace, String function,
> >>>         boolean extraFunction);
> >>> ```
> >>>
> >>> The parameter `--extra-function` will be added to the admin CLI command
> >>> `functions download`
> >>>
> >>> ## Implementation
> >>>
> >>> ### Pulsar-admin
> >>>
> >>> * Add the admin CLI options when creating/updating/localrunning the sink
> >>> (see API changes)
> >>>
> >>> ### Pulsar broker
> >>>
> >>> * On the broker API, in registerSink/updateSink, if a preprocessing
> >>> function is present in the Sink config, we:
> >>>   * validate the function
> >>>   * get the function classloader (from builtin or download a package file)
> >>>   * load the function
> >>>   * inspect the function types and set the first arg as Sink type. Also
> >>>     verify that the second arg is of type Record.
> >>>   * use the function classloader instead of the sink classloader to verify
> >>>     that custom schemas, serdes and crypto key readers can be loaded and
> >>>     conform.
> >>>   * get the function package location and fill the protobuf
> >>>     extraFunctionPackageLocation field with it. A name for this preprocessing
> >>>     function is generated from the sink name so it can be referenced when
> >>>     stored in BookKeeper or in package management. The name of the
> >>>     preprocessing function is `{sink name}__sink-function`.
> >>>   * set the `functionDetails` with the preprocessing function config
> >>>     (function class name and function userConfig)
> >>>
> >>> * The `--extra-function` query parameter is added to the
> >>> `functions download` CLI command, admin SDK and HTTP API (see API changes).
> >>>
> >>> ### Function worker
> >>>
> >>> * When the `InstanceConfig` is created, a UUID is set to the
> >>> `extraFunctionId` field. This field will serve as a cache key for the extra
> >>> function (see API changes).
> >>> * When the `FunctionActioner` starts the function, if
> >>> `extraFunctionPackageLocation` is present, the same is done for the extra
> >>> function as what is done for the connector:
> >>>   * if the runtime is not externally managed, the extra function code is
> >>>     downloaded from the `extraFunctionPackageLocation` and the `Runtime` is
> >>>     created with the extra package file path and original name (see API
> >>>     changes to `RuntimeFactory::createContainer`)
> >>>   * if the runtime is externally managed, the `Runtime` is created with the
> >>>     `extraFunctionPackageLocation` and original name.
> >>>
> >>> * Depending on the configured runtime, if there’s an extra function file:
> >>>   * For the `ThreadRuntime`, the extra function classloader is obtained
> >>>     with the instance `extraFunctionId` cache key, then this classloader is
> >>>     passed to the `JavaInstanceRunnable`. The `JavaInstanceRunnable` then
> >>>     switches between the connector classloader and the extra function
> >>>     classloader accordingly.
> >>>   * For the `ProcessRuntime`, the path to the extra function jar is added
> >>>     to the `--extra_function_jar` parameter in the `JavaInstanceStarter`
> >>>     command. The `JavaInstanceStarter` then uses it when creating its
> >>>     `ThreadRuntime`.
> >>>   * For the `KubernetesRuntime`, a command is added in the statefulset exec
> >>>     command to download the extra function using the `--extra-function` flag
> >>>     of the `functions download` command. And the path to this downloaded jar
> >>>     is added to the `--extra_function_jar` parameter of the
> >>>     `JavaInstanceStarter` command.
> >>>
> >>> ### LocalRunner
> >>>
> >>> If `sinkConfig` has a `preprocessFunction`, the `LocalRunner` will use the
> >>> same methods as in the broker to get the function file and
> >>> `functionDetails` and use them when spawning the `Runtime`.
> >>>
> >>> ## Rejected Alternatives
> >>>
> >>> N/A
> >>>
> >>
> >