This may be a matter of personal opinion, but I don't see connectors as more self-contained than functions. The problem I see when looking at this from a Function point of view is that a Function could have both its Sink and its Source not connected to Pulsar, which feels awkward. I understand that functions were not designed to be general-purpose functions but only to deal with Pulsar messages.
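To make the concern concrete, here is a minimal sketch of the source -> function -> sink pipeline discussed in this thread. The `Endpoint` and `Pipeline` classes are hypothetical stand-ins written for illustration only; they are not Pulsar API types. The sketch just shows that once both ends of a Function are freely configurable, nothing prevents a pipeline that never touches a Pulsar topic.

```java
import java.util.function.Function;

public class PipelineSketch {

    /** Hypothetical stand-in for a source or sink endpoint; not a Pulsar API type. */
    static final class Endpoint {
        final String name;
        final boolean pulsarBacked;

        Endpoint(String name, boolean pulsarBacked) {
            this.name = name;
            this.pulsarBacked = pulsarBacked;
        }
    }

    /** One instance pipeline: source -> function -> sink (hypothetical model). */
    static final class Pipeline {
        final Endpoint source;
        final Function<String, String> function;
        final Endpoint sink;

        Pipeline(Endpoint source, Function<String, String> function, Endpoint sink) {
            this.source = source;
            this.function = function;
            this.sink = sink;
        }

        /** True when at least one end of the pipeline is a Pulsar topic. */
        boolean touchesPulsar() {
            return source.pulsarBacked || sink.pulsarBacked;
        }

        String describe() {
            return source.name + " -> func() -> " + sink.name
                    + (touchesPulsar() ? "" : " (no Pulsar topic involved)");
        }
    }

    static final Endpoint PULSAR_SOURCE = new Endpoint("PulsarSource", true);
    static final Endpoint PULSAR_SINK = new Endpoint("PulsarSink", true);
    static final Endpoint EXTERNAL_SOURCE = new Endpoint("ExternalSource", false);
    static final Endpoint EXTERNAL_SINK = new Endpoint("ExternalSink", false);

    public static void main(String[] args) {
        Function<String, String> transform = String::toUpperCase;

        // Shape targeted by the PIP: the Sink still consumes from a Pulsar topic.
        Pipeline sinkWithPreprocessing = new Pipeline(PULSAR_SOURCE, transform, EXTERNAL_SINK);
        // Shape that becomes possible if both ends of a Function are configurable.
        Pipeline fullyExternal = new Pipeline(EXTERNAL_SOURCE, transform, EXTERNAL_SINK);

        System.out.println(sinkWithPreprocessing.describe());
        System.out.println(fullyExternal.describe());
    }
}
```

Any validation that rejects the second shape would have to live somewhere, which is part of why keeping the change on the Sink side looks simpler.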
On Tue, Aug 9, 2022 at 19:04, Neng Lu <freen...@gmail.com> wrote:

> To my understanding, the Pulsar IO Connectors (i.e. Sources/Sinks) are
> quite self-contained. They move data around.
>
> If we want to enable functionality described inside the PIP (process ->
> write to another place), can we think in another way -- allow flexible
> configuring of a Pulsar Function?
>
> Originally Pulsar Function pipeline is:
> PulsarSource -> func() -> PulsarSink()
>
> Can we look into allowing users to change a source/sink in the
> PulsarFunction pipeline instead of tweaking the Sink?
>
> Syntax could be:
> ```
> pulsar-admin functions create --sink ... --source ...
> ```
>
> This will be more flexible and opens a lot of possibilities for further
> development.
>
> On Tue, Jul 26, 2022 at 2:56 AM Christophe Bornet <bornet.ch...@gmail.com>
> wrote:
>
> > Thanks for the feedback Jerry.
> > We don't modify the way sources, sinks and functions are detected when
> > it's based on their fields. The proposal is just to modify the classname
> > of the function applied in the instance so the same detection rules
> > apply. The only difference is when detecting if the sink or function is
> > built-in. For this we add some code to do this detection also based on
> > the ComponentType (either detected or explicit). You can check the
> > implementation PR about it: https://github.com/apache/pulsar/pull/16740
> >
> > IMO, making it a separate implementation of what currently exists would
> > make things more complex and thus more error-prone for no good reason.
> > The proposal is "just" to replace the name of the already existing
> > function (IdentityFunction) by another one and to provide the location
> > of the function JAR.
> >
> > Best regards
> > Christophe
> >
> > On Mon, Jul 25, 2022 at 23:31, Jerry Peng <jerry.boyang.p...@gmail.com>
> > wrote:
> >
> > > My feedback is to make this change as self-contained as possible.
> > > Can we just have a special implementation of a sink that will run the
> > > logic of the "preprocess" function? There are many places in the code
> > > where we figure out if it is a source, sink or a function based on the
> > > fields in the Function metadata. Changing that may have unintended
> > > consequences.
> > >
> > > On Mon, Jul 25, 2022 at 5:55 AM Baodi Shi <baodi....@icloud.com.invalid>
> > > wrote:
> > >
> > > > > Can you explain more what you mean ?
> > > > This PIP doesn't change the API of a Function and it's already
> > > > possible to write a Function<?, Record<?>>.
> > > > And when declaring a Sink with a Function we'll check that it's the
> > > > case.
> > > >
> > > > I mean: we should constrain the function interface, otherwise, the
> > > > user may return a structure that is not a record.
> > > >
> > > > Thanks,
> > > > Baodi Shi
> > > >
> > > > > On Jul 25, 2022, at 01:0233, Christophe Bornet <bornet.ch...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > Thanks for the feedback Asaf
> > > > >
> > > > >>> - preprocess-function: the preprocess function applied before the
> > > > >>> Sink. Starts by builtin:// for built-in functions, function:// for
> > > > >>> package function, http:// or file://
> > > > >>
> > > > >> 1. Why is this function applied only before the sink? I thought it
> > > > >> replaces the identity function, so why can't a source have a function
> > > > >> that reads from the source (say S3), runs the function and only then
> > > > >> writes to a pulsar topic?
> > > > >>
> > > > > Yes that's totally possible to implement and will be done in future
> > > > > work like written in the PIP.
> > > > >
> > > > >> 2. Can you clarify more about built in and function for package
> > > > >> function? Is this an existing functionality ?
> > > > >>
> > > > > Yes those are existing functionalities.
> > > > > Built-in functions are not documented (and we should do something
> > > > > about that).
> > > > > Package management of functions is described in
> > > > > https://pulsar.apache.org/docs/functions-deploy#use-package-management-service
> > > > >
> > > > >> 3. Regarding http - Are you loading a class through that URL? Aren't
> > > > >> we exposed to same problem Log4Shell security issue had? If so, what
> > > > >> measures are you taking to protect ?
> > > > >>
> > > > > Yes we are loading code via URL. This feature already exists for
> > > > > Sources/Sinks/Functions.
> > > > > I guess you need to have a huge trust of the source from where you
> > > > > download.
> > > > > This PIP has the same security level as what already exists for this
> > > > > functionality.
> > > > >
> > > > >>> The field extraFunctionPackageLocation to the protobuf structure
> > > > >>> FunctionMetaData will be added. This field will be filled with the
> > > > >>> location of the extra function to apply when registering a sink and
> > > > >>> used in the Runtime to load the function code.
> > > > >>
> > > > >> Can you please expand on that? You mean the JAR location, which you
> > > > >> will search that class name and function specified in the 3 fields
> > > > >> you've added to the config?
> > > > >>
> > > > > Not exactly. It's the location of where the JAR is stored. It can be
> > > > > BookKeeper, package management, built-in NAR, etc...
> > > > > In KubernetesRuntime, there are cases where the builtin or package
> > > > > function you provide in the preprocess-function param could be copied
> > > > > to BK.
> > > > > That's the same as for a regular Sink/Source and if we need to copy to
> > > > > BK, we append `__sink-function` to the storage path to prevent conflict
> > > > > with the sink code.
> > > > > The class name is indeed looked up in this JAR.
> > > > >
> > > > >>> The parameters extraFunctionFile and originalExtraFunctionFileName
> > > > >>> will be added to RuntimeFactory::createContainer
> > > > >>
> > > > >> 1. File and fileName containing what? How does this relate to
> > > > >> extraFunctionPackageLocation?
> > > > >>
> > > > > That part mimics what is already done for the main code of the
> > > > > Source/Sink (with respectively codeFile, originalCodeFileName and
> > > > > packageLocation).
> > > > > Before starting the ThreadRuntime, we download locally the JAR from the
> > > > > extraFunctionPackageLocation in the extraFunctionFile so we can load
> > > > > the code from it.
> > > > >
> > > > >> In here you use the terminology "Extra Function" and in fields of
> > > > >> config and admin you used the term Pre-Process Function. I would pick
> > > > >> Pre-Process Function and stick with it all over.
> > > > >>
> > > > > This terminology needs to be applicable to a Function that would be
> > > > > applied after a Source so we can't use "preprocess" here.
> > > > > I haven't found better than "extra function". Don't hesitate to propose
> > > > > something !
> > > > >
> > > > >>> The following parameters will be added to JavaInstanceStarter:
> > > > >>>
> > > > >>> - --extra_function_jar: the path to the extra function jar
> > > > >>> - --extra_function_id: the extra function UUID cache key
> > > > >>>
> > > > >>> These parameters are then used by the ThreadRuntime to load the
> > > > >>> function from the FunctionCacheManager or create it there if needed.
> > > > >>
> > > > >> Can you elaborate on that? JavaInstanceStarter is used to start a
> > > > >> single Function? It's used from command line?
> > > > >>
> > > > > The JavaInstanceStarter is indeed a CLI to start a JavaInstance.
> > > > > The JavaInstance is the process that will execute the code to read from
> > > > > a Source, execute a Function and write to a Sink.
> > > > > Generally Pulsar users don't use the JavaInstanceStarter directly. The
> > > > > command line is forged by the ProcessRuntime and KubernetesRuntime.
> > > > >
> > > > >> In general, you will essentially have two class loaders - one for the
> > > > >> sink and one for the pre-process function?
> > > > >>
> > > > > Yes, exactly.
> > > > > 3 to be more accurate since there's also the instance class loader.
> > > > >
> > > > >> On Fri, Jul 22, 2022 at 12:48 PM Christophe Bornet <bornet.ch...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >>> Dear Pulsar dev community,
> > > > >>>
> > > > >>> I would like to open a discussion here about PIP 193 : Sink
> > > > >>> preprocessing Function <https://github.com/apache/pulsar/issues/16739>.
> > > > >>>
> > > > >>> Best regards
> > > > >>>
> > > > >>> Christophe
> > > > >>>
> > > > >>> ## Motivation
> > > > >>>
> > > > >>> Pulsar IO connectors make it possible to connect Pulsar to an external
> > > > >>> system:
> > > > >>> * A Source reads continuously from an external system and writes to a
> > > > >>> Pulsar topic
> > > > >>> * A Sink reads continuously from a Pulsar topic and writes to an
> > > > >>> external system.
> > > > >>> Sources and Sinks are written in Java.
> > > > >>>
> > > > >>> Pulsar also has a lightweight computing system named Pulsar Functions.
> > > > >>> A Pulsar Function reads from one or more topics, applies user logic
> > > > >>> written in Java, Python or Go and writes to an output topic.
> > > > >>>
> > > > >>> When using Pulsar IO connectors, the format of what is read/written
> > > > >>> from/to the source/sink is defined by the connector code. But there are
> > > > >>> a lot of situations where a user wants to transform this data before
> > > > >>> using it.
> > > > >>> Currently the solution is to either:
> > > > >>> * write a custom connector that transforms the data the way we want but
> > > > >>> that means writing a lot of code without reuse, packaging and managing
> > > > >>> custom connectors and so on.
> > > > >>> * write a Function to transform the data after it was written to a
> > > > >>> topic by a Source or before it is read from a topic by a Sink. This is
> > > > >>> not very efficient as we have to use an intermediate topic, which means
> > > > >>> additional storage, IO, and latency.
> > > > >>>
> > > > >>> Considering all this, it would be handy to be able to apply a Function
> > > > >>> on-the-fly to a connector without going through an intermediary topic.
> > > > >>>
> > > > >>> ## Goal
> > > > >>>
> > > > >>> This PIP defines the changes needed to be able to apply a preprocessing
> > > > >>> Function on-the-fly to a Sink.
> > > > >>> The preprocessing function can be a built-in function, a package
> > > > >>> function, or loaded through an http URL or a file path.
> > > > >>> Sources, Sinks and Functions are based on the same runtime process
> > > > >>> that:
> > > > >>> * reads from a Source. For Sinks and Functions this Source is a
> > > > >>> PulsarSource consuming from a Pulsar topic
> > > > >>> * applies a Function. For Sources and Sinks, this Function is
> > > > >>> IdentityFunction which returns the data it gets without modification.
> > > > >>> * writes to a Sink. For Sources and Functions, this Sink is a
> > > > >>> PulsarSink writing to a Pulsar topic.
> > > > >>>
> > > > >>> This PIP reuses this and allows configuring a Function different from
> > > > >>> IdentityFunction to Sinks.
> > > > >>> Only Functions returning a Record will be authorized to ensure that the
> > > > >>> Function sets the Schema explicitly.
> > > > >>>
> > > > >>> Out of the scope of this PIP, for future work:
> > > > >>> * Applying a post-processing Function to a Source
> > > > >>> * Loading the Function jar through the Sink CLI
> > > > >>>
> > > > >>> ## API Changes
> > > > >>>
> > > > >>> ### Admin CLI
> > > > >>>
> > > > >>> The following options will be added to the `pulsar-admin sinks` CLI
> > > > >>> `create`, `update` and `localrun`:
> > > > >>> * `preprocess-function`: the preprocess function applied before the
> > > > >>> Sink. Starts by `builtin://` for built-in functions, `function://` for
> > > > >>> package function, `http://` or `file://`
> > > > >>> * `preprocess-function-classname`: the preprocess function class name
> > > > >>> (optional if the function is a NAR)
> > > > >>> * `preprocess-function-config`: the configuration of the preprocess
> > > > >>> function in the same format as the `user-config` parameter of the
> > > > >>> `functions create` CLI command.
> > > > >>>
> > > > >>> The corresponding fields will be added to `SinkConfig`:
> > > > >>>
> > > > >>> ```java
> > > > >>> private String preprocessFunction;
> > > > >>> private String preprocessFunctionClassName;
> > > > >>> private String preprocessFunctionConfig;
> > > > >>> ```
> > > > >>>
> > > > >>> ### Function definition
> > > > >>>
> > > > >>> The field `extraFunctionPackageLocation` to the protobuf structure
> > > > >>> `FunctionMetaData` will be added. This field will be filled with the
> > > > >>> location of the extra function to apply when registering a sink and
> > > > >>> used in the Runtime to load the function code.
> > > > >>>
> > > > >>> ```protobuf
> > > > >>> message FunctionMetaData {
> > > > >>>     ...
> > > > >>>     PackageLocationMetaData extraFunctionPackageLocation = 7;
> > > > >>> }
> > > > >>> ```
> > > > >>>
> > > > >>> ### Runtime
> > > > >>>
> > > > >>> The parameters `extraFunctionFile` and `originalExtraFunctionFileName`
> > > > >>> will be added to `RuntimeFactory::createContainer`
> > > > >>>
> > > > >>> ```java
> > > > >>> Runtime createContainer(
> > > > >>>     InstanceConfig instanceConfig, String codeFile, String originalCodeFileName,
> > > > >>>     String extraFunctionFile, String originalExtraFunctionFileName,
> > > > >>>     Long expectedHealthCheckInterval) throws Exception;
> > > > >>> ```
> > > > >>>
> > > > >>> ### Instance function cache
> > > > >>>
> > > > >>> A field `extraFunctionId` to `InstanceConfig` that will hold the UUID
> > > > >>> cache key of the extra function will be added.
> > > > >>>
> > > > >>> ```java
> > > > >>> public class InstanceConfig {
> > > > >>>     private int instanceId;
> > > > >>>     private String functionId;
> > > > >>>     private String extraFunctionId;
> > > > >>> ```
> > > > >>>
> > > > >>> ### JavaInstanceStarter
> > > > >>>
> > > > >>> The following parameters will be added to JavaInstanceStarter:
> > > > >>> * `--extra_function_jar`: the path to the extra function jar
> > > > >>> * `--extra_function_id`: the extra function UUID cache key
> > > > >>>
> > > > >>> These parameters are then used by the `ThreadRuntime` to load the
> > > > >>> function from the `FunctionCacheManager` or create it there if needed.
> > > > >>>
> > > > >>> ### Download the extra function
> > > > >>>
> > > > >>> The statefulset spawned in `KubernetesRuntime` needs to be able to
> > > > >>> download the extra functions code via API.
> > > > >>> An `extra-function` query param will be added to the download function
> > > > >>> HTTP endpoint
> > > > >>>
> > > > >>> ```java
> > > > >>> @Path("/{tenant}/{namespace}/{functionName}/download")
> > > > >>> public StreamingOutput downloadFunction(
> > > > >>>         @ApiParam(value = "The tenant of functions")
> > > > >>>         final @PathParam("tenant") String tenant,
> > > > >>>         @ApiParam(value = "The namespace of functions")
> > > > >>>         final @PathParam("namespace") String namespace,
> > > > >>>         @ApiParam(value = "The name of functions")
> > > > >>>         final @PathParam("functionName") String functionName,
> > > > >>>         @ApiParam(value = "Whether to download the extra-function")
> > > > >>>         final @QueryParam("extra-function") boolean extraFunction) {
> > > > >>> ```
> > > > >>>
> > > > >>> If `extraFunction` is `true` then the extra function will be returned
> > > > >>> instead of the sink.
> > > > >>>
> > > > >>> The Java admin SDK will have the following methods added:
> > > > >>>
> > > > >>> ```java
> > > > >>>     /**
> > > > >>>      * Download Function Code.
> > > > >>>      *
> > > > >>>      * @param destinationFile
> > > > >>>      *            file where data should be downloaded to
> > > > >>>      * @param tenant
> > > > >>>      *            Tenant name
> > > > >>>      * @param namespace
> > > > >>>      *            Namespace name
> > > > >>>      * @param function
> > > > >>>      *            Function name
> > > > >>>      * @param extraFunction
> > > > >>>      *            Whether to download the extra-function (for sources and sinks)
> > > > >>>      * @throws PulsarAdminException
> > > > >>>      */
> > > > >>>     void downloadFunction(String destinationFile, String tenant, String namespace,
> > > > >>>                           String function, boolean extraFunction)
> > > > >>>             throws PulsarAdminException;
> > > > >>>
> > > > >>>     /**
> > > > >>>      * Download Function Code asynchronously.
> > > > >>>      *
> > > > >>>      * @param destinationFile
> > > > >>>      *            file where data should be downloaded to
> > > > >>>      * @param tenant
> > > > >>>      *            Tenant name
> > > > >>>      * @param namespace
> > > > >>>      *            Namespace name
> > > > >>>      * @param function
> > > > >>>      *            Function name
> > > > >>>      * @param extraFunction
> > > > >>>      *            Whether to download the extra-function (for sources and sinks)
> > > > >>>      */
> > > > >>>     CompletableFuture<Void> downloadFunctionAsync(
> > > > >>>             String destinationFile, String tenant, String namespace,
> > > > >>>             String function, boolean extraFunction);
> > > > >>> ```
> > > > >>>
> > > > >>> The parameter `--extra-function` will be added to the admin CLI command
> > > > >>> `functions download`
> > > > >>>
> > > > >>> ## Implementation
> > > > >>>
> > > > >>> ### Pulsar-admin
> > > > >>>
> > > > >>> * Add the admin CLI options when creating/updating/localrunning the
> > > > >>> sink (see API changes)
> > > > >>>
> > > > >>> ### Pulsar broker
> > > > >>>
> > > > >>> * On the broker API, in registerSink/updateSink, if a preprocessing
> > > > >>> function is present in the Sink config, we:
> > > > >>>   * validate the function
> > > > >>>   * get the function classloader (from builtin or download a package
> > > > >>>     file)
> > > > >>>   * load the function
> > > > >>>   * inspect the function types and set the first arg as Sink type. Also
> > > > >>>     verify that the second arg is of type Record.
> > > > >>>   * use the function classloader instead of the sink classloader to
> > > > >>>     verify that custom schemas, serdes and crypto key readers can be
> > > > >>>     loaded and conform.
> > > > >>>   * get the function package location and fill the protobuf
> > > > >>>     extraFunctionPackageLocation field with it.
> > > > >>>     A name for this preprocessing function is generated from the sink
> > > > >>>     name so it can be referenced when stored in BookKeeper or in
> > > > >>>     package management. The name of the preprocessing function is
> > > > >>>     `{sink name}__sink-function`.
> > > > >>>   * set the `functionDetails` with the preprocessing function config
> > > > >>>     (function class name and function userConfig)
> > > > >>>
> > > > >>> * The `--extra-function` query parameter is added to the `functions
> > > > >>> download` CLI command, admin SDK and HTTP API (see API changes).
> > > > >>>
> > > > >>> ### Function worker
> > > > >>>
> > > > >>> * When the `InstanceConfig` is created, a UUID is set to the
> > > > >>> `extraFunctionId` field. This field will serve as a cache key for the
> > > > >>> extra function (see API changes).
> > > > >>> * When the `FunctionActioner` starts the function, if
> > > > >>> `extraFunctionPackageLocation` is present, the same is done for the
> > > > >>> extra function as what is done for the connector:
> > > > >>>   * if the runtime is not externally managed, the extra function code
> > > > >>>     is downloaded from the `extraFunctionPackageLocation` and the
> > > > >>>     `Runtime` is created with the extra package file path and original
> > > > >>>     name (see API changes to `RuntimeFactory::createContainer`)
> > > > >>>   * if the runtime is externally managed, the `Runtime` is created with
> > > > >>>     the `extraFunctionPackageLocation` and original name.
> > > > >>>
> > > > >>> * Depending on the configured runtime, if there's an extra function
> > > > >>> file:
> > > > >>>   * For the `ThreadRuntime`, the extra function classloader is obtained
> > > > >>>     with the instance `extraFunctionId` cache key, then this
> > > > >>>     classloader is passed to the `JavaInstanceRunnable`.
> > > > >>>     The `JavaInstanceRunnable` then switches between the connector
> > > > >>>     classloader and the extra function classloader accordingly.
> > > > >>>   * For the `ProcessRuntime`, the path to the extra function jar is
> > > > >>>     added to the `--extra_function_jar` parameter in the
> > > > >>>     `JavaInstanceStarter` command. The `JavaInstanceStarter` then uses
> > > > >>>     it when creating its `ThreadRuntime`.
> > > > >>>   * For the `KubernetesRuntime`, a command is added in the statefulset
> > > > >>>     exec command to download the extra function using the
> > > > >>>     `--extra-function` flag of the `functions download` command. And
> > > > >>>     the path to this downloaded jar is added to the
> > > > >>>     `--extra_function_jar` parameter of the `JavaInstanceStarter`
> > > > >>>     command.
> > > > >>>
> > > > >>> ### LocalRunner
> > > > >>>
> > > > >>> If `sinkConfig` has a `preprocessFunction`, the `LocalRunner` will use
> > > > >>> the same methods as in the broker to get the function file and
> > > > >>> `functionDetails` and use them when spawning the `Runtime`.
> > > > >>>
> > > > >>> ## Rejected Alternatives
> > > > >>>
> > > > >>> N/A
>
> --
> Best Regards,
> Neng