>
>  The interceptor would be loaded at function registration.

The interceptor classes should be loaded at broker/worker startup time.

> In order for an
> interceptor to be picked up, a jar would need to be compiled with the class
> that implements the interface, and the jar would need to be deployed into a
> directory that matches the directory provided somewhere. (Would this
> directory be specified in function_worker.yml? Or, would we need to reuse
> one of the existing interceptor directories and filter classes at
> runtime?)
>

The JAR with the interceptor classes can simply be placed in the lib/
directory of the Pulsar project.  An additional config may need to be
added to function_worker.yml so that the class name of the interceptor
implementation can be provided.
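
As a rough sketch of that loading step (not taken from any existing Pulsar
code): the worker could read a class name from its config and instantiate it
by reflection, falling back to a no-op when nothing is configured. The names
FunctionInterceptor, NoopFunctionInterceptor and InterceptorLoader are
hypothetical, and the method signature is only a placeholder.

```java
// Hypothetical plugin interface; the real name and signature are still open.
interface FunctionInterceptor {
    // Placeholder hook, invoked at the start of a register call.
    void onRegister(String tenant, String namespace, String functionName);
}

// Used when no interceptor class is configured, so behavior stays unchanged.
final class NoopFunctionInterceptor implements FunctionInterceptor {
    @Override
    public void onRegister(String tenant, String namespace, String functionName) {
        // Intentionally does nothing.
    }
}

final class InterceptorLoader {
    private InterceptorLoader() {
    }

    // className would come from a (hypothetical) entry in function_worker.yml.
    static FunctionInterceptor load(String className) {
        if (className == null || className.trim().isEmpty()) {
            return new NoopFunctionInterceptor();
        }
        try {
            // The class is expected to be on the classpath, e.g. via a JAR in lib/.
            Class<?> clazz = Class.forName(className);
            return clazz.asSubclass(FunctionInterceptor.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            // Fail fast at startup rather than on the first API call.
            throw new IllegalArgumentException(
                    "Cannot load function interceptor: " + className, e);
        }
    }
}
```

Calling InterceptorLoader.load(null) returns the no-op, so a cluster without
an interceptor configured would behave as it does today.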

> In order for changes to the FunctionConfig values to reach where the
> Function producers are created, we'd still need to add the producer
> properties to the Function.proto and FunctionConfig.ProducerConfig
> definitions. We'd also still need to modify CLI parameters to ensure these
> values could be modified on a per-function basis.
>

For your use case of disabling batching, you will still need to add an
additional field in the Function.proto so that it can be configurable.  I
would recommend doing that work separately since it is orthogonal to
interceptors.

> When a function is registered, instead of retrieving cluster-wide function
> producer defaults from WorkerConfig, it would load the interceptor, which
> would modify the FunctionConfig's ProducerConfig's property values to
> provide the cluster-wide defaults for any value that wasn't provided by the
> REST call.
>

Yes.
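
To make that concrete, here is a minimal sketch of an interceptor that fills
in cluster-wide defaults only for values the REST call left unset. It
assumes the producer settings are nullable, as discussed in the proposal.
ProducerSettings and ClusterDefaultsInterceptor are hypothetical stand-ins,
not the real FunctionConfig or ProducerConfig classes.

```java
// Hypothetical stand-in for the nullable producer settings on ProducerConfig.
final class ProducerSettings {
    Boolean batchingEnabled;          // null means "not provided in the request"
    String compressionType;           // null means "not provided in the request"
    Long batchingMaxPublishDelayMs;   // null means "not provided in the request"
}

// Hypothetical interceptor that holds the cluster-wide defaults.
final class ClusterDefaultsInterceptor {
    private final ProducerSettings defaults;

    ClusterDefaultsInterceptor(ProducerSettings defaults) {
        this.defaults = defaults;
    }

    // Fills in only the values the caller left unset; provided values win.
    void applyDefaults(ProducerSettings requested) {
        if (requested.batchingEnabled == null) {
            requested.batchingEnabled = defaults.batchingEnabled;
        }
        if (requested.compressionType == null) {
            requested.compressionType = defaults.compressionType;
        }
        if (requested.batchingMaxPublishDelayMs == null) {
            requested.batchingMaxPublishDelayMs = defaults.batchingMaxPublishDelayMs;
        }
    }
}
```

An admin-provided implementation would run this at the start of the register
call, before the config is converted to the protobuf ProducerSpec.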

> If an interceptor wasn't provided, during conversion of the ProducerConfig
> to the protobuf ProducerSpec, the protobuf defaults will take effect (since
> protobuf doesn't allow null primitives), which will then be picked up when
> the producer is created.
>


If an interceptor impl is not provided, the behavior of the APIs should be
the same as the current behavior.


Back at Streamlio, I actually created an interceptor mechanism.

Please take a look at this old PR:

https://github.com/apache/pulsar/pull/7159

or, more specifically, what is done for functions here:

https://github.com/apache/pulsar/pull/7159/files#diff-427711b7c1c3b9f17cd1c02bb9ceb942af454676dd97902a43e910f645febc6d


Perhaps you can help revive this work!

Best,

Jerry

On Mon, Apr 5, 2021 at 11:46 AM Devin Bost <devin.b...@gmail.com> wrote:

> Here's my understanding of what I'd need to do to implement the interceptor
> approach.
> The interceptor would be loaded at function registration. In order for an
> interceptor to be picked up, a jar would need to be compiled with the class
> that implements the interface, and the jar would need to be deployed into a
> directory that matches the directory provided somewhere. (Would this
> directory be specified in function_worker.yml? Or, would we need to reuse
> one of the existing interceptor directories and filter classes at
> runtime?)
> In order for changes to the FunctionConfig values to reach where the
> Function producers are created, we'd still need to add the producer
> properties to the Function.proto and FunctionConfig.ProducerConfig
> definitions. We'd also still need to modify CLI parameters to ensure these
> values could be modified on a per-function basis.
> When a function is registered, instead of retrieving cluster-wide function
> producer defaults from WorkerConfig, it would load the interceptor, which
> would modify the FunctionConfig's ProducerConfig's property values to
> provide the cluster-wide defaults for any value that wasn't provided by the
> REST call.
> If an interceptor wasn't provided, during conversion of the ProducerConfig
> to the protobuf ProducerSpec, the protobuf defaults will take effect (since
> protobuf doesn't allow null primitives), which will then be picked up when
> the producer is created.
>
> Is that right?
>
> Devin G. Bost
>
>
> On Mon, Apr 5, 2021 at 11:56 AM Devin Bost <devin.b...@gmail.com> wrote:
>
> > If we use the interceptor approach, does that mean a cluster-admin would
> > need to write custom code in order to override the function producer
> > settings? I worry about adding additional burden to cluster
> administration.
> >
> > --
> > Devin G. Bost
> >
> > On Sat, Apr 3, 2021, 8:26 AM Devin Bost <devin.b...@gmail.com> wrote:
> >
> >> Hi Rui,
> >>
> >> Thanks for the feedback. My understanding about the interceptor is that
> >> it's not something that would be touched by users but would only be
> >> modified by cluster admins. So, I think we'd still need to include the
> >> parameters on the ProducerConfig.
> >> I already have the other approach coded, so changing the design will
> >> require some rework.
> >> I'll look at that example you mentioned.
> >>
> >> --
> >> Devin G. Bost
> >>
> >> On Fri, Apr 2, 2021, 10:18 PM Rui Fu <f...@rui.sh> wrote:
> >>
> >>> Hi Devin,
> >>>
> >>> Great proposal for giving functions such a useful feature, and thanks for
> >>> your detailed write-up. After going through the history of this discussion,
> >>> I would like to +1 Jerry’s suggestion. With the interceptor, admins and
> >>> users get the maximum flexibility to customize their producer configs. It
> >>> may also reduce the number of parameters we put into the configs and
> >>> pulsar-admin. Also, we already have some interceptors in Pulsar Functions,
> >>> like the `RuntimeCustomizer` interface, so it is not a new technique that
> >>> users would find hard to adopt.
> >>>
> >>> Besides, I would suggest covering these changes in the Python and Go
> >>> runtimes as well if we add new fields to `Function.proto`.
> >>>
> >>> Best,
> >>>
> >>> Rui
> >>> On Apr 2, 2021 at 10:13 AM +0800, Devin Bost <devin.b...@gmail.com> wrote:
> >>> > What would be some of the additional benefits of using the
> interceptor
> >>> > approach?
> >>> >
> >>> > --
> >>> > Devin G. Bost
> >>> >
> >>> > On Thu, Apr 1, 2021, 6:03 PM Jerry Peng <jerry.boyang.p...@gmail.com
> >
> >>> wrote:
> >>> >
> >>> > > Devin,
> >>> > >
> >>> > > Interceptors are located on the server side (broker/worker).
> Cluster
> >>> > > admins would create the plugins based on the interfaces I described
> >>> and
> >>> > > install them on the server side. Regular function developers will
> not
> >>> > > implement or interact directly with the interceptor.
> >>> > >
> >>> > > > Would it be better to just ignore WorkerConfig defaults during
> >>> function
> >>> > > update?
> >>> > >
> >>> > > Yes.
> >>> > >
> >>> > > If the cluster admin changes the function config defaults and wants
> >>> > > existing functions to utilize those configs that have changed, the
> >>> admin
> >>> > > can just update those configs of the functions through the regular
> >>> update
> >>> > > mechanism for functions we have today.
> >>> > >
> >>> > >
> >>> > >
> >>> > > On Thu, Apr 1, 2021 at 4:51 PM Devin Bost <devin.b...@gmail.com>
> >>> wrote:
> >>> > >
> >>> > > > Would it be better to just ignore WorkerConfig defaults during
> >>> function
> >>> > > > update? We could still update the function producer behavior
> >>> through the
> >>> > > > producerConfig passed as a REST parameter, but we'd avoid the
> edge
> >>> case I
> >>> > > > mentioned.
> >>> > > >
> >>> > > > Devin G. Bost
> >>> > > >
> >>> > > >
> >>> > > > On Thu, Apr 1, 2021 at 5:42 PM Devin Bost <devin.b...@gmail.com>
> >>> wrote:
> >>> > > >
> >>> > > > > I just remembered an edge case that motivated me to create that
> >>> > > > additional
> >>> > > > > flow.
> >>> > > > > In my PR, if any producerConfig values are null at registration
> >>> or
> >>> > > > update,
> >>> > > > > we get those values from WorkerConfig instead.
> >>> > > > > However, when a user runs an update (on a function) for the
> >>> first time
> >>> > > > > after upgrading to the version of Pulsar with this feature, if
> >>> someone
> >>> > > > has
> >>> > > > > modified the function_worker.yml file (perhaps without their
> >>> > > knowledge),
> >>> > > > > those defaults from function_worker.yml will get picked up
> >>> because the
> >>> > > > > existing producerConfig will have null set for those values
> >>> (because
> >>> > > the
> >>> > > > > function was registered prior to the existence of this
> feature.)
> >>> So,
> >>> > > > > the user could be trying to update something unrelated, not
> >>> realizing
> >>> > > > that
> >>> > > > > their function is going to have its producerConfig updated by
> the
> >>> > > cluster
> >>> > > > > defaults. Most users won't be affected by this, but it could be
> >>> tricky
> >>> > > to
> >>> > > > > diagnose for the few who would get impacted by it. (Is this
> >>> description
> >>> > > > any
> >>> > > > > clearer? If not, it might be clearer if I diagram it.)
> >>> > > > >
> >>> > > > > Devin G. Bost
> >>> > > > >
> >>> > > > >
> >>> > > > > On Thu, Apr 1, 2021 at 4:39 PM Jerry Peng <
> >>> jerry.boyang.p...@gmail.com
> >>> > > >
> >>> > > > > wrote:
> >>> > > > >
> >>> > > > > > Hi Devin,
> >>> > > > > >
> >>> > > > > > I understand the usefulness of giving cluster admins the
> >>> ability to
> >>> > > > > > specify
> >>> > > > > > some defaults for functions at a cluster level. I think the
> >>> right
> >>> > > time
> >>> > > > to
> >>> > > > > > apply those defaults is during function registration time. I
> >>> am not
> >>> > > > sure
> >>> > > > > > I
> >>> > > > > > understand the usefulness of what you are proposing we do
> >>> during
> >>> > > update
> >>> > > > > > time. If you would like to change a config of an already
> >>> running
> >>> > > > function
> >>> > > > > > why not just update it with config changes that are needed. I
> >>> am not
> >>> > > > sure
> >>> > > > > > you need the additional workflow you proposed for the update.
> >>> > > > > >
> >>> > > > > > Another idea to satisfy your use case is that instead of
> >>> leveraging a
> >>> > > > > > default config mechanism perhaps we should take a look at
> >>> implementing
> >>> > > > > > interceptors for the function/sources/sink API calls. We
> >>> already have
> >>> > > > > > interceptors implemented for many other pulsar
> functionalities.
> >>> > > Perhaps
> >>> > > > we
> >>> > > > > > should do it for Pulsar Functions APIs as well. With
> >>> interceptors you
> >>> > > > > > will
> >>> > > > > > be able to intercept all API operations to functions and
> >>> customize the
> >>> > > > > > requests however you would like.
> >>> > > > > >
> >>> > > > > > We can have an interface like:
> >>> > > > > >
> >>> > > > > > interface FunctionInterceptors {
> >>> > > > > >
> >>> > > > > > void registerFunctionInterceptor(String tenant, String namespace,
> >>> > > > > > String functionName, FunctionConfig functionConfig...);
> >>> > > > > >
> >>> > > > > > ...
> >>> > > > > >
> >>> > > > > > }
> >>> > > > > >
> >>> > > > > > for developers to implement. At the beginning of every Function API
> >>> > > > > > call, the corresponding interceptor method gets called. For example,
> >>> > > > > > the "registerFunctionInterceptor()" method above will be called
> >>> > > > > > towards the beginning of registerFunction and allow you to customize
> >>> > > > > > the function config however you want.
> >>> > > > > >
> >>> > > > > > Best,
> >>> > > > > >
> >>> > > > > > Jerry
> >>> > > > > >
> >>> > > > > >
> >>> > > > > >
> >>> > > > > > On Thu, Apr 1, 2021 at 2:08 PM Devin Bost <
> >>> devin.b...@gmail.com>
> >>> > > wrote:
> >>> > > > > >
> >>> > > > > > > *Cluster-Wide and Function-Specific Producer Defaults*
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > - Status: Proposal
> >>> > > > > > > - Author: Devin Bost (with guidance from Jerry Peng)
> >>> > > > > > > - Pull Request: https://github.com/apache/pulsar/pull/9987
> >>> > > > > > > - Mailing List discussion:
> >>> > > > > > > - Release: 2.8.0
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > *Motivation*
> >>> > > > > > >
> >>> > > > > > > Pulsar currently provides no way to allow users or
> operators
> >>> to
> >>> > > change
> >>> > > > > > the
> >>> > > > > > > producer behavior of Pulsar functions. For organizations
> >>> that are
> >>> > > > making
> >>> > > > > > > heavy use of functions, the ability to tune messaging
> >>> behavior of
> >>> > > > > > functions
> >>> > > > > > > is greatly desired. Moreover, current function messaging
> >>> defaults
> >>> > > > > > appear to
> >>> > > > > > > be causing issues for certain workloads. (For example, some
> >>> bugs
> >>> > > > appear
> >>> > > > > > to
> >>> > > > > > > be related to having batching enabled. See #6054.) Enabling
> >>> > > > > > > operators to modify these settings will help Pulsar cluster
> >>> > > > > > > admins work around issues so that they can upgrade legacy
> >>> > > > > > > versions of Pulsar to more recent versions.
> >>> > > > > > > Moreover, enabling greater tuning of these settings can
> >>> provide more
> >>> > > > > > > opportunity for performance optimization in production
> >>> environments.
> >>> > > > We
> >>> > > > > > > need the ability to modify these function producer
> settings:
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > - batchingEnabled
> >>> > > > > > > - chunkingEnabled
> >>> > > > > > > - blockIfQueueFull
> >>> > > > > > > - compressionType
> >>> > > > > > > - hashingScheme
> >>> > > > > > > - messageRoutingMode
> >>> > > > > > > - batchingMaxPublishDelay
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > *Implementation Strategy*
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > Ideally, we need a way to set default message producer
> >>> behavior (for
> >>> > > > > > > functions) on a cluster-wide level. However, operators may
> >>> want
> >>> > > > specific
> >>> > > > > > > settings to be used for certain functions. (We can't assume
> >>> that
> >>> > > > > > everyone
> >>> > > > > > > will want all functions in a cluster to have the same
> >>> producer
> >>> > > > > > settings.)
> >>> > > > > > > When an operator changes the cluster-wide producer
> defaults,
> >>> they
> >>> > > > need a
> >>> > > > > > > way to be able to roll out the changes to functions without
> >>> > > > > > significantly
> >>> > > > > > > disrupting messaging flows in production environments. They
> >>> also
> >>> > > need
> >>> > > > a
> >>> > > > > > > simple way to rollback changes in case a change to function
> >>> producer
> >>> > > > > > > settings results in undesired behavior. However, not all
> >>> users will
> >>> > > > want
> >>> > > > > > > function updates to replace existing function producer
> >>> settings,
> >>> > > > > > especially
> >>> > > > > > > for functions that have been given custom producer settings
> >>> (at the
> >>> > > > > > > per-function level.) Due to this difference in use cases,
> >>> there
> >>> > > needs
> >>> > > > > > to be
> >>> > > > > > > a way to allow an operator to specify what update behavior
> >>> they
> >>> > > want.
> >>> > > > > > For a
> >>> > > > > > > given update, the user should be able to specify if:
> >>> > > > > > >
> >>> > > > > > > 1. Existing function producer settings will be replaced by
> >>> > > > > > cluster-wide
> >>> > > > > > > defaults (unless overridden by producer settings provided
> in
> >>> the
> >>> > > > > > > request),
> >>> > > > > > > or
> >>> > > > > > > 2. Existing function producer settings will *not* be
> >>> replaced by
> >>> > > > > > > cluster-wide defaults (unless overridden by producer
> settings
> >>> > > > > > provided
> >>> > > > > > > in
> >>> > > > > > > the request).
> >>> > > > > > >
> >>> > > > > > > So, the two orders of precedence are:
> >>> > > > > > >
> >>> > > > > > > 1. newProducerConfig > cluster-wide defaults >
> >>> > > > existingProducerConfig
> >>> > > > > > > 2. newProducerConfig > existingProducerConfig >
> cluster-wide
> >>> > > > defaults
> >>> > > > > > >
> >>> > > > > > > We can add a boolean property to UpdateOptions to allow the
> >>> > > precedence
> >>> > > > > > to
> >>> > > > > > > be specified. Since we don't want to introduce unexpected
> >>> side
> >>> > > effects
> >>> > > > > > > during an update, the default precedence should prefer the
> >>> > > > > > > existingProducerConfig over cluster-wide defaults.
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > To ensure that the desired producer settings (after
> >>> calculating the
> >>> > > > > > > settings from defaults + overrides) are available when
> >>> creating the
> >>> > > > > > > producers (on any cluster), we need to add these producer
> >>> settings
> >>> > > to
> >>> > > > > > the
> >>> > > > > > > function protobuf definition. Also, to ensure that settings
> >>> can be
> >>> > > > > > properly
> >>> > > > > > > validated at the time they are submitted, we need
> validation
> >>> in two
> >>> > > > > > places:
> >>> > > > > > >
> >>> > > > > > > 1. When cluster-wide configs are loaded upon broker startup
> >>> > > > > > > 2. When a function is registered or updated.
> >>> > > > > > >
> >>> > > > > > > As it would be impractical to check every registered
> function
> >>> > > > definition
> >>> > > > > > > during broker startup, most of the validation needs to
> occur
> >>> during
> >>> > > > > > > function registration or update. However, during broker
> >>> startup, we
> >>> > > > can
> >>> > > > > > at
> >>> > > > > > > least validate that the configurations exist and throw an
> >>> exception
> >>> > > if
> >>> > > > > > they
> >>> > > > > > > are invalid.
> >>> > > > > > >
> >>> > > > > > > As any configuration exceptions thrown when the producer is created
> >>> > > > > > > would bubble up to the client, we need to ensure that the final
> >>> > > > > > > configurations are valid when a function is registered or updated, so
> >>> > > > > > > that no exceptions are thrown when the producer is created. This
> >>> > > > > > > requires computing the producer settings (according to the desired
> >>> > > > > > > precedence) during function registration or update.
> >>> > > > > > >
> >>> > > > > > > When a function is registered, existing producer settings
> do
> >>> not
> >>> > > exist
> >>> > > > > > for
> >>> > > > > > > it, so we simply need to get cluster-wide defaults and
> >>> override them
> >>> > > > > > with
> >>> > > > > > > any specific producer settings provided in the
> >>> FunctionConfig's
> >>> > > > > > > ProducerConfig (submitted as a REST parameter.) After the
> >>> initial
> >>> > > > > > > registration, those computed settings will then be
> persisted
> >>> in
> >>> > > > > > bookkeeper
> >>> > > > > > > with the function definition.
> >>> > > > > > >
> >>> > > > > > > When a function is updated, we also need to address the
> >>> existing
> >>> > > > > > > producerConfig, and for this, we need to check the desired
> >>> override
> >>> > > > > > > precedence (as explained above) when computing the final
> >>> producer
> >>> > > > > > settings
> >>> > > > > > > that will be persisted with the function.
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > *Modifications*
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > As WorkerConfig is readily available when functions are
> >>> registered
> >>> > > and
> >>> > > > > > > updated, we can put new settings into function_worker.yml
> >>> and can
> >>> > > load
> >>> > > > > > and
> >>> > > > > > > validate them via WorkerConfig.load(..). To prevent
> >>> introducing
> >>> > > > breaking
> >>> > > > > > > changes, parameters will have defaults assigned in
> >>> WorkerConfig that
> >>> > > > > > > conform to current expected behavior, like this:
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > @Data
> >>> > > > > > >
> >>> > > > > > > @Accessors(chain = true)
> >>> > > > > > >
> >>> > > > > > > public class FunctionDefaultsConfig implements
> Serializable {
> >>> > > > > > >
> >>> > > > > > > @FieldContext(
> >>> > > > > > >
> >>> > > > > > > doc = "Disables default message batching between
> >>> > > > functions"
> >>> > > > > > >
> >>> > > > > > > )
> >>> > > > > > >
> >>> > > > > > > protected boolean batchingDisabled = false;
> >>> > > > > > >
> >>> > > > > > > @FieldContext(
> >>> > > > > > >
> >>> > > > > > > doc = "Enables default message chunking between
> >>> > > functions"
> >>> > > > > > >
> >>> > > > > > > )
> >>> > > > > > >
> >>> > > > > > > protected boolean chunkingEnabled = false;
> >>> > > > > > >
> >>> > > > > > > @FieldContext(
> >>> > > > > > >
> >>> > > > > > > doc = "Disables default behavior to block when message
> >>> > > > > > queue is
> >>> > > > > > > full"
> >>> > > > > > >
> >>> > > > > > > )
> >>> > > > > > >
> >>> > > > > > > protected boolean blockIfQueueFullDisabled = false;
> >>> > > > > > >
> >>> > > > > > > @FieldContext(
> >>> > > > > > >
> >>> > > > > > > doc = "Default compression type"
> >>> > > > > > >
> >>> > > > > > > )
> >>> > > > > > >
> >>> > > > > > > protected String compressionType = "LZ4";
> >>> > > > > > >
> >>> > > > > > > @FieldContext(
> >>> > > > > > >
> >>> > > > > > > doc = "Default hashing scheme"
> >>> > > > > > >
> >>> > > > > > > )
> >>> > > > > > >
> >>> > > > > > > protected String hashingScheme = "Murmur3_32Hash";
> >>> > > > > > >
> >>> > > > > > > @FieldContext(
> >>> > > > > > >
> >>> > > > > > > doc = "Default message routing mode"
> >>> > > > > > >
> >>> > > > > > > )
> >>> > > > > > >
> >>> > > > > > > protected String messageRoutingMode = "CustomPartition";
> >>> > > > > > >
> >>> > > > > > > @FieldContext(
> >>> > > > > > >
> >>> > > > > > > doc = "Default max publish delay (in milliseconds) when
> >>> > > > > > message
> >>> > > > > > > batching is enabled"
> >>> > > > > > >
> >>> > > > > > > )
> >>> > > > > > >
> >>> > > > > > > protected long batchingMaxPublishDelay = 10L;
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > public ClusterFunctionProducerDefaults
> >>> buildProducerDefaults()
> >>> > > > > > > throws InvalidWorkerConfigDefaultException
> >>> > > > > > > {
> >>> > > > > > >
> >>> > > > > > > return new
> >>> > > > > > > ClusterFunctionProducerDefaults(this.isBatchingDisabled()
> >>> > > > > > > ,
> >>> > > > > > >
> >>> > > > > > > this.isChunkingEnabled(),
> >>> > > > > > > this.isBlockIfQueueFullDisabled(),
> >>> > > > > > > this.getCompressionType(),
> >>> > > > > > >
> >>> > > > > > > this.getHashingScheme(),
> >>> > > this.getMessageRoutingMode(),
> >>> > > > > > this
> >>> > > > > > > .getBatchingMaxPublishDelay());
> >>> > > > > > >
> >>> > > > > > > }
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > public FunctionDefaultsConfig() {
> >>> > > > > > >
> >>> > > > > > > }
> >>> > > > > > >
> >>> > > > > > > }
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > The additional section in function_worker.yml will look
> like
> >>> this:
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > functionDefaults:
> >>> > > > > > >
> >>> > > > > > > batchingDisabled: true
> >>> > > > > > >
> >>> > > > > > > chunkingEnabled: false
> >>> > > > > > >
> >>> > > > > > > blockIfQueueFullDisabled: false
> >>> > > > > > >
> >>> > > > > > > batchingMaxPublishDelay: 12
> >>> > > > > > >
> >>> > > > > > > compressionType: ZLIB
> >>> > > > > > >
> >>> > > > > > > hashingScheme: JavaStringHash
> >>> > > > > > >
> >>> > > > > > > messageRoutingMode: RoundRobinPartition
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > During function update, we can use a new boolean option,
> >>> > > > > > > ignoreExistingFunctionDefaults, to toggle precedence:
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > @Data
> >>> > > > > > >
> >>> > > > > > > @NoArgsConstructor
> >>> > > > > > >
> >>> > > > > > > @ApiModel(value = "UpdateOptions", description = "Options
> >>> while
> >>> > > > updating
> >>> > > > > > > function defaults or the sink")
> >>> > > > > > >
> >>> > > > > > > public class UpdateOptions {
> >>> > > > > > >
> >>> > > > > > > @ApiModelProperty(
> >>> > > > > > >
> >>> > > > > > > value = "Whether or not to update the auth data",
> >>> > > > > > >
> >>> > > > > > > name = "update-auth-data")
> >>> > > > > > >
> >>> > > > > > > private boolean updateAuthData = false;
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > @ApiModelProperty(
> >>> > > > > > >
> >>> > > > > > > value="Whether or not to ignore any existing function
> >>> > > > > > > defaults",
> >>> > > > > > >
> >>> > > > > > > name ="ignore-existing-function-defaults")
> >>> > > > > > >
> >>> > > > > > > private boolean ignoreExistingFunctionDefaults = false;
> >>> > > > > > >
> >>> > > > > > > }
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > To ensure that function producer settings don't get
> modified
> >>> > > > > > unexpectedly
> >>> > > > > > > by an update, ignoreExistingFunctionDefaults = false by
> >>> default.
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > The updated ProducerConfig will contain the new properties
> >>> and a
> >>> > > > method
> >>> > > > > > to
> >>> > > > > > > merge an incoming ProducerConfig with an existing
> >>> ProducerConfig.
> >>> > > (The
> >>> > > > > > > merged ProducerConfig will use the incoming ProducerConfig
> >>> > > properties
> >>> > > > > > when
> >>> > > > > > > they exist and will use existing ProducerConfig properties
> >>> > > otherwise.)
> >>> > > > > > To
> >>> > > > > > > ensure we don't need to force operators to provide those
> >>> > > > ProducerConfig
> >>> > > > > > > properties with every create or update, they must be
> >>> nullable, not
> >>> > > > > > > primitive (e.g. Boolean instead of boolean.)
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > To support the additional properties, FunctionConfigUtils
> >>> (and any
> >>> > > > > > similar
> >>> > > > > > > classes) will need to be modified to correctly translate
> >>> between the
> >>> > > > > > > ProducerConfig and the protobuf ProducerSpec.
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > The updated ProducerSpec in Function.proto will look like
> >>> this:
> >>> > > > > > >
> >>> > > > > > > message ProducerSpec {
> >>> > > > > > >
> >>> > > > > > > int32 maxPendingMessages = 1;
> >>> > > > > > >
> >>> > > > > > > int32 maxPendingMessagesAcrossPartitions = 2;
> >>> > > > > > >
> >>> > > > > > > bool useThreadLocalProducers = 3;
> >>> > > > > > >
> >>> > > > > > > CryptoSpec cryptoSpec = 4;
> >>> > > > > > >
> >>> > > > > > > string batchBuilder = 5;
> >>> > > > > > >
> >>> > > > > > > bool batchingDisabled = 6;
> >>> > > > > > >
> >>> > > > > > > bool chunkingEnabled = 7;
> >>> > > > > > >
> >>> > > > > > > bool blockIfQueueFullDisabled = 8;
> >>> > > > > > >
> >>> > > > > > > CompressionType compressionType = 9;
> >>> > > > > > >
> >>> > > > > > > HashingScheme hashingScheme = 10;
> >>> > > > > > >
> >>> > > > > > > MessageRoutingMode messageRoutingMode = 11;
> >>> > > > > > >
> >>> > > > > > > int64 batchingMaxPublishDelay = 12;
> >>> > > > > > >
> >>> > > > > > > }
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > To support these new parameters during local run and for
> >>> > > convenience,
> >>> > > > we
> >>> > > > > > > should be able to specify them in the Admin CLI and in
> local
> >>> run
> >>> > > > modes.
> >>> > > > > > >
> >>> > > > > > > For local run, as function_worker.yml is not used, these
> >>> parameters
> >>> > > > will
> >>> > > > > > > have defaults and be primitive values:
> >>> > > > > > >
> >>> > > > > > > @Parameter(names = "--clusterFunctionBatchingDisabled",
> >>> description
> >>> > > =
> >>> > > > > > > "Disable
> >>> > > > > > > the default message batching behavior for functions",
> hidden
> >>> = true)
> >>> > > > > > >
> >>> > > > > > > public boolean clusterFunctionBatchingDisabled = false;
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > @Parameter(names = "--clusterFunctionChunkingEnabled",
> >>> description =
> >>> > > > > > "The
> >>> > > > > > > default message chunking behavior for functions", hidden =
> >>> true)
> >>> > > > > > >
> >>> > > > > > > public boolean clusterFunctionChunkingEnabled = false;
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > @Parameter(names =
> >>> "--clusterFunctionBlockIfQueueFullDisabled",
> >>> > > > > > description
> >>> > > > > > > = "Disable the default blocking behavior for functions when
> >>> queue is
> >>> > > > > > > full", hidden
> >>> > > > > > > = true)
> >>> > > > > > >
> >>> > > > > > > public boolean clusterFunctionBlockIfQueueFullDisabled =
> >>> false;
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > @Parameter(names =
> "--clusterFunctionCompressionTypeDefault",
> >>> > > > > > > description = "The
> >>> > > > > > > default Compression Type for functions", hidden = true)
> >>> > > > > > >
> >>> > > > > > > public String clusterFunctionCompressionTypeDefault =
> "LZ4";
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > @Parameter(names = "--clusterFunctionHashingSchemeDefault",
> >>> > > > description
> >>> > > > > > =
> >>> > > > > > > "The
> >>> > > > > > > default Hashing Scheme for functions", hidden = true)
> >>> > > > > > >
> >>> > > > > > > public String clusterFunctionHashingSchemeDefault =
> >>> > > "Murmur3_32Hash";
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > @Parameter(names =
> >>> "--clusterFunctionMessageRoutingModeDefault",
> >>> > > > > > > description
> >>> > > > > > > = "The default Message Routing Mode for functions", hidden
> =
> >>> true)
> >>> > > > > > >
> >>> > > > > > > public String clusterFunctionMessageRoutingModeDefault =
> >>> > > > > > "CustomPartition";
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > @Parameter(names =
> >>> > > "--clusterFunctionBatchingMaxPublishDelayDefault",
> >>> > > > > > > description
> >>> > > > > > > = "The default max publish delay (in milliseconds) for
> >>> functions
> >>> > > when
> >>> > > > > > > message batching is enabled", hidden = true)
> >>> > > > > > >
> >>> > > > > > > public long clusterFunctionBatchingMaxPublishDelayDefault =
> >>> 10L;
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > For Admin CLI, parameters can be provided in a similar way,
> >>> but they
> >>> > > > > > will
> >>> > > > > > > not have defaults (i.e. Boolean instead of boolean) since
> >>> parameters
> >>> > > > > > should
> >>> > > > > > > be null to ensure non-provided configs are obtained from
> >>> > > WorkerConfig.
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > To handle the logic of selecting the property from an
> >>> incoming
> >>> > > > > > > producerConfig vs the producerDefaults object (from
> >>> WorkerConfig),
> >>> > > we
> >>> > > > > > can
> >>> > > > > > > use a FunctionDefaultsMediator to handle this. That
> >>> interface looks
> >>> > > > like
> >>> > > > > > > this and gives us flexibility to handle mediation for
> >>> specific
> >>> > > > > > properties
> >>> > > > > > > without needing to convert the entire object.
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > public interface FunctionDefaultsMediator {
> >>> > > > > > >
> >>> > > > > > > boolean isBatchingDisabled();
> >>> > > > > > >
> >>> > > > > > > boolean isChunkingEnabled();
> >>> > > > > > >
> >>> > > > > > > boolean isBlockIfQueueFullDisabled();
> >>> > > > > > >
> >>> > > > > > > CompressionType getCompressionType();
> >>> > > > > > >
> >>> > > > > > > Function.CompressionType getCompressionTypeProto();
> >>> > > > > > >
> >>> > > > > > > HashingScheme getHashingScheme();
> >>> > > > > > >
> >>> > > > > > > Function.HashingScheme getHashingSchemeProto();
> >>> > > > > > >
> >>> > > > > > > MessageRoutingMode getMessageRoutingMode();
> >>> > > > > > >
> >>> > > > > > > Function.MessageRoutingMode getMessageRoutingModeProto();
> >>> > > > > > >
> >>> > > > > > > Long getBatchingMaxPublishDelay();
> >>> > > > > > >
> >>> > > > > > > }
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > (Alternatively, we could remove the Proto methods from the
> >>> mediator
> >>> > > > and
> >>> > > > > > > just use FunctionConfigUtils.convert(..) to translate
> >>> between the
> >>> > > > > > protobuf
> >>> > > > > > > ProducerSpec and the Java ProducerConfig.)
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > *Testing*
> >>> > > > > > >
> >>> > > > > > > In terms of testing, much of this functionality will be
> >>> covered by
> >>> > > > > > existing
> >>> > > > > > > tests in the pulsar-functions module. However, we need to
> >>> test each
> >>> > > of
> >>> > > > > > the
> >>> > > > > > > new classes, as well as these behaviors:
> >>> > > > > > >
> >>> > > > > > > - functionRegistration (with different provided
> >>> ProducerConfig
> >>> > > > > > > properties)
> >>> > > > > > > - functionUpdate (for ignoreExistingFunctionDefaults=true
> and
> >>> > > > > > > ignoreExistingFunctionDefaults=false)
> >>> > > > > > > - loading function_worker.yml into WorkerConfig
> >>> > > > > > > - throwing exceptions correctly when invalid parameters are
> >>> > > > provided
> >>> > > > > > > - converting between ProducerSpec and ProducerConfig
> >>> > > > > > > - correctly handling precedence when computing the final
> >>> > > > > > ProducerConfig
> >>> > > > > > > values
> >>> > > > > > > - creating producers
> >>> > > > > > > - ensuring functionality is consistent across runtimes
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > I'd like feedback on this proposal.
> >>> > > > > > >
> >>> > > > > > > Devin G. Bost
> >>> > > > > > >
> >>> > > > > >
> >>> > > > >
> >>> > > >
> >>> > >
> >>>
> >>
>
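
For reference, the two update precedences from the quoted proposal
(newProducerConfig > existingProducerConfig > cluster-wide defaults, or with
the last two swapped when ignoreExistingFunctionDefaults is set) can be
sketched per property as below. This is only an illustration assuming
nullable values; ProducerSettingPrecedence, firstNonNull and resolve are
hypothetical names, not code from the PR.

```java
// Hypothetical helper that resolves one producer setting at update time.
final class ProducerSettingPrecedence {
    private ProducerSettingPrecedence() {
    }

    // Returns the first candidate that is not null, or null if all are null.
    @SafeVarargs
    static <T> T firstNonNull(T... candidates) {
        for (T candidate : candidates) {
            if (candidate != null) {
                return candidate;
            }
        }
        return null;
    }

    // incoming: value from the update request (null if not provided)
    // existing: value persisted with the function (null if never set)
    // clusterDefault: value from the cluster-wide defaults
    static <T> T resolve(T incoming, T existing, T clusterDefault,
                         boolean ignoreExistingFunctionDefaults) {
        return ignoreExistingFunctionDefaults
                ? firstNonNull(incoming, clusterDefault, existing)
                : firstNonNull(incoming, existing, clusterDefault);
    }
}
```

With the flag left at false, an update that does not mention a setting keeps
whatever the function already had, which is the "no surprises" default the
proposal asks for.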
