> Can you explain more what you mean ?
This PIP doesn't change the API of a Function and it's already possible to
write a Function<?, Record<?>>.
And when declaring a Sink with a Function we'll check that it's the case.

I mean: we should constrain the function interface, otherwise, the user may 
return a structure that is not a record.

Thanks,
Baodi Shi

> On Jul 25, 2022, at 01:0233, Christophe Bornet <bornet.ch...@gmail.com> wrote:
> 
> Thanks for the feedback Asaf
> 
> 
>>>   - preprocess-function: the preprocess function applied before the
>>>   Sink. Starts by builtin:// for built-in functions, function:// for
>>>   package function, http:// or file://
>>> 
>>> 1. While this function is applied only before sink? I thought it replaces
>> the identity function, so why a source can't have a function that reads
>> from the source (say S3), runs the function and only then writes to a
>> pulsar topic?
>> 
> 
> Yes that's totally possible to implement and will be done in future work
> like written in the PIP.
> 
> 
>> 2. Can you clarify more about built in and function for package function?
>> Is this an existing functionality ?
>> 
> Yes those are existing functionalities.
> Built-in functions are not documented (and we should do something about
> that).
> Package management of functions is described in
> https://pulsar.apache.org/docs/functions-deploy#use-package-management-service
> 
> 
>> 3. Regarding http - Are you loading a class through that URL? Aren't we
>> exposed to same problem Log4Shell security issue had? If so, what measures
>> are you taking to protect ?
>> 
> Yes we are loading code via URL. This feature already exists for
> Sources/Sinks/Functions.
> I guess you need to have a huge trust of the source from where you download.
> This PIP has the same security level as what already exists for this
> functionality.
> 
> 
>> 
>> The field extraFunctionPackageLocation to the protobuf structure
>>> FunctionMetaData will be added. This field will be filled with the
>>> location of the extra function to apply when registering a sink and used
>> in
>>> the Runtime to load the function code.
>> 
>> Can you please expand on that? You mean the JAR location, which you will
>> search that class name and function specified in the 3 fields you've added
>> to the config?
>> 
> Not exactly. It's the location of where the JAR is stored. It can be
> BookKeeper, package management, built-in NAR, etc...
> In KubernetesRuntime, there are cases where the builtin or package function
> you provide in the preprocess-function param could be copied to BK.
> That's the same as for a regular Sink/Source and if we need to copy to BK,
> we append `__sink-function` to the storage path to prevent conflict with
> the sink code.
> The class name is indeed looked up in this JAR.
> 
> 
>> The parameters extraFunctionFile and originalExtraFunctionFileName will be
>>> added to RuntimeFactory::createContainer
>> 
>> 1. File and fileName containing what? How does this related to
>> extraFunctionPackageLocation?
>> 
> That part mimicks what is already done for the main code of the Source/Sink
> code (with respectively codeFile, originalCodeFileName and packageLocation)
> Before starting the ThreadedRuntime, we download locally the JAR from the
> extraFunctionPackageLocation in the extraFunctionFile so we can load the
> code from it.
> 
> 
>> 
>> In here you use the terminology Extra Function" and in fields of config and
>> admin you used the term Pre-Process Function. I would stick to Pro-Process
>> Function and stick with it all over.
>> 
> This terminology need to be applicable to a Function that would be applied
> after a Source so we can't use  "preprocess" here.
> I haven't found better than "extra function". Don't hesitate to propose
> something !
> 
> 
>> 
>> 
>>> The following parameters will be added to JavaInstanceStarter:
>>> 
>>>   - --extra_function_jar: the path to the extra function jar
>>> 
>>> 
>>>   - --extra_function_id: the extra function UUID cache key
>>> 
>>> These parameters are then used by the ThreadRuntime to load the function
>>> from the FunctionCacheManager or create it there if needed.
>> 
>> 
>> Can you elaborate on that? JavaInstanceStarter is used to start a single
>> Function? It's used from command line?
> 
> The  JavaInstanceStarter is indeed a CLI to start a JavaInstance.
> The JavaInstance is the process that will execute the code to read from a
> Source, execute a Function and write to a Sink.
> Generally Pulsar users don't use the JavaInstanceStarter directly. The
> command line is forged by the ProcessRuntime and KubernetesRuntime.
> 
>> 
>> 
>> In general, you will essentially have two class loaders - one for the sink
>> and one for the pre-process function?
>> 
> Yes, exactly.
> 3 to be more accurate since there's also the instance class loader.
> 
> 
>> 
>> 
>> 
>> 
>> 
>> On Fri, Jul 22, 2022 at 12:48 PM Christophe Bornet <bornet.ch...@gmail.com
>>> 
>> wrote:
>> 
>>> Dear Pulsar dev community,
>>> 
>>> I would like to open a discussion here about PIP 193 : Sink preprocessing
>>> Function <https://github.com/apache/pulsar/issues/16739>.
>>> 
>>> Best regards
>>> 
>>> Christophe
>>> 
>>> ## Motivation
>>> 
>>> Pulsar IO connectors make it possible to connect Pulsar to an external
>>> system:
>>> * A Source reads continuously from an external system and writes to a
>>> Pulsar topic
>>> * A Sink reads continuously from a Pulsar topic and writes to an external
>>> system.
>>> Sources and Sinks are written in Java.
>>> 
>>> Pulsar also has a lightweight computing system named Pulsar Functions. A
>>> Pulsar Function reads from one or more topics, applies user logic written
>>> in Java, Python or Go and writes to an output topic.
>>> 
>>> When using Pulsar IO connectors, the format of what is read/written
>> from/to
>>> the source/sink is defined by the connector code. But there are a lot of
>>> situations where a user wants to transform this data before using it.
>>> Currently the solution is to either :
>>> * write a custom connector that transforms the data the way we want but
>>> that means writing a lot of code without reuse, packaging and managing
>>> custom connectors and so on..
>>> * write a Function to transform the data after it was written to a topic
>> by
>>> a Source or before it is read from a topic by a Sink. This is not very
>>> efficient as we have to use an intermediate topic, which means additional
>>> storage, IO, and latency.
>>> 
>>> Considering all this, it would be handy to be able to apply a Function
>>> on-the-fly to a connector without going through an intermediary topic.
>>> 
>>> ## Goal
>>> 
>>> This PIP defines the changes needed to be able to apply a preprocessing
>>> Function on-the-fly to a Sink.
>>> The preprocessing function can be a built-in function, a package
>> function,
>>> or loaded through an http URL or a file path.
>>> Sources, Sinks and Functions are based on the same runtime process that:
>>> * reads from a Source. For Sinks and Functions this Source is a
>>> PulsarSource consuming from a Pulsar topic
>>> * applies a Function. For Sources and Sinks, this Function is
>>> IdentityFunction which returns the data it gets without modification.
>>> * writes to a Sink. For Sources and Functions, this Sink is a PulsarSink
>>> writing to a Pulsar topic.
>>> 
>>> This PIP reuses this and allows configuring a Function different from
>>> IdentityFunction to Sinks.
>>> Only Functions returning a Record will be authorized to ensure that the
>>> Function sets the Schema explicitly.
>>> 
>>> Out of the scope of this PIP, for future work:
>>> * Applying a post-processing Function to a Source
>>> * Loading the Function jar through the Sink CLI
>>> 
>>> ## API Changes
>>> 
>>> ### Admin CLI
>>> 
>>> The following options will be added to the `pulsar-admin sinks` CLI
>>> `create`, `update` and `localrun`:
>>> * `preprocess-function`: the preprocess function applied before the Sink.
>>> Starts by `builtin://` for built-in functions, `function://` for package
>>> function, `http://` or `file://`
>>> * `preprocess-function-classname`: the preprocess function class name
>>> (optional if the function is a NAR)
>>> * `preprocess-function-config`: the configuration of the preprocess
>>> function in the same format as the `user-config` parameter of the
>>> `functions create` CLI command.
>>> 
>>> The corresponding fields will be added to `SinkConfig`:
>>> 
>>> ```java
>>>    private String preprocessFunction;
>>>    private String preprocessFunctionClassName;
>>>    private String preprocessFunctionConfig;
>>> ```
>>> 
>>> ### Function definition
>>> 
>>> The field `extraFunctionPackageLocation` to the protobuf structure
>>> `FunctionMetaData` will be added. This field will be filled with the
>>> location of the extra function to apply when registering a sink and used
>> in
>>> the Runtime to load the function code.
>>> 
>>> ```protobuf
>>> message FunctionMetaData {
>>>    ...
>>>    PackageLocationMetaData extraFunctionPackageLocation = 7;
>>> }
>>> ```
>>> 
>>> ### Runtime
>>> 
>>> The parameters `extraFunctionFile` and `originalExtraFunctionFileName`
>> will
>>> be added to `RuntimeFactory::createContainer`
>>> 
>>> 
>>> ```java
>>>   Runtime createContainer(
>>>            InstanceConfig instanceConfig, String codeFile, String
>>> originalCodeFileName,
>>>            String extraFunctionFile, String
>> originalExtraFunctionFileName,
>>>            Long expectedHealthCheckInterval) throws Exception;
>>> ```
>>> 
>>> ### Instance function cache
>>> 
>>> A field `extraFunctionId` to `InstanceConfig` that will hold the UUID
>> cache
>>> key of the extra function will be added.
>>> 
>>> ```java
>>> public class InstanceConfig {
>>>    private int instanceId;
>>>    private String functionId;
>>>    private String extraFunctionId;
>>> ```
>>> 
>>> ### JavaInstanceStarter
>>> 
>>> 
>>> The following parameters will be added to JavaInstanceStarter:
>>> * `--extra_function_jar`: the path to the extra function jar
>>> * `--extra_function_id`: the extra function UUID cache key
>>> 
>>> These parameters are then used by the `ThreadRuntime` to load the
>> function
>>> from the `FunctionCacheManager` or create it there if needed.
>>> 
>>> ### Download the extra function
>>> 
>>> The statefulset spawned in `KubernetesRuntime` needs to be able to
>> download
>>> the extra functions code via API.
>>> An `extra-function` query param will be added to the download function
>> HTTP
>>> endpoint
>>> 
>>> ```java
>>>   @Path("/{tenant}/{namespace}/{functionName}/download")
>>>    public StreamingOutput downloadFunction(
>>>            @ApiParam(value = "The tenant of functions")
>>>            final @PathParam("tenant") String tenant,
>>>            @ApiParam(value = "The namespace of functions")
>>>            final @PathParam("namespace") String namespace,
>>>            @ApiParam(value = "The name of functions")
>>>            final @PathParam("functionName") String functionName) {
>>>            final @PathParam("functionName") String functionName,
>>>            @ApiParam(value = "Whether to download the extra-function")
>>>            final @QueryParam("extra-function") boolean extraFunction) {
>>> ```
>>> 
>>> If `extraFunction` is `true` then the extra function will be returned
>>> instead of the sink.
>>> 
>>> The Java admin SDK will have the following methods added:
>>> 
>>> 
>>> ```java
>>>   /**
>>>     * Download Function Code.
>>>     *
>>>     * @param destinationFile
>>>     *           file where data should be downloaded to
>>>     * @param tenant
>>>     *            Tenant name
>>>     * @param namespace
>>>     *            Namespace name
>>>     * @param function
>>>     *            Function name
>>>     * @param extraFunction
>>>     *            Whether to download the extra-function (for sources and
>>> sinks)
>>>     * @throws PulsarAdminException
>>>     */
>>>    void downloadFunction(String destinationFile, String tenant, String
>>> namespace, String function,
>>>                          boolean extraFunction) throws
>>> PulsarAdminException;
>>> 
>>>    /**
>>>     * Download Function Code asynchronously.
>>>     *
>>>     * @param destinationFile
>>>     *           file where data should be downloaded to
>>>     * @param tenant
>>>     *            Tenant name
>>>     * @param namespace
>>>     *            Namespace name
>>>     * @param function
>>>     *            Function name
>>>     * @param extraFunction
>>>     *            Whether to download the extra-function (for sources and
>>> sinks)
>>>     */
>>>    CompletableFuture<Void> downloadFunctionAsync(
>>>            String destinationFile, String tenant, String namespace,
>> String
>>> function, boolean extraFunction);
>>> ```
>>> 
>>> The parameter `--extra-function` will be added to the admin CLI command
>>> `functions download`
>>> 
>>> ## Implementation
>>> 
>>> ### Pulsar-admin
>>> 
>>> * Add the admin CLI options when creating/updating/localrunning the sink
>>> (see API changes)
>>> 
>>> ### Pulsar broker
>>> 
>>> * On the broker API, in registerSink/updateSink, if a preprocessing
>>> function is present in the Sink config, we:
>>>  * validate the function
>>>    * get the function classloader (from builtin or download a package
>>> file)
>>>    * load the function
>>>    * inspect the function types and set the first arg as Sink type. Also
>>> verify that the second arg is of type Record.
>>>  * use the function classloader instead of the sink classloader to
>> verify
>>> if custom schemas, serdes, crypto key readers can be loaded and are
>>> conform.
>>>  * get the function package location and fill the protobuf
>>> extraFunctionPackageLocation field with it. A name for this preprocessing
>>> function is generated from the sink name so it can be referenced when
>>> stored in BookKeeper or in package management. The name of the
>>> preprocessing function is `{sink name}__sink-function`.
>>>  * set the `functionDetails` with the preprocessing function config
>>> (function class name and function userConfig)
>>> 
>>> * The `--extra-function` query parameter is added to the `functions
>>> download` CLI command, admin SDK and HTTP API (see API changes).
>>> 
>>> ### Function worker
>>> 
>>> * When the `InstanceConfig` is created, an UUID is set to the
>>> `extraFunctionId` field. This field will serve as a cache key for the
>> extra
>>> function (see API changes).
>>> * When the `FunctionActioner` starts the function, if
>>> `extraFunctionPackageLocation` is present, the same is done for the extra
>>> function as what is done for the connector:
>>>  * if the runtime is not externally managed, the extra function code is
>>> downloaded from the `extraFunctionPackageLocation` and the `Runtime` is
>>> created with the extra package file path and original name (see API
>> changes
>>> to `RuntimeFactory::createContainer`)
>>>  * if the runtime is externally managed, the `Runtime` is created with
>> the
>>> `extraFunctionPackageLocation` and original name.
>>> 
>>> * Depending on the configured runtime, if there’s an extra function file:
>>>  * For the `ThreadRuntime`, the extra function classloader is obtained
>>> with the instance `extraFunctionId` cache key, then this classloader is
>>> passed to the `JavaInstanceRunnable`. The `JavaInstanceRunnable` then
>>> switches between the connector classloader and the extra function
>>> classloader accordingly..
>>>  * For the `ProcessRuntime`, the path to the extra function jar is added
>>> to the `--extra_function_jar` parameter in the `JavaInstanceStarter`
>>> command. The `JavaInstanceStarter` then uses it when creating its
>>> `ThreadRuntime`.
>>>  * For the `KubernetesRuntime`, a command is added in the statefulset
>> exec
>>> command to download the extra function using the `–extra-function` flag
>> of
>>> the `functions download` command. And the path to this downloaded  jar is
>>> added to the `--extra_function_jar` parameter of the
>> `JavaInstanceStarter`
>>> command.
>>> 
>>> ### LocalRunner
>>> 
>>> If  `sinkConfig` has a `preprocessFunction`, the `LocalRunner` will use
>> the
>>> same methods as in the broker to get the function file and
>>> `functionDetails` and use them when spawning the `Runtime`.
>>> 
>>> ## Reject Alternatives
>>> 
>>> N/A
>>> 
>> 

Reply via email to