> Can you explain in more detail what you mean? This PIP doesn't change the API of a Function, and it's already possible to write a Function<?, Record<?>>. And when declaring a Sink with a Function, we'll check that this is the case.
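Christophe's point that the Record constraint is checked when a Sink is declared with a Function can be illustrated with a reflection-based check on the Function's type arguments. This is a minimal sketch, not Pulsar's implementation: `PulsarFunction` and `PulsarRecord` are hypothetical stand-ins for Pulsar's `Function` and `Record` interfaces (the `Context` parameter is dropped) so that the example compiles on its own, and only interfaces implemented directly by the class are inspected.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Hypothetical stand-in for Pulsar's Function interface (Context parameter omitted). */
interface PulsarFunction<I, O> {
    O process(I input) throws Exception;
}

/** Hypothetical stand-in for Pulsar's Record interface, reduced to one method. */
interface PulsarRecord<T> {
    T getValue();
}

final class PreprocessFunctionCheck {
    private PreprocessFunctionCheck() {}

    /**
     * Returns the type arguments {I, O} bound by a class that directly implements
     * PulsarFunction, or null otherwise (superclasses are not inspected in this sketch).
     */
    static Type[] functionTypeArguments(Class<?> functionClass) {
        for (Type iface : functionClass.getGenericInterfaces()) {
            if (iface instanceof ParameterizedType
                    && ((ParameterizedType) iface).getRawType() == PulsarFunction.class) {
                return ((ParameterizedType) iface).getActualTypeArguments();
            }
        }
        return null;
    }

    /** True when the declared output type O is PulsarRecord (or a subtype), parameterized or raw. */
    static boolean returnsRecord(Class<?> functionClass) {
        Type[] args = functionTypeArguments(functionClass);
        if (args == null) {
            return false;
        }
        Type output = args[1];
        Type raw = output instanceof ParameterizedType
                ? ((ParameterizedType) output).getRawType()
                : output;
        return raw instanceof Class && PulsarRecord.class.isAssignableFrom((Class<?>) raw);
    }
}
```

With this sketch, a class declared as `PulsarFunction<String, String>` is rejected, while one declared as `PulsarFunction<String, PulsarRecord<String>>` is accepted and its first type argument (`String`) can be used as the Sink input type. This is the registration-time check; Baodi's suggestion of constraining the interface itself would move the error to compile time instead.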
I mean: we should constrain the function interface; otherwise, the user may return a structure that is not a record.

Thanks,
Baodi Shi

> On Jul 25, 2022, at 01:0233, Christophe Bornet <bornet.ch...@gmail.com> wrote:
>
> Thanks for the feedback Asaf
>
>>> - preprocess-function: the preprocess function applied before the Sink. Starts with builtin:// for built-in functions, function:// for package functions, http:// or file://
>>
>> 1. Why is this function applied only before the sink? I thought it replaces the identity function, so why can't a source have a function that reads from the source (say S3), runs the function and only then writes to a Pulsar topic?
>
> Yes, that's entirely possible to implement and will be done in future work, as written in the PIP.
>
>> 2. Can you clarify more about built-in functions and package functions? Is this existing functionality?
>
> Yes, those are existing functionalities.
> Built-in functions are not documented (and we should do something about that).
> Package management of functions is described in https://pulsar.apache.org/docs/functions-deploy#use-package-management-service
>
>> 3. Regarding http: are you loading a class through that URL? Aren't we exposed to the same problem the Log4Shell security issue had? If so, what measures are you taking to protect against it?
>
> Yes, we are loading code via URL. This feature already exists for Sources/Sinks/Functions.
> I guess you need to place a great deal of trust in the source you download from.
> This PIP has the same security level as what already exists for this functionality.
>
>>> The field extraFunctionPackageLocation will be added to the protobuf structure FunctionMetaData. This field will be filled with the location of the extra function to apply when registering a sink, and used in the Runtime to load the function code.
>>
>> Can you please expand on that?
>> You mean the JAR location, in which you will search for the class name and function specified in the 3 fields you've added to the config?
>
> Not exactly. It's the location where the JAR is stored. It can be BookKeeper, package management, a built-in NAR, etc.
> In KubernetesRuntime, there are cases where the builtin or package function you provide in the preprocess-function param could be copied to BK.
> That's the same as for a regular Sink/Source, and if we need to copy to BK, we append `__sink-function` to the storage path to prevent conflicts with the sink code.
> The class name is indeed looked up in this JAR.
>
>>> The parameters extraFunctionFile and originalExtraFunctionFileName will be added to RuntimeFactory::createContainer
>>
>> 1. File and fileName containing what? How does this relate to extraFunctionPackageLocation?
>
> That part mimics what is already done for the main code of the Source/Sink (with codeFile, originalCodeFileName and packageLocation respectively).
> Before starting the ThreadRuntime, we download the JAR locally from the extraFunctionPackageLocation into the extraFunctionFile so we can load the code from it.
>
>> Here you use the terminology "Extra Function", while in the config and admin fields you used the term Pre-Process Function. I would pick Pre-Process Function and stick with it throughout.
>
> This terminology needs to be applicable to a Function that would be applied after a Source, so we can't use "preprocess" here.
> I haven't found anything better than "extra function". Don't hesitate to propose something!
>
>>> The following parameters will be added to JavaInstanceStarter:
>>>
>>> - --extra_function_jar: the path to the extra function jar
>>> - --extra_function_id: the extra function UUID cache key
>>>
>>> These parameters are then used by the ThreadRuntime to load the function from the FunctionCacheManager or create it there if needed.
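The `--extra_function_id` cache key quoted above could work along the following lines: the class loader for the extra function jar is created on first use and reused for later lookups with the same key. This is a minimal sketch assuming one `URLClassLoader` per extra function jar; `ExtraFunctionClassLoaderCache`, `getOrCreate` and `release` are hypothetical names and do not reflect the actual `FunctionCacheManager` API.

```java
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified cache: one class loader per extra function cache key. */
final class ExtraFunctionClassLoaderCache {
    private final Map<String, URLClassLoader> loaders = new ConcurrentHashMap<>();
    private final ClassLoader parent;

    ExtraFunctionClassLoaderCache(ClassLoader parent) {
        this.parent = parent;
    }

    /** Returns the loader cached under the key, creating it from the jar on first use. */
    ClassLoader getOrCreate(String extraFunctionId, File extraFunctionJar) {
        return loaders.computeIfAbsent(extraFunctionId, key -> {
            try {
                URL jarUrl = extraFunctionJar.toURI().toURL();
                return new URLClassLoader(new URL[] {jarUrl}, parent);
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException("Invalid jar path: " + extraFunctionJar, e);
            }
        });
    }

    /** Removes and closes the loader for the key, e.g. when the instance stops. */
    void release(String extraFunctionId) throws IOException {
        URLClassLoader loader = loaders.remove(extraFunctionId);
        if (loader != null) {
            loader.close();
        }
    }

    int size() {
        return loaders.size();
    }
}
```

Because the PIP has the worker generate a fresh UUID for each `InstanceConfig`, the extra function's loader is presumably keyed separately from the connector code cached under `functionId`, so the two do not collide.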
>>
>> Can you elaborate on that? Is JavaInstanceStarter used to start a single Function? Is it used from the command line?
>
> The JavaInstanceStarter is indeed a CLI to start a JavaInstance.
> The JavaInstance is the process that will execute the code to read from a Source, execute a Function and write to a Sink.
> Generally, Pulsar users don't use the JavaInstanceStarter directly. The command line is built by the ProcessRuntime and KubernetesRuntime.
>
>> In general, you will essentially have two class loaders: one for the sink and one for the pre-process function?
>
> Yes, exactly.
> Three, to be more accurate, since there's also the instance class loader.
>
>> On Fri, Jul 22, 2022 at 12:48 PM Christophe Bornet <bornet.ch...@gmail.com> wrote:
>>
>>> Dear Pulsar dev community,
>>>
>>> I would like to open a discussion here about PIP 193: Sink preprocessing Function <https://github.com/apache/pulsar/issues/16739>.
>>>
>>> Best regards
>>>
>>> Christophe
>>>
>>> ## Motivation
>>>
>>> Pulsar IO connectors make it possible to connect Pulsar to an external system:
>>> * A Source reads continuously from an external system and writes to a Pulsar topic.
>>> * A Sink reads continuously from a Pulsar topic and writes to an external system.
>>>
>>> Sources and Sinks are written in Java.
>>>
>>> Pulsar also has a lightweight computing system named Pulsar Functions. A Pulsar Function reads from one or more topics, applies user logic written in Java, Python or Go, and writes to an output topic.
>>>
>>> When using Pulsar IO connectors, the format of what is read from or written to the source/sink is defined by the connector code. But there are many situations where a user wants to transform this data before using it.
>>>
>>> Currently the solution is to either:
>>> * write a custom connector that transforms the data the way we want, but that means writing a lot of code without reuse, packaging and managing custom connectors, and so on.
>>> * write a Function to transform the data after it was written to a topic by a Source or before it is read from a topic by a Sink. This is not very efficient, as we have to use an intermediate topic, which means additional storage, IO, and latency.
>>>
>>> Considering all this, it would be handy to be able to apply a Function on-the-fly to a connector without going through an intermediate topic.
>>>
>>> ## Goal
>>>
>>> This PIP defines the changes needed to be able to apply a preprocessing Function on-the-fly to a Sink.
>>> The preprocessing function can be a built-in function or a package function, or it can be loaded through an http URL or a file path.
>>>
>>> Sources, Sinks and Functions are based on the same runtime process that:
>>> * reads from a Source. For Sinks and Functions, this Source is a PulsarSource consuming from a Pulsar topic.
>>> * applies a Function. For Sources and Sinks, this Function is the IdentityFunction, which returns the data it gets without modification.
>>> * writes to a Sink. For Sources and Functions, this Sink is a PulsarSink writing to a Pulsar topic.
>>>
>>> This PIP reuses this process and allows configuring a Function other than the IdentityFunction for Sinks.
>>> Only Functions returning a Record will be allowed, to ensure that the Function sets the Schema explicitly.
>>>
>>> Out of the scope of this PIP, for future work:
>>> * Applying a post-processing Function to a Source
>>> * Loading the Function jar through the Sink CLI
>>>
>>> ## API Changes
>>>
>>> ### Admin CLI
>>>
>>> The following options will be added to the `pulsar-admin sinks` CLI commands `create`, `update` and `localrun`:
>>> * `preprocess-function`: the preprocess function applied before the Sink.
>>>   Starts with `builtin://` for built-in functions, `function://` for package functions, `http://` or `file://`.
>>> * `preprocess-function-classname`: the preprocess function class name (optional if the function is a NAR)
>>> * `preprocess-function-config`: the configuration of the preprocess function, in the same format as the `user-config` parameter of the `functions create` CLI command.
>>>
>>> The corresponding fields will be added to `SinkConfig`:
>>>
>>> ```java
>>> private String preprocessFunction;
>>> private String preprocessFunctionClassName;
>>> private String preprocessFunctionConfig;
>>> ```
>>>
>>> ### Function definition
>>>
>>> The field `extraFunctionPackageLocation` will be added to the protobuf structure `FunctionMetaData`. This field will be filled with the location of the extra function to apply when registering a sink, and used in the Runtime to load the function code.
>>>
>>> ```protobuf
>>> message FunctionMetaData {
>>>     ...
>>>     PackageLocationMetaData extraFunctionPackageLocation = 7;
>>> }
>>> ```
>>>
>>> ### Runtime
>>>
>>> The parameters `extraFunctionFile` and `originalExtraFunctionFileName` will be added to `RuntimeFactory::createContainer`:
>>>
>>> ```java
>>> Runtime createContainer(
>>>         InstanceConfig instanceConfig, String codeFile, String originalCodeFileName,
>>>         String extraFunctionFile, String originalExtraFunctionFileName,
>>>         Long expectedHealthCheckInterval) throws Exception;
>>> ```
>>>
>>> ### Instance function cache
>>>
>>> A field `extraFunctionId`, which will hold the UUID cache key of the extra function, will be added to `InstanceConfig`.
>>>
>>> ```java
>>> public class InstanceConfig {
>>>     private int instanceId;
>>>     private String functionId;
>>>     private String extraFunctionId;
>>>     // ... other existing fields
>>> }
>>> ```
>>>
>>> ### JavaInstanceStarter
>>>
>>> The following parameters will be added to JavaInstanceStarter:
>>> * `--extra_function_jar`: the path to the extra function jar
>>> * `--extra_function_id`: the extra function UUID cache key
>>>
>>> These parameters are then used by the `ThreadRuntime` to load the function from the `FunctionCacheManager` or create it there if needed.
>>>
>>> ### Download the extra function
>>>
>>> The statefulset spawned in `KubernetesRuntime` needs to be able to download the extra function's code via the API.
>>> An `extra-function` query param will be added to the download function HTTP endpoint:
>>>
>>> ```java
>>> @Path("/{tenant}/{namespace}/{functionName}/download")
>>> public StreamingOutput downloadFunction(
>>>         @ApiParam(value = "The tenant of functions")
>>>         final @PathParam("tenant") String tenant,
>>>         @ApiParam(value = "The namespace of functions")
>>>         final @PathParam("namespace") String namespace,
>>>         @ApiParam(value = "The name of functions")
>>>         final @PathParam("functionName") String functionName,
>>>         @ApiParam(value = "Whether to download the extra-function")
>>>         final @QueryParam("extra-function") boolean extraFunction) {
>>>     // ...
>>> }
>>> ```
>>>
>>> If `extraFunction` is `true`, then the extra function will be returned instead of the sink.
>>>
>>> The Java admin SDK will have the following methods added:
>>>
>>> ```java
>>> /**
>>>  * Download Function Code.
>>>  *
>>>  * @param destinationFile
>>>  *            file where data should be downloaded to
>>>  * @param tenant
>>>  *            Tenant name
>>>  * @param namespace
>>>  *            Namespace name
>>>  * @param function
>>>  *            Function name
>>>  * @param extraFunction
>>>  *            Whether to download the extra-function (for sources and sinks)
>>>  * @throws PulsarAdminException
>>>  */
>>> void downloadFunction(String destinationFile, String tenant, String namespace, String function,
>>>         boolean extraFunction) throws PulsarAdminException;
>>>
>>> /**
>>>  * Download Function Code asynchronously.
>>>  *
>>>  * @param destinationFile
>>>  *            file where data should be downloaded to
>>>  * @param tenant
>>>  *            Tenant name
>>>  * @param namespace
>>>  *            Namespace name
>>>  * @param function
>>>  *            Function name
>>>  * @param extraFunction
>>>  *            Whether to download the extra-function (for sources and sinks)
>>>  */
>>> CompletableFuture<Void> downloadFunctionAsync(
>>>         String destinationFile, String tenant, String namespace, String function, boolean extraFunction);
>>> ```
>>>
>>> The parameter `--extra-function` will be added to the admin CLI command `functions download`.
>>>
>>> ## Implementation
>>>
>>> ### Pulsar-admin
>>>
>>> * Add the admin CLI options when creating/updating/localrunning the sink (see API changes).
>>>
>>> ### Pulsar broker
>>>
>>> * On the broker API, in registerSink/updateSink, if a preprocessing function is present in the Sink config, we:
>>>   * validate the function
>>>   * get the function classloader (from a builtin or by downloading a package file)
>>>   * load the function
>>>   * inspect the function types and set the first type argument as the Sink type. We also verify that the second type argument is of type Record.
>>>   * use the function classloader instead of the sink classloader to verify that custom schemas, serdes and crypto key readers can be loaded and are conformant.
>>>   * get the function package location and fill the protobuf extraFunctionPackageLocation field with it.
>>>     A name for this preprocessing function is generated from the sink name so it can be referenced when stored in BookKeeper or in package management. The name of the preprocessing function is `{sink name}__sink-function`.
>>>   * set the `functionDetails` with the preprocessing function config (function class name and function userConfig)
>>>
>>> * The `--extra-function` query parameter is added to the `functions download` CLI command, admin SDK and HTTP API (see API changes).
>>>
>>> ### Function worker
>>>
>>> * When the `InstanceConfig` is created, a UUID is set in the `extraFunctionId` field. This field will serve as a cache key for the extra function (see API changes).
>>> * When the `FunctionActioner` starts the function, if `extraFunctionPackageLocation` is present, the same is done for the extra function as for the connector:
>>>   * if the runtime is not externally managed, the extra function code is downloaded from the `extraFunctionPackageLocation` and the `Runtime` is created with the extra package file path and original name (see API changes to `RuntimeFactory::createContainer`)
>>>   * if the runtime is externally managed, the `Runtime` is created with the `extraFunctionPackageLocation` and original name.
>>>
>>> * Depending on the configured runtime, if there's an extra function file:
>>>   * For the `ThreadRuntime`, the extra function classloader is obtained with the instance `extraFunctionId` cache key, then this classloader is passed to the `JavaInstanceRunnable`. The `JavaInstanceRunnable` then switches between the connector classloader and the extra function classloader accordingly.
>>>   * For the `ProcessRuntime`, the path to the extra function jar is passed as the `--extra_function_jar` parameter of the `JavaInstanceStarter` command. The `JavaInstanceStarter` then uses it when creating its `ThreadRuntime`.
>>>   * For the `KubernetesRuntime`, a command is added to the statefulset exec command to download the extra function using the `--extra-function` flag of the `functions download` command, and the path to this downloaded jar is passed as the `--extra_function_jar` parameter of the `JavaInstanceStarter` command.
>>>
>>> ### LocalRunner
>>>
>>> If `sinkConfig` has a `preprocessFunction`, the `LocalRunner` will use the same methods as the broker to get the function file and `functionDetails`, and use them when spawning the `Runtime`.
>>>
>>> ## Rejected Alternatives
>>>
>>> N/A
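The Function worker section says the `JavaInstanceRunnable` switches between the connector classloader and the extra function classloader. The sketch below illustrates that idea with a read, apply, write loop. It assumes the switch is done by swapping the thread context class loader around each call, which the PIP does not specify; `InstanceLoopSketch`, `withContextClassLoader` and `run` are hypothetical names. The loader in place before each call plays the role of the instance class loader and is restored afterwards.

```java
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical sketch of the source -> function -> sink loop with class loader switching. */
final class InstanceLoopSketch {
    private InstanceLoopSketch() {}

    /** Runs the body with cl as the thread context class loader, then restores the previous one. */
    static <T> T withContextClassLoader(ClassLoader cl, Supplier<T> body) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(cl);
        try {
            return body.get();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    /**
     * Reads every input, applies the function under the extra function's loader and
     * writes the result under the sink's loader.
     */
    static <I, O> void run(Iterator<I> source,
                           Function<I, O> function, ClassLoader functionLoader,
                           Consumer<O> sink, ClassLoader sinkLoader) {
        while (source.hasNext()) {
            I input = source.next();
            O output = withContextClassLoader(functionLoader, () -> function.apply(input));
            withContextClassLoader(sinkLoader, () -> {
                sink.accept(output);
                return null;
            });
        }
    }
}
```

Passing `Function.identity()` as the function mirrors the IdentityFunction that plain Sinks use today; a preprocessing function simply replaces it without changing the loop.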