To my understanding, the Pulsar IO Connectors (i.e. Sources/Sinks) are quite self-contained. They move data around.
If we want to enable functionality described inside the PIP (process -> write to otherplace), can we think in another way -- allow flexible configuring of a Pulsar Function? Originally Pulsar Function pipeline is: PulsarSource -> func() -> PulsarSink() Can we look into allowing users to change a source/sink in the PulsarFunction pipeline instead of tweaking the Sink? Syntax could be: ``` pulsar-admins functions create --sink ... --source ... ``` This will be more flexible and opens a lot possibility for further development On Tue, Jul 26, 2022 at 2:56 AM Christophe Bornet <bornet.ch...@gmail.com> wrote: > Thanks for the feedback Jerry. > We don't modify the way sources, sinks and functions are detected when it's > based on their fields. The proposal is just to modify the classname of the > function applied in the instance so the same detection rules apply. The > only difference is when detecting if the sink or function is built-in. For > this we add some code to do this detection also based on the ComponentType > (either detected or explicit). You can check the implementation PR about > it: https://github.com/apache/pulsar/pull/16740 > > IMO, making it separate implementation of what currently exist would make > things more complex and this more error prone for no good reason. The > proposal is "just" to replace the name of the already existing function > (IdentityFunction) by another one and to provide the location of the > function JAR. > > Best regards > Christophe > > Le lun. 25 juil. 2022 à 23:31, Jerry Peng <jerry.boyang.p...@gmail.com> a > écrit : > > > My feedback is to make this change as self contained as possible. Can we > > just have a special implementation of a sink that will run the logic of > the > > "preprocess" function? There are many places in the code where we figure > > out if it is a source, sink or a function based on the fields in the > > Function metadata. Changing that may have unintended consequences. > > > > On Mon, Jul 25, 2022 at 5:55 AM Baodi Shi <baodi....@icloud.com.invalid> > > wrote: > > > > > > Can you explain more what you mean ? > > > This PIP doesn't change the API of a Function and it's already possible > > to > > > write a Function<?, Record<?>>. > > > And when declaring a Sink with a Function we'll check that it's the > case. > > > > > > I mean: we should constrain the function interface, otherwise, the user > > > may return a structure that is not a record. > > > > > > Thanks, > > > Baodi Shi > > > > > > > On Jul 25, 2022, at 01:0233, Christophe Bornet < > bornet.ch...@gmail.com > > > > > > wrote: > > > > > > > > Thanks for the feedback Asaf > > > > > > > > > > > >>> - preprocess-function: the preprocess function applied before the > > > >>> Sink. Starts by builtin:// for built-in functions, function:// > for > > > >>> package function, http:// or file:// > > > >>> > > > >>> 1. While this function is applied only before sink? I thought it > > > replaces > > > >> the identity function, so why a source can't have a function that > > reads > > > >> from the source (say S3), runs the function and only then writes to > a > > > >> pulsar topic? > > > >> > > > > > > > > Yes that's totally possible to implement and will be done in future > > work > > > > like written in the PIP. > > > > > > > > > > > >> 2. Can you clarify more about built in and function for package > > > function? > > > >> Is this an existing functionality ? > > > >> > > > > Yes those are existing functionalities. > > > > Built-in functions are not documented (and we should do something > about > > > > that). > > > > Package management of functions is described in > > > > > > > > > > https://pulsar.apache.org/docs/functions-deploy#use-package-management-service > > > > > > > > > > > >> 3. Regarding http - Are you loading a class through that URL? Aren't > > we > > > >> exposed to same problem Log4Shell security issue had? If so, what > > > measures > > > >> are you taking to protect ? > > > >> > > > > Yes we are loading code via URL. This feature already exists for > > > > Sources/Sinks/Functions. > > > > I guess you need to have a huge trust of the source from where you > > > download. > > > > This PIP has the same security level as what already exists for this > > > > functionality. > > > > > > > > > > > >> > > > >> The field extraFunctionPackageLocation to the protobuf structure > > > >>> FunctionMetaData will be added. This field will be filled with the > > > >>> location of the extra function to apply when registering a sink and > > > used > > > >> in > > > >>> the Runtime to load the function code. > > > >> > > > >> Can you please expand on that? You mean the JAR location, which you > > will > > > >> search that class name and function specified in the 3 fields you've > > > added > > > >> to the config? > > > >> > > > > Not exactly. It's the location of where the JAR is stored. It can be > > > > BookKeeper, package management, built-in NAR, etc... > > > > In KubernetesRuntime, there are cases where the builtin or package > > > function > > > > you provide in the preprocess-function param could be copied to BK. > > > > That's the same as for a regular Sink/Source and if we need to copy > to > > > BK, > > > > we append `__sink-function` to the storage path to prevent conflict > > with > > > > the sink code. > > > > The class name is indeed looked up in this JAR. > > > > > > > > > > > >> The parameters extraFunctionFile and originalExtraFunctionFileName > > will > > > be > > > >>> added to RuntimeFactory::createContainer > > > >> > > > >> 1. File and fileName containing what? How does this related to > > > >> extraFunctionPackageLocation? > > > >> > > > > That part mimicks what is already done for the main code of the > > > Source/Sink > > > > code (with respectively codeFile, originalCodeFileName and > > > packageLocation) > > > > Before starting the ThreadedRuntime, we download locally the JAR from > > the > > > > extraFunctionPackageLocation in the extraFunctionFile so we can load > > the > > > > code from it. > > > > > > > > > > > >> > > > >> In here you use the terminology Extra Function" and in fields of > > config > > > and > > > >> admin you used the term Pre-Process Function. I would stick to > > > Pro-Process > > > >> Function and stick with it all over. > > > >> > > > > This terminology need to be applicable to a Function that would be > > > applied > > > > after a Source so we can't use "preprocess" here. > > > > I haven't found better than "extra function". Don't hesitate to > propose > > > > something ! > > > > > > > > > > > >> > > > >> > > > >>> The following parameters will be added to JavaInstanceStarter: > > > >>> > > > >>> - --extra_function_jar: the path to the extra function jar > > > >>> > > > >>> > > > >>> - --extra_function_id: the extra function UUID cache key > > > >>> > > > >>> These parameters are then used by the ThreadRuntime to load the > > > function > > > >>> from the FunctionCacheManager or create it there if needed. > > > >> > > > >> > > > >> Can you elaborate on that? JavaInstanceStarter is used to start a > > single > > > >> Function? It's used from command line? > > > > > > > > The JavaInstanceStarter is indeed a CLI to start a JavaInstance. > > > > The JavaInstance is the process that will execute the code to read > > from a > > > > Source, execute a Function and write to a Sink. > > > > Generally Pulsar users don't use the JavaInstanceStarter directly. > The > > > > command line is forged by the ProcessRuntime and KubernetesRuntime. > > > > > > > >> > > > >> > > > >> In general, you will essentially have two class loaders - one for > the > > > sink > > > >> and one for the pre-process function? > > > >> > > > > Yes, exactly. > > > > 3 to be more accurate since there's also the instance class loader. > > > > > > > > > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> On Fri, Jul 22, 2022 at 12:48 PM Christophe Bornet < > > > bornet.ch...@gmail.com > > > >>> > > > >> wrote: > > > >> > > > >>> Dear Pulsar dev community, > > > >>> > > > >>> I would like to open a discussion here about PIP 193 : Sink > > > preprocessing > > > >>> Function <https://github.com/apache/pulsar/issues/16739>. > > > >>> > > > >>> Best regards > > > >>> > > > >>> Christophe > > > >>> > > > >>> ## Motivation > > > >>> > > > >>> Pulsar IO connectors make it possible to connect Pulsar to an > > external > > > >>> system: > > > >>> * A Source reads continuously from an external system and writes > to a > > > >>> Pulsar topic > > > >>> * A Sink reads continuously from a Pulsar topic and writes to an > > > external > > > >>> system. > > > >>> Sources and Sinks are written in Java. > > > >>> > > > >>> Pulsar also has a lightweight computing system named Pulsar > > Functions. > > > A > > > >>> Pulsar Function reads from one or more topics, applies user logic > > > written > > > >>> in Java, Python or Go and writes to an output topic. > > > >>> > > > >>> When using Pulsar IO connectors, the format of what is read/written > > > >> from/to > > > >>> the source/sink is defined by the connector code. But there are a > lot > > > of > > > >>> situations where a user wants to transform this data before using > it. > > > >>> Currently the solution is to either : > > > >>> * write a custom connector that transforms the data the way we want > > but > > > >>> that means writing a lot of code without reuse, packaging and > > managing > > > >>> custom connectors and so on.. > > > >>> * write a Function to transform the data after it was written to a > > > topic > > > >> by > > > >>> a Source or before it is read from a topic by a Sink. This is not > > very > > > >>> efficient as we have to use an intermediate topic, which means > > > additional > > > >>> storage, IO, and latency. > > > >>> > > > >>> Considering all this, it would be handy to be able to apply a > > Function > > > >>> on-the-fly to a connector without going through an intermediary > > topic. > > > >>> > > > >>> ## Goal > > > >>> > > > >>> This PIP defines the changes needed to be able to apply a > > preprocessing > > > >>> Function on-the-fly to a Sink. > > > >>> The preprocessing function can be a built-in function, a package > > > >> function, > > > >>> or loaded through an http URL or a file path. > > > >>> Sources, Sinks and Functions are based on the same runtime process > > > that: > > > >>> * reads from a Source. For Sinks and Functions this Source is a > > > >>> PulsarSource consuming from a Pulsar topic > > > >>> * applies a Function. For Sources and Sinks, this Function is > > > >>> IdentityFunction which returns the data it gets without > modification. > > > >>> * writes to a Sink. For Sources and Functions, this Sink is a > > > PulsarSink > > > >>> writing to a Pulsar topic. > > > >>> > > > >>> This PIP reuses this and allows configuring a Function different > from > > > >>> IdentityFunction to Sinks. > > > >>> Only Functions returning a Record will be authorized to ensure that > > the > > > >>> Function sets the Schema explicitly. > > > >>> > > > >>> Out of the scope of this PIP, for future work: > > > >>> * Applying a post-processing Function to a Source > > > >>> * Loading the Function jar through the Sink CLI > > > >>> > > > >>> ## API Changes > > > >>> > > > >>> ### Admin CLI > > > >>> > > > >>> The following options will be added to the `pulsar-admin sinks` CLI > > > >>> `create`, `update` and `localrun`: > > > >>> * `preprocess-function`: the preprocess function applied before the > > > Sink. > > > >>> Starts by `builtin://` for built-in functions, `function://` for > > > package > > > >>> function, `http://` or `file://` > > > >>> * `preprocess-function-classname`: the preprocess function class > name > > > >>> (optional if the function is a NAR) > > > >>> * `preprocess-function-config`: the configuration of the preprocess > > > >>> function in the same format as the `user-config` parameter of the > > > >>> `functions create` CLI command. > > > >>> > > > >>> The corresponding fields will be added to `SinkConfig`: > > > >>> > > > >>> ```java > > > >>> private String preprocessFunction; > > > >>> private String preprocessFunctionClassName; > > > >>> private String preprocessFunctionConfig; > > > >>> ``` > > > >>> > > > >>> ### Function definition > > > >>> > > > >>> The field `extraFunctionPackageLocation` to the protobuf structure > > > >>> `FunctionMetaData` will be added. This field will be filled with > the > > > >>> location of the extra function to apply when registering a sink and > > > used > > > >> in > > > >>> the Runtime to load the function code. > > > >>> > > > >>> ```protobuf > > > >>> message FunctionMetaData { > > > >>> ... > > > >>> PackageLocationMetaData extraFunctionPackageLocation = 7; > > > >>> } > > > >>> ``` > > > >>> > > > >>> ### Runtime > > > >>> > > > >>> The parameters `extraFunctionFile` and > > `originalExtraFunctionFileName` > > > >> will > > > >>> be added to `RuntimeFactory::createContainer` > > > >>> > > > >>> > > > >>> ```java > > > >>> Runtime createContainer( > > > >>> InstanceConfig instanceConfig, String codeFile, String > > > >>> originalCodeFileName, > > > >>> String extraFunctionFile, String > > > >> originalExtraFunctionFileName, > > > >>> Long expectedHealthCheckInterval) throws Exception; > > > >>> ``` > > > >>> > > > >>> ### Instance function cache > > > >>> > > > >>> A field `extraFunctionId` to `InstanceConfig` that will hold the > UUID > > > >> cache > > > >>> key of the extra function will be added. > > > >>> > > > >>> ```java > > > >>> public class InstanceConfig { > > > >>> private int instanceId; > > > >>> private String functionId; > > > >>> private String extraFunctionId; > > > >>> ``` > > > >>> > > > >>> ### JavaInstanceStarter > > > >>> > > > >>> > > > >>> The following parameters will be added to JavaInstanceStarter: > > > >>> * `--extra_function_jar`: the path to the extra function jar > > > >>> * `--extra_function_id`: the extra function UUID cache key > > > >>> > > > >>> These parameters are then used by the `ThreadRuntime` to load the > > > >> function > > > >>> from the `FunctionCacheManager` or create it there if needed. > > > >>> > > > >>> ### Download the extra function > > > >>> > > > >>> The statefulset spawned in `KubernetesRuntime` needs to be able to > > > >> download > > > >>> the extra functions code via API. > > > >>> An `extra-function` query param will be added to the download > > function > > > >> HTTP > > > >>> endpoint > > > >>> > > > >>> ```java > > > >>> @Path("/{tenant}/{namespace}/{functionName}/download") > > > >>> public StreamingOutput downloadFunction( > > > >>> @ApiParam(value = "The tenant of functions") > > > >>> final @PathParam("tenant") String tenant, > > > >>> @ApiParam(value = "The namespace of functions") > > > >>> final @PathParam("namespace") String namespace, > > > >>> @ApiParam(value = "The name of functions") > > > >>> final @PathParam("functionName") String functionName) { > > > >>> final @PathParam("functionName") String functionName, > > > >>> @ApiParam(value = "Whether to download the > > extra-function") > > > >>> final @QueryParam("extra-function") boolean > > extraFunction) { > > > >>> ``` > > > >>> > > > >>> If `extraFunction` is `true` then the extra function will be > returned > > > >>> instead of the sink. > > > >>> > > > >>> The Java admin SDK will have the following methods added: > > > >>> > > > >>> > > > >>> ```java > > > >>> /** > > > >>> * Download Function Code. > > > >>> * > > > >>> * @param destinationFile > > > >>> * file where data should be downloaded to > > > >>> * @param tenant > > > >>> * Tenant name > > > >>> * @param namespace > > > >>> * Namespace name > > > >>> * @param function > > > >>> * Function name > > > >>> * @param extraFunction > > > >>> * Whether to download the extra-function (for > sources > > > and > > > >>> sinks) > > > >>> * @throws PulsarAdminException > > > >>> */ > > > >>> void downloadFunction(String destinationFile, String tenant, > > String > > > >>> namespace, String function, > > > >>> boolean extraFunction) throws > > > >>> PulsarAdminException; > > > >>> > > > >>> /** > > > >>> * Download Function Code asynchronously. > > > >>> * > > > >>> * @param destinationFile > > > >>> * file where data should be downloaded to > > > >>> * @param tenant > > > >>> * Tenant name > > > >>> * @param namespace > > > >>> * Namespace name > > > >>> * @param function > > > >>> * Function name > > > >>> * @param extraFunction > > > >>> * Whether to download the extra-function (for > sources > > > and > > > >>> sinks) > > > >>> */ > > > >>> CompletableFuture<Void> downloadFunctionAsync( > > > >>> String destinationFile, String tenant, String namespace, > > > >> String > > > >>> function, boolean extraFunction); > > > >>> ``` > > > >>> > > > >>> The parameter `--extra-function` will be added to the admin CLI > > command > > > >>> `functions download` > > > >>> > > > >>> ## Implementation > > > >>> > > > >>> ### Pulsar-admin > > > >>> > > > >>> * Add the admin CLI options when creating/updating/localrunning the > > > sink > > > >>> (see API changes) > > > >>> > > > >>> ### Pulsar broker > > > >>> > > > >>> * On the broker API, in registerSink/updateSink, if a preprocessing > > > >>> function is present in the Sink config, we: > > > >>> * validate the function > > > >>> * get the function classloader (from builtin or download a > package > > > >>> file) > > > >>> * load the function > > > >>> * inspect the function types and set the first arg as Sink type. > > > Also > > > >>> verify that the second arg is of type Record. > > > >>> * use the function classloader instead of the sink classloader to > > > >> verify > > > >>> if custom schemas, serdes, crypto key readers can be loaded and are > > > >>> conform. > > > >>> * get the function package location and fill the protobuf > > > >>> extraFunctionPackageLocation field with it. A name for this > > > preprocessing > > > >>> function is generated from the sink name so it can be referenced > when > > > >>> stored in BookKeeper or in package management. The name of the > > > >>> preprocessing function is `{sink name}__sink-function`. > > > >>> * set the `functionDetails` with the preprocessing function config > > > >>> (function class name and function userConfig) > > > >>> > > > >>> * The `--extra-function` query parameter is added to the `functions > > > >>> download` CLI command, admin SDK and HTTP API (see API changes). > > > >>> > > > >>> ### Function worker > > > >>> > > > >>> * When the `InstanceConfig` is created, an UUID is set to the > > > >>> `extraFunctionId` field. This field will serve as a cache key for > the > > > >> extra > > > >>> function (see API changes). > > > >>> * When the `FunctionActioner` starts the function, if > > > >>> `extraFunctionPackageLocation` is present, the same is done for the > > > extra > > > >>> function as what is done for the connector: > > > >>> * if the runtime is not externally managed, the extra function > code > > is > > > >>> downloaded from the `extraFunctionPackageLocation` and the > `Runtime` > > is > > > >>> created with the extra package file path and original name (see API > > > >> changes > > > >>> to `RuntimeFactory::createContainer`) > > > >>> * if the runtime is externally managed, the `Runtime` is created > > with > > > >> the > > > >>> `extraFunctionPackageLocation` and original name. > > > >>> > > > >>> * Depending on the configured runtime, if there’s an extra function > > > file: > > > >>> * For the `ThreadRuntime`, the extra function classloader is > > obtained > > > >>> with the instance `extraFunctionId` cache key, then this > classloader > > is > > > >>> passed to the `JavaInstanceRunnable`. The `JavaInstanceRunnable` > then > > > >>> switches between the connector classloader and the extra function > > > >>> classloader accordingly.. > > > >>> * For the `ProcessRuntime`, the path to the extra function jar is > > > added > > > >>> to the `--extra_function_jar` parameter in the > `JavaInstanceStarter` > > > >>> command. The `JavaInstanceStarter` then uses it when creating its > > > >>> `ThreadRuntime`. > > > >>> * For the `KubernetesRuntime`, a command is added in the > statefulset > > > >> exec > > > >>> command to download the extra function using the `–extra-function` > > flag > > > >> of > > > >>> the `functions download` command. And the path to this downloaded > > jar > > > is > > > >>> added to the `--extra_function_jar` parameter of the > > > >> `JavaInstanceStarter` > > > >>> command. > > > >>> > > > >>> ### LocalRunner > > > >>> > > > >>> If `sinkConfig` has a `preprocessFunction`, the `LocalRunner` will > > use > > > >> the > > > >>> same methods as in the broker to get the function file and > > > >>> `functionDetails` and use them when spawning the `Runtime`. > > > >>> > > > >>> ## Reject Alternatives > > > >>> > > > >>> N/A > > > >>> > > > >> > > > > > > > > > -- Best Regards, Neng