My feedback is to make this change as self-contained as possible.  Can we
just have a special implementation of a sink that will run the logic of the
"preprocess" function?  There are many places in the code where we figure
out if it is a source, sink or a function based on the fields in the
Function metadata.  Changing that may have unintended consequences.
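
To make the idea concrete, here is a minimal sketch of such a wrapper sink.
It is an illustration only: `SimpleSink`, `PreprocessingSink` and
`CollectingSink` are hypothetical names, `SimpleSink` is a simplified
stand-in rather than the real Pulsar `Sink` interface, and treating a null
result as "drop the record" is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified stand-in for a sink; NOT the real Pulsar Sink interface.
interface SimpleSink<T> {
    void write(T value);
}

// Hypothetical wrapper: runs the preprocess logic, then delegates to the
// wrapped sink, so no intermediate topic is needed.
final class PreprocessingSink<I, O> implements SimpleSink<I> {
    private final Function<I, O> preprocess;
    private final SimpleSink<O> delegate;

    PreprocessingSink(Function<I, O> preprocess, SimpleSink<O> delegate) {
        this.preprocess = preprocess;
        this.delegate = delegate;
    }

    @Override
    public void write(I value) {
        O transformed = preprocess.apply(value);
        // Assumption for this sketch: a null result means "drop the record".
        if (transformed != null) {
            delegate.write(transformed);
        }
    }
}

// Test double that records everything reaching the wrapped sink.
final class CollectingSink<T> implements SimpleSink<T> {
    final List<T> written = new ArrayList<>();

    @Override
    public void write(T value) {
        written.add(value);
    }
}
```

With this shape the metadata would still describe a plain sink, and the
preprocess logic would live entirely inside the wrapper.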

On Mon, Jul 25, 2022 at 5:55 AM Baodi Shi <baodi....@icloud.com.invalid>
wrote:

> > Can you explain more what you mean ?
> This PIP doesn't change the API of a Function and it's already possible to
> write a Function<?, Record<?>>.
> And when declaring a Sink with a Function we'll check that it's the case.
>
> I mean: we should constrain the function interface, otherwise, the user
> may return a structure that is not a record.
>
> Thanks,
> Baodi Shi
>
> > On Jul 25, 2022, at 01:0233, Christophe Bornet <bornet.ch...@gmail.com>
> wrote:
> >
> > Thanks for the feedback Asaf
> >
> >
> >>>   - preprocess-function: the preprocess function applied before the
> >>>   Sink. Starts by builtin:// for built-in functions, function:// for
> >>>   package function, http:// or file://
> >>>
> >>> 1. Why is this function applied only before the sink? I thought it
> replaces
> >> the identity function, so why can't a source have a function that reads
> >> from the source (say S3), runs the function and only then writes to a
> >> Pulsar topic?
> >>
> >
> > Yes, that's totally possible to implement and will be done in future work,
> > as written in the PIP.
> >
> >
> >> 2. Can you clarify more about built in and function for package
> function?
> >> Is this an existing functionality ?
> >>
> > Yes those are existing functionalities.
> > Built-in functions are not documented (and we should do something about
> > that).
> > Package management of functions is described in
> >
> https://pulsar.apache.org/docs/functions-deploy#use-package-management-service
> >
> >
> >> 3. Regarding http - Are you loading a class through that URL? Aren't we
> >> exposed to same problem Log4Shell security issue had? If so, what
> measures
> >> are you taking to protect ?
> >>
> > Yes we are loading code via URL. This feature already exists for
> > Sources/Sinks/Functions.
> > I guess you need to place a great deal of trust in the source you
> download from.
> > This PIP has the same security level as what already exists for this
> > functionality.
> >
> >
> >>
> >> The field extraFunctionPackageLocation to the protobuf structure
> >>> FunctionMetaData will be added. This field will be filled with the
> >>> location of the extra function to apply when registering a sink and
> used
> >> in
> >>> the Runtime to load the function code.
> >>
> >> Can you please expand on that? You mean the JAR location, which you will
> >> search that class name and function specified in the 3 fields you've
> added
> >> to the config?
> >>
> > Not exactly. It's the location of where the JAR is stored. It can be
> > BookKeeper, package management, built-in NAR, etc...
> > In KubernetesRuntime, there are cases where the builtin or package
> function
> > you provide in the preprocess-function param could be copied to BK.
> > That's the same as for a regular Sink/Source and if we need to copy to
> BK,
> > we append `__sink-function` to the storage path to prevent conflict with
> > the sink code.
> > The class name is indeed looked up in this JAR.
> >
> >
> >> The parameters extraFunctionFile and originalExtraFunctionFileName will
> be
> >>> added to RuntimeFactory::createContainer
> >>
> >> 1. File and fileName containing what? How does this related to
> >> extraFunctionPackageLocation?
> >>
> > That part mimics what is already done for the main code of the
> Source/Sink
> > (with codeFile, originalCodeFileName and
> packageLocation, respectively).
> > Before starting the ThreadedRuntime, we download locally the JAR from the
> > extraFunctionPackageLocation in the extraFunctionFile so we can load the
> > code from it.
> >
> >
> >>
> >> Here you use the term "Extra Function", while in the config and admin
> >> fields you use the term "Pre-Process Function". I would pick
> >> "Pre-Process Function" and stick with it throughout.
> >>
> > This terminology needs to be applicable to a Function that would be
> applied
> > after a Source so we can't use "preprocess" here.
> > I haven't found better than "extra function". Don't hesitate to propose
> > something !
> >
> >
> >>
> >>
> >>> The following parameters will be added to JavaInstanceStarter:
> >>>
> >>>   - --extra_function_jar: the path to the extra function jar
> >>>
> >>>
> >>>   - --extra_function_id: the extra function UUID cache key
> >>>
> >>> These parameters are then used by the ThreadRuntime to load the
> function
> >>> from the FunctionCacheManager or create it there if needed.
> >>
> >>
> >> Can you elaborate on that? JavaInstanceStarter is used to start a single
> >> Function? It's used from command line?
> >
> > The JavaInstanceStarter is indeed a CLI to start a JavaInstance.
> > The JavaInstance is the process that will execute the code to read from a
> > Source, execute a Function and write to a Sink.
> > Generally Pulsar users don't use the JavaInstanceStarter directly. The
> > command line is built by the ProcessRuntime and KubernetesRuntime.
> >
> >>
> >>
> >> In general, you will essentially have two class loaders - one for the
> sink
> >> and one for the pre-process function?
> >>
> > Yes, exactly.
> > 3 to be more accurate since there's also the instance class loader.
> >
> >
> >>
> >>
> >>
> >>
> >>
> >> On Fri, Jul 22, 2022 at 12:48 PM Christophe Bornet <
> bornet.ch...@gmail.com
> >>>
> >> wrote:
> >>
> >>> Dear Pulsar dev community,
> >>>
> >>> I would like to open a discussion here about PIP 193 : Sink
> preprocessing
> >>> Function <https://github.com/apache/pulsar/issues/16739>.
> >>>
> >>> Best regards
> >>>
> >>> Christophe
> >>>
> >>> ## Motivation
> >>>
> >>> Pulsar IO connectors make it possible to connect Pulsar to an external
> >>> system:
> >>> * A Source reads continuously from an external system and writes to a
> >>> Pulsar topic
> >>> * A Sink reads continuously from a Pulsar topic and writes to an
> external
> >>> system.
> >>> Sources and Sinks are written in Java.
> >>>
> >>> Pulsar also has a lightweight computing system named Pulsar Functions.
> A
> >>> Pulsar Function reads from one or more topics, applies user logic
> written
> >>> in Java, Python or Go and writes to an output topic.
> >>>
> >>> When using Pulsar IO connectors, the format of what is read/written
> >> from/to
> >>> the source/sink is defined by the connector code. But there are a lot
> of
> >>> situations where a user wants to transform this data before using it.
> >>> Currently the solution is to either :
> >>> * write a custom connector that transforms the data the way we want but
> >>> that means writing a lot of code without reuse, packaging and managing
> >>> custom connectors and so on.
> >>> * write a Function to transform the data after it was written to a
> topic
> >> by
> >>> a Source or before it is read from a topic by a Sink. This is not very
> >>> efficient as we have to use an intermediate topic, which means
> additional
> >>> storage, IO, and latency.
> >>>
> >>> Considering all this, it would be handy to be able to apply a Function
> >>> on-the-fly to a connector without going through an intermediary topic.
> >>>
> >>> ## Goal
> >>>
> >>> This PIP defines the changes needed to be able to apply a preprocessing
> >>> Function on-the-fly to a Sink.
> >>> The preprocessing function can be a built-in function, a package
> >> function,
> >>> or loaded through an http URL or a file path.
> >>> Sources, Sinks and Functions are based on the same runtime process
> that:
> >>> * reads from a Source. For Sinks and Functions this Source is a
> >>> PulsarSource consuming from a Pulsar topic
> >>> * applies a Function. For Sources and Sinks, this Function is
> >>> IdentityFunction which returns the data it gets without modification.
> >>> * writes to a Sink. For Sources and Functions, this Sink is a
> PulsarSink
> >>> writing to a Pulsar topic.
> >>>
> >>> This PIP reuses this and allows configuring a Function different from
> >>> IdentityFunction to Sinks.
> >>> Only Functions returning a Record will be authorized to ensure that the
> >>> Function sets the Schema explicitly.
> >>>
> >>> Out of the scope of this PIP, for future work:
> >>> * Applying a post-processing Function to a Source
> >>> * Loading the Function jar through the Sink CLI
> >>>
> >>> ## API Changes
> >>>
> >>> ### Admin CLI
> >>>
> >>> The following options will be added to the `pulsar-admin sinks` CLI
> >>> `create`, `update` and `localrun`:
> >>> * `preprocess-function`: the preprocess function applied before the
> Sink.
> >>> Starts by `builtin://` for built-in functions, `function://` for
> package
> >>> function, `http://` or `file://`
> >>> * `preprocess-function-classname`: the preprocess function class name
> >>> (optional if the function is a NAR)
> >>> * `preprocess-function-config`: the configuration of the preprocess
> >>> function in the same format as the `user-config` parameter of the
> >>> `functions create` CLI command.
> >>>
> >>> The corresponding fields will be added to `SinkConfig`:
> >>>
> >>> ```java
> >>>    private String preprocessFunction;
> >>>    private String preprocessFunctionClassName;
> >>>    private String preprocessFunctionConfig;
> >>> ```
> >>>
> >>> ### Function definition
> >>>
> >>> The field `extraFunctionPackageLocation` will be added to the protobuf
> >>> structure `FunctionMetaData`. This field will be filled with the
> >>> location of the extra function to apply when registering a sink, and
> >>> used in the Runtime to load the function code.
> >>>
> >>> ```protobuf
> >>> message FunctionMetaData {
> >>>    ...
> >>>    PackageLocationMetaData extraFunctionPackageLocation = 7;
> >>> }
> >>> ```
> >>>
> >>> ### Runtime
> >>>
> >>> The parameters `extraFunctionFile` and `originalExtraFunctionFileName`
> >> will
> >>> be added to `RuntimeFactory::createContainer`
> >>>
> >>>
> >>> ```java
> >>>   Runtime createContainer(
> >>>            InstanceConfig instanceConfig, String codeFile, String
> >>> originalCodeFileName,
> >>>            String extraFunctionFile, String
> >> originalExtraFunctionFileName,
> >>>            Long expectedHealthCheckInterval) throws Exception;
> >>> ```
> >>>
> >>> ### Instance function cache
> >>>
> >>> A field `extraFunctionId` that will hold the UUID cache key of the
> >>> extra function will be added to `InstanceConfig`.
> >>>
> >>> ```java
> >>> public class InstanceConfig {
> >>>    private int instanceId;
> >>>    private String functionId;
> >>>    private String extraFunctionId;
> >>> ```
> >>>
> >>> ### JavaInstanceStarter
> >>>
> >>>
> >>> The following parameters will be added to JavaInstanceStarter:
> >>> * `--extra_function_jar`: the path to the extra function jar
> >>> * `--extra_function_id`: the extra function UUID cache key
> >>>
> >>> These parameters are then used by the `ThreadRuntime` to load the
> >> function
> >>> from the `FunctionCacheManager` or create it there if needed.
> >>>
> >>> ### Download the extra function
> >>>
> >>> The statefulset spawned in `KubernetesRuntime` needs to be able to
> >> download
> >>> the extra functions code via API.
> >>> An `extra-function` query param will be added to the download function
> >> HTTP
> >>> endpoint
> >>>
> >>> ```java
> >>>   @Path("/{tenant}/{namespace}/{functionName}/download")
> >>>    public StreamingOutput downloadFunction(
> >>>            @ApiParam(value = "The tenant of functions")
> >>>            final @PathParam("tenant") String tenant,
> >>>            @ApiParam(value = "The namespace of functions")
> >>>            final @PathParam("namespace") String namespace,
> >>>            @ApiParam(value = "The name of functions")
> >>>            final @PathParam("functionName") String functionName,
> >>>            @ApiParam(value = "Whether to download the extra-function")
> >>>            final @QueryParam("extra-function") boolean extraFunction) {
> >>> ```
> >>>
> >>> If `extraFunction` is `true` then the extra function will be returned
> >>> instead of the sink.
> >>>
> >>> The Java admin SDK will have the following methods added:
> >>>
> >>>
> >>> ```java
> >>>   /**
> >>>     * Download Function Code.
> >>>     *
> >>>     * @param destinationFile
> >>>     *           file where data should be downloaded to
> >>>     * @param tenant
> >>>     *            Tenant name
> >>>     * @param namespace
> >>>     *            Namespace name
> >>>     * @param function
> >>>     *            Function name
> >>>     * @param extraFunction
> >>>     *            Whether to download the extra-function (for sources
> and
> >>> sinks)
> >>>     * @throws PulsarAdminException
> >>>     */
> >>>    void downloadFunction(String destinationFile, String tenant, String
> >>> namespace, String function,
> >>>                          boolean extraFunction) throws
> >>> PulsarAdminException;
> >>>
> >>>    /**
> >>>     * Download Function Code asynchronously.
> >>>     *
> >>>     * @param destinationFile
> >>>     *           file where data should be downloaded to
> >>>     * @param tenant
> >>>     *            Tenant name
> >>>     * @param namespace
> >>>     *            Namespace name
> >>>     * @param function
> >>>     *            Function name
> >>>     * @param extraFunction
> >>>     *            Whether to download the extra-function (for sources
> and
> >>> sinks)
> >>>     */
> >>>    CompletableFuture<Void> downloadFunctionAsync(
> >>>            String destinationFile, String tenant, String namespace,
> >> String
> >>> function, boolean extraFunction);
> >>> ```
> >>>
> >>> The parameter `--extra-function` will be added to the admin CLI command
> >>> `functions download`
> >>>
> >>> ## Implementation
> >>>
> >>> ### Pulsar-admin
> >>>
> >>> * Add the admin CLI options when creating/updating/localrunning the
> sink
> >>> (see API changes)
> >>>
> >>> ### Pulsar broker
> >>>
> >>> * On the broker API, in registerSink/updateSink, if a preprocessing
> >>> function is present in the Sink config, we:
> >>>  * validate the function
> >>>    * get the function classloader (from builtin or download a package
> >>> file)
> >>>    * load the function
> >>>    * inspect the function types and set the first arg as Sink type.
> Also
> >>> verify that the second arg is of type Record.
> >>>  * use the function classloader instead of the sink classloader to
> >>> verify that custom schemas, serdes and crypto key readers can be
> >>> loaded and are conformant.
> >>>  * get the function package location and fill the protobuf
> >>> extraFunctionPackageLocation field with it. A name for this
> preprocessing
> >>> function is generated from the sink name so it can be referenced when
> >>> stored in BookKeeper or in package management. The name of the
> >>> preprocessing function is `{sink name}__sink-function`.
> >>>  * set the `functionDetails` with the preprocessing function config
> >>> (function class name and function userConfig)
> >>>
> >>> * The `--extra-function` query parameter is added to the `functions
> >>> download` CLI command, admin SDK and HTTP API (see API changes).
> >>>
> >>> ### Function worker
> >>>
> >>> * When the `InstanceConfig` is created, a UUID is assigned to the
> >>> `extraFunctionId` field. This field will serve as a cache key for the
> >> extra
> >>> function (see API changes).
> >>> * When the `FunctionActioner` starts the function, if
> >>> `extraFunctionPackageLocation` is present, the same is done for the
> extra
> >>> function as what is done for the connector:
> >>>  * if the runtime is not externally managed, the extra function code is
> >>> downloaded from the `extraFunctionPackageLocation` and the `Runtime` is
> >>> created with the extra package file path and original name (see API
> >> changes
> >>> to `RuntimeFactory::createContainer`)
> >>>  * if the runtime is externally managed, the `Runtime` is created with
> >> the
> >>> `extraFunctionPackageLocation` and original name.
> >>>
> >>> * Depending on the configured runtime, if there’s an extra function
> file:
> >>>  * For the `ThreadRuntime`, the extra function classloader is obtained
> >>> with the instance `extraFunctionId` cache key, then this classloader is
> >>> passed to the `JavaInstanceRunnable`. The `JavaInstanceRunnable` then
> >>> switches between the connector classloader and the extra function
> >>> classloader accordingly.
> >>>  * For the `ProcessRuntime`, the path to the extra function jar is
> added
> >>> to the `--extra_function_jar` parameter in the `JavaInstanceStarter`
> >>> command. The `JavaInstanceStarter` then uses it when creating its
> >>> `ThreadRuntime`.
> >>>  * For the `KubernetesRuntime`, a command is added in the statefulset
> >>> exec command to download the extra function using the
> >>> `--extra-function` flag of the `functions download` command. The path
> >>> to this downloaded jar is then added to the `--extra_function_jar`
> >>> parameter of the `JavaInstanceStarter` command.
> >>>
> >>> ### LocalRunner
> >>>
> >>> If `sinkConfig` has a `preprocessFunction`, the `LocalRunner` will use
> >> the
> >>> same methods as in the broker to get the function file and
> >>> `functionDetails` and use them when spawning the `Runtime`.
> >>>
> >>> ## Rejected Alternatives
> >>>
> >>> N/A
> >>>
> >>
>
>
