If we use the interceptor approach, does that mean a cluster-admin would
need to write custom code in order to override the function producer
settings? I worry about adding additional burden to cluster administration.

--
Devin G. Bost

On Sat, Apr 3, 2021, 8:26 AM Devin Bost <devin.b...@gmail.com> wrote:

> Hi Rui,
>
> Thanks for the feedback. My understanding about the interceptor is that
> it's not something that would be touched by users but would only be
> modified by cluster admins. So, I think we'd still need to include the
> parameters on the ProducerConfig.
> I already have the other approach coded, so changing the design will
> require some rework.
> I'll look at that example you mentioned.
>
> --
> Devin G. Bost
>
> On Fri, Apr 2, 2021, 10:18 PM Rui Fu <f...@rui.sh> wrote:
>
>> Hi Devin,
>>
>> Great proposal for giving functions such a useful feature, and thanks for
>> your detailed write-up. After going through the history of this discussion,
>> I would like to +1 Jerry’s suggestion. With the interceptor,
>> admins and users would have maximum flexibility to customize their
>> producer configs. It may also reduce the number of parameters we put into
>> the configs and pulsar-admin. Also, we already have some interceptors in
>> Pulsar Functions, like the `RuntimeCustomizer` interface, so it is not a new
>> technique that users would find hard to adopt.
>>
>> Besides, I would suggest covering these changes in the Python
>> & Go runtimes as well if we add new fields to `Function.proto`.
>>
>> Best,
>>
>> Rui
>> On April 2, 2021 at 10:13 AM +0800, Devin Bost <devin.b...@gmail.com> wrote:
>> > What would be some of the additional benefits of using the interceptor
>> > approach?
>> >
>> > --
>> > Devin G. Bost
>> >
>> > On Thu, Apr 1, 2021, 6:03 PM Jerry Peng <jerry.boyang.p...@gmail.com>
>> wrote:
>> >
>> > > Devin,
>> > >
>> > > Interceptors are located on the server side (broker/worker). Cluster
>> > > admins would create the plugins based on the interfaces I described
>> and
>> > > install them on the server side. Regular function developers will not
>> > > implement or interact directly with the interceptor.
>> > >
>> > > > Would it be better to just ignore WorkerConfig defaults during
>> function
>> > > update?
>> > >
>> > > Yes.
>> > >
>> > > If the cluster admin changes the function config defaults and wants
>> > > existing functions to utilize those configs that have changed, the
>> admin
>> > > can just update those configs of the functions through the regular
>> update
>> > > mechanism for functions we have today.
>> > >
>> > >
>> > >
>> > > On Thu, Apr 1, 2021 at 4:51 PM Devin Bost <devin.b...@gmail.com>
>> wrote:
>> > >
>> > > > Would it be better to just ignore WorkerConfig defaults during
>> function
>> > > > update? We could still update the function producer behavior
>> through the
>> > > > producerConfig passed as a REST parameter, but we'd avoid the edge
>> case I
>> > > > mentioned.
>> > > >
>> > > > Devin G. Bost
>> > > >
>> > > >
>> > > > On Thu, Apr 1, 2021 at 5:42 PM Devin Bost <devin.b...@gmail.com>
>> wrote:
>> > > >
>> > > > > I just remembered an edge case that motivated me to create that
>> > > > additional
>> > > > > flow.
>> > > > > In my PR, if any producerConfig values are null at registration or
>> > > > update,
>> > > > > we get those values from WorkerConfig instead.
>> > > > > However, when a user runs an update (on a function) for the first
>> time
>> > > > > after upgrading to the version of Pulsar with this feature, if
>> someone
>> > > > has
>> > > > > modified the function_worker.yml file (perhaps without their
>> > > knowledge),
>> > > > > those defaults from function_worker.yml will get picked up
>> because the
>> > > > > existing producerConfig will have null set for those values
>> (because
>> > > the
>> > > > > function was registered prior to the existence of this feature.)
>> So,
>> > > > > the user could be trying to update something unrelated, not
>> realizing
>> > > > that
>> > > > > their function is going to have its producerConfig updated by the
>> > > cluster
>> > > > > defaults. Most users won't be affected by this, but it could be
>> tricky
>> > > to
>> > > > > diagnose for the few who would get impacted by it. (Is this
>> description
>> > > > any
>> > > > > clearer? If not, it might be clearer if I diagram it.)
>> > > > >
>> > > > > Devin G. Bost
>> > > > >
>> > > > >
>> > > > > On Thu, Apr 1, 2021 at 4:39 PM Jerry Peng <
>> jerry.boyang.p...@gmail.com
>> > > >
>> > > > > wrote:
>> > > > >
>> > > > > > Hi Devin,
>> > > > > >
>> > > > > > I understand the usefulness of giving cluster admins the
>> ability to
>> > > > > > specify
>> > > > > > some defaults for functions at a cluster level. I think the
>> right
>> > > time
>> > > > to
>> > > > > > apply those defaults is during function registration time. I am
>> not
>> > > > sure
>> > > > > > I
>> > > > > > understand the usefulness of what you are proposing we do during
>> > > update
>> > > > > > time. If you would like to change a config of an already running
>> > > > function
>> > > > > > why not just update it with config changes that are needed. I
>> am not
>> > > > sure
>> > > > > > you need the additional workflow you proposed for the update.
>> > > > > >
>> > > > > > Another idea to satisfy your use case is that instead of
>> leveraging a
>> > > > > > default config mechanism perhaps we should take a look at
>> implementing
>> > > > > > interceptors for the function/sources/sink API calls. We
>> already have
>> > > > > > interceptors implemented for many other pulsar functionalities.
>> > > Perhaps
>> > > > we
>> > > > > > should do it for Pulsar Functions APIs as well. With
>> interceptors you
>> > > > > > will
>> > > > > > be able to intercept all API operations to functions and
>> customize the
>> > > > > > requests however you would like.
>> > > > > >
>> > > > > > We can have an interface like:
>> > > > > >
>> > > > > > interface FunctionInterceptors {
>> > > > > >
>> > > > > >     void registerFunctionInterceptor(String tenant, String namespace,
>> > > > > >         String functionName, FunctionConfig functionConfig...);
>> > > > > >
>> > > > > >     ...
>> > > > > >
>> > > > > > }
>> > > > > >
>> > > > > > for developers to implement. At the beginning of every Function API
>> > > > > > call, the corresponding interceptor method gets called. For example,
>> > > > > > the "registerFunctionInterceptor()" method I provided above will be
>> > > > > > called towards the beginning of registerFunction and allow you to
>> > > > > > customize the function config however you want.
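
A minimal sketch of what an admin-written interceptor along these lines might look like, to give a sense of how much custom code is involved. Everything here is hypothetical: `FunctionConfigStub` is a one-field stand-in for Pulsar's `FunctionConfig`, the interface only loosely follows the outline above, and `RegistrationSketch` merely imitates the point in the registration path where interceptors would run.

```java
import java.util.List;

// Hypothetical stand-in for Pulsar's FunctionConfig; only one setting is modeled.
class FunctionConfigStub {
    Boolean batchingEnabled; // null means "not set by the user"
}

// Hypothetical interface, loosely following the outline in the thread.
interface FunctionInterceptor {
    void registerFunctionInterceptor(String tenant, String namespace,
                                     String functionName, FunctionConfigStub config);
}

// An admin-provided plugin that fills in a cluster default when the value is unset.
class BatchingDefaultInterceptor implements FunctionInterceptor {
    private final boolean clusterDefault;

    BatchingDefaultInterceptor(boolean clusterDefault) {
        this.clusterDefault = clusterDefault;
    }

    @Override
    public void registerFunctionInterceptor(String tenant, String namespace,
                                            String functionName, FunctionConfigStub config) {
        if (config.batchingEnabled == null) {
            config.batchingEnabled = clusterDefault;
        }
    }
}

// Imitates the worker's registration path: interceptors run before the config is stored.
class RegistrationSketch {
    static FunctionConfigStub register(List<? extends FunctionInterceptor> interceptors,
                                       String tenant, String namespace, String name,
                                       FunctionConfigStub config) {
        for (FunctionInterceptor interceptor : interceptors) {
            interceptor.registerFunctionInterceptor(tenant, namespace, name, config);
        }
        return config;
    }
}
```

In this sketch a value set explicitly by the user is left alone; only unset values pick up the cluster default.
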
>> > > > > >
>> > > > > > Best,
>> > > > > >
>> > > > > > Jerry
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On Thu, Apr 1, 2021 at 2:08 PM Devin Bost <devin.b...@gmail.com
>> >
>> > > wrote:
>> > > > > >
>> > > > > > > *Cluster-Wide and Function-Specific Producer Defaults*
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > - Status: Proposal
>> > > > > > > - Author: Devin Bost (with guidance from Jerry Peng)
>> > > > > > > - Pull Request: https://github.com/apache/pulsar/pull/9987
>> > > > > > > - Mailing List discussion:
>> > > > > > > - Release: 2.8.0
>> > > > > > >
>> > > > > > >
>> > > > > > > *Motivation*
>> > > > > > >
>> > > > > > > Pulsar currently provides no way for users or operators to change
>> > > > > > > the producer behavior of Pulsar functions. For organizations that
>> > > > > > > make heavy use of functions, the ability to tune the messaging
>> > > > > > > behavior of functions is greatly desired. Moreover, current function
>> > > > > > > messaging defaults appear to be causing issues for certain workloads.
>> > > > > > > (For example, some bugs appear to be related to having batching
>> > > > > > > enabled. See #6054.) Enabling operators to modify these settings
>> > > > > > > will help Pulsar cluster admins work around such issues so that they
>> > > > > > > can upgrade legacy versions of Pulsar to more recent versions.
>> > > > > > > Moreover, enabling greater tuning of these settings can provide more
>> > > > > > > opportunity for performance optimization in production environments.
>> > > > > > > We need the ability to modify these function producer settings:
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > - batchingEnabled
>> > > > > > > - chunkingEnabled
>> > > > > > > - blockIfQueueFull
>> > > > > > > - compressionType
>> > > > > > > - hashingScheme
>> > > > > > > - messageRoutingMode
>> > > > > > > - batchingMaxPublishDelay
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > *Implementation Strategy*
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > Ideally, we need a way to set default message producer
>> behavior (for
>> > > > > > > functions) on a cluster-wide level. However, operators may
>> want
>> > > > specific
>> > > > > > > settings to be used for certain functions. (We can't assume
>> that
>> > > > > > everyone
>> > > > > > > will want all functions in a cluster to have the same producer
>> > > > > > settings.)
>> > > > > > > When an operator changes the cluster-wide producer defaults,
>> they
>> > > > need a
>> > > > > > > way to be able to roll out the changes to functions without
>> > > > > > significantly
>> > > > > > > disrupting messaging flows in production environments. They
>> also
>> > > need
>> > > > a
>> > > > > > > simple way to rollback changes in case a change to function
>> producer
>> > > > > > > settings results in undesired behavior. However, not all
>> users will
>> > > > want
>> > > > > > > function updates to replace existing function producer
>> settings,
>> > > > > > especially
>> > > > > > > for functions that have been given custom producer settings
>> (at the
>> > > > > > > per-function level.) Due to this difference in use cases,
>> there
>> > > needs
>> > > > > > to be
>> > > > > > > a way to allow an operator to specify what update behavior
>> they
>> > > want.
>> > > > > > For a
>> > > > > > > given update, the user should be able to specify if:
>> > > > > > >
>> > > > > > > 1. Existing function producer settings will be replaced by
>> > > > > > >    cluster-wide defaults (unless overridden by producer settings
>> > > > > > >    provided in the request), or
>> > > > > > > 2. Existing function producer settings will *not* be replaced by
>> > > > > > >    cluster-wide defaults (unless overridden by producer settings
>> > > > > > >    provided in the request).
>> > > > > > >
>> > > > > > > So, the two orders of precedence are:
>> > > > > > >
>> > > > > > > 1. newProducerConfig > cluster-wide defaults > existingProducerConfig
>> > > > > > > 2. newProducerConfig > existingProducerConfig > cluster-wide defaults
>> > > > > > >
>> > > > > > > We can add a boolean property to UpdateOptions to allow the
>> > > precedence
>> > > > > > to
>> > > > > > > be specified. Since we don't want to introduce unexpected side
>> > > effects
>> > > > > > > during an update, the default precedence should prefer the
>> > > > > > > existingProducerConfig over cluster-wide defaults.
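
The two orders of precedence could be resolved per property roughly as follows. This is a sketch under assumptions, not the code in the PR: the class name, the `resolve` helper, and the `preferClusterDefaults` flag (standing in for the boolean on UpdateOptions) are all hypothetical, and `null` is taken to mean "not specified at that level".

```java
final class ProducerSettingPrecedence {
    private ProducerSettingPrecedence() {
    }

    /**
     * Picks the value for a single producer setting.
     * The incoming request always wins when it provides a value.
     * preferClusterDefaults = true  : incoming > cluster default > existing
     * preferClusterDefaults = false : incoming > existing > cluster default
     */
    static <T> T resolve(T incoming, T existing, T clusterDefault,
                         boolean preferClusterDefaults) {
        if (incoming != null) {
            return incoming;
        }
        T second = preferClusterDefaults ? clusterDefault : existing;
        T third = preferClusterDefaults ? existing : clusterDefault;
        return second != null ? second : third;
    }
}
```

With `preferClusterDefaults = false`, an update that omits a setting leaves the function's existing value in place, and the cluster default is used only when no existing value is stored.
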
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > To ensure that the desired producer settings (after
>> calculating the
>> > > > > > > settings from defaults + overrides) are available when
>> creating the
>> > > > > > > producers (on any cluster), we need to add these producer
>> settings
>> > > to
>> > > > > > the
>> > > > > > > function protobuf definition. Also, to ensure that settings
>> can be
>> > > > > > properly
>> > > > > > > validated at the time they are submitted, we need validation
>> in two
>> > > > > > places:
>> > > > > > >
>> > > > > > > 1. When cluster-wide configs are loaded upon broker startup
>> > > > > > > 2. When a function is registered or updated.
>> > > > > > >
>> > > > > > > As it would be impractical to check every registered function
>> > > > definition
>> > > > > > > during broker startup, most of the validation needs to occur
>> during
>> > > > > > > function registration or update. However, during broker
>> startup, we
>> > > > can
>> > > > > > at
>> > > > > > > least validate that the configurations exist and throw an
>> exception
>> > > if
>> > > > > > they
>> > > > > > > are invalid.
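
A sketch of what the startup-time check could look like for the string-typed defaults, assuming they are validated by parsing them against an enum. The `CompressionType` enum below is a local stand-in (its values are assumed to mirror the Pulsar client enum), and `InvalidDefaultException` and `parse` are hypothetical names rather than anything from the PR.

```java
import java.util.Arrays;

final class FunctionDefaultsValidation {
    private FunctionDefaultsValidation() {
    }

    // Local stand-in; values assumed to mirror Pulsar's client CompressionType.
    enum CompressionType { NONE, LZ4, ZLIB, ZSTD, SNAPPY }

    // Hypothetical exception type used only by this sketch.
    static class InvalidDefaultException extends Exception {
        InvalidDefaultException(String message) {
            super(message);
        }
    }

    /** Parses a configured default, failing fast if it is missing or not a known constant. */
    static <E extends Enum<E>> E parse(Class<E> type, String field, String raw)
            throws InvalidDefaultException {
        if (raw == null || raw.trim().isEmpty()) {
            throw new InvalidDefaultException(field + " is missing");
        }
        try {
            return Enum.valueOf(type, raw.trim());
        } catch (IllegalArgumentException e) {
            throw new InvalidDefaultException(field + " must be one of "
                    + Arrays.toString(type.getEnumConstants()) + " but was '" + raw + "'");
        }
    }
}
```

The same helper would apply to the hashing scheme and message routing mode by passing their enum classes.
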
>> > > > > > >
>> > > > > > > As any configuration exceptions thrown when the producer is created
>> > > > > > > would bubble up to the client, we need to ensure that the final
>> > > > > > > configurations are valid when a function is registered or updated,
>> > > > > > > so that no exceptions are thrown when the producer is created. This
>> > > > > > > requires computing the producer settings (according to the desired
>> > > > > > > precedence) during function registration or update.
>> > > > > > >
>> > > > > > > When a function is registered, existing producer settings do
>> not
>> > > exist
>> > > > > > for
>> > > > > > > it, so we simply need to get cluster-wide defaults and
>> override them
>> > > > > > with
>> > > > > > > any specific producer settings provided in the
>> FunctionConfig's
>> > > > > > > ProducerConfig (submitted as a REST parameter.) After the
>> initial
>> > > > > > > registration, those computed settings will then be persisted
>> in
>> > > > > > bookkeeper
>> > > > > > > with the function definition.
>> > > > > > >
>> > > > > > > When a function is updated, we also need to address the
>> existing
>> > > > > > > producerConfig, and for this, we need to check the desired
>> override
>> > > > > > > precedence (as explained above) when computing the final
>> producer
>> > > > > > settings
>> > > > > > > that will be persisted with the function.
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > *Modifications*
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > As WorkerConfig is readily available when functions are
>> registered
>> > > and
>> > > > > > > updated, we can put new settings into function_worker.yml and
>> can
>> > > load
>> > > > > > and
>> > > > > > > validate them via WorkerConfig.load(..). To prevent
>> introducing
>> > > > breaking
>> > > > > > > changes, parameters will have defaults assigned in
>> WorkerConfig that
>> > > > > > > conform to current expected behavior, like this:
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > @Data
>> > > > > > > @Accessors(chain = true)
>> > > > > > > public class FunctionDefaultsConfig implements Serializable {
>> > > > > > >
>> > > > > > >     @FieldContext(
>> > > > > > >         doc = "Disables default message batching between functions"
>> > > > > > >     )
>> > > > > > >     protected boolean batchingDisabled = false;
>> > > > > > >
>> > > > > > >     @FieldContext(
>> > > > > > >         doc = "Enables default message chunking between functions"
>> > > > > > >     )
>> > > > > > >     protected boolean chunkingEnabled = false;
>> > > > > > >
>> > > > > > >     @FieldContext(
>> > > > > > >         doc = "Disables default behavior to block when message queue is full"
>> > > > > > >     )
>> > > > > > >     protected boolean blockIfQueueFullDisabled = false;
>> > > > > > >
>> > > > > > >     @FieldContext(
>> > > > > > >         doc = "Default compression type"
>> > > > > > >     )
>> > > > > > >     protected String compressionType = "LZ4";
>> > > > > > >
>> > > > > > >     @FieldContext(
>> > > > > > >         doc = "Default hashing scheme"
>> > > > > > >     )
>> > > > > > >     protected String hashingScheme = "Murmur3_32Hash";
>> > > > > > >
>> > > > > > >     @FieldContext(
>> > > > > > >         doc = "Default message routing mode"
>> > > > > > >     )
>> > > > > > >     protected String messageRoutingMode = "CustomPartition";
>> > > > > > >
>> > > > > > >     @FieldContext(
>> > > > > > >         doc = "Default max publish delay (in milliseconds) when message batching is enabled"
>> > > > > > >     )
>> > > > > > >     protected long batchingMaxPublishDelay = 10L;
>> > > > > > >
>> > > > > > >     public ClusterFunctionProducerDefaults buildProducerDefaults()
>> > > > > > >             throws InvalidWorkerConfigDefaultException {
>> > > > > > >         return new ClusterFunctionProducerDefaults(
>> > > > > > >             this.isBatchingDisabled(),
>> > > > > > >             this.isChunkingEnabled(),
>> > > > > > >             this.isBlockIfQueueFullDisabled(),
>> > > > > > >             this.getCompressionType(),
>> > > > > > >             this.getHashingScheme(),
>> > > > > > >             this.getMessageRoutingMode(),
>> > > > > > >             this.getBatchingMaxPublishDelay());
>> > > > > > >     }
>> > > > > > >
>> > > > > > >     public FunctionDefaultsConfig() {
>> > > > > > >     }
>> > > > > > > }
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > The additional section in function_worker.yml will look like
>> this:
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > functionDefaults:
>> > > > > > >   batchingDisabled: true
>> > > > > > >   chunkingEnabled: false
>> > > > > > >   blockIfQueueFullDisabled: false
>> > > > > > >   batchingMaxPublishDelay: 12
>> > > > > > >   compressionType: ZLIB
>> > > > > > >   hashingScheme: JavaStringHash
>> > > > > > >   messageRoutingMode: RoundRobinPartition
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > During function update, we can use a new boolean option,
>> > > > > > > ignoreExistingFunctionDefaults, to toggle precedence:
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > @Data
>> > > > > > > @NoArgsConstructor
>> > > > > > > @ApiModel(value = "UpdateOptions",
>> > > > > > >     description = "Options while updating function defaults or the sink")
>> > > > > > > public class UpdateOptions {
>> > > > > > >
>> > > > > > >     @ApiModelProperty(
>> > > > > > >         value = "Whether or not to update the auth data",
>> > > > > > >         name = "update-auth-data")
>> > > > > > >     private boolean updateAuthData = false;
>> > > > > > >
>> > > > > > >     @ApiModelProperty(
>> > > > > > >         value = "Whether or not to ignore any existing function defaults",
>> > > > > > >         name = "ignore-existing-function-defaults")
>> > > > > > >     private boolean ignoreExistingFunctionDefaults = false;
>> > > > > > > }
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > To ensure that function producer settings don't get modified
>> > > > > > unexpectedly
>> > > > > > > by an update, ignoreExistingFunctionDefaults = false by
>> default.
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > The updated ProducerConfig will contain the new properties
>> and a
>> > > > method
>> > > > > > to
>> > > > > > > merge an incoming ProducerConfig with an existing
>> ProducerConfig.
>> > > (The
>> > > > > > > merged ProducerConfig will use the incoming ProducerConfig
>> > > properties
>> > > > > > when
>> > > > > > > they exist and will use existing ProducerConfig properties
>> > > otherwise.)
>> > > > > > To
>> > > > > > > ensure we don't need to force operators to provide those
>> > > > ProducerConfig
>> > > > > > > properties with every create or update, they must be
>> nullable, not
>> > > > > > > primitive (e.g. Boolean instead of boolean.)
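
To illustrate why the properties need to be nullable, here is a rough sketch of the merge described above. `ProducerConfigSketch` is a hypothetical two-field stand-in for ProducerConfig, and `mergeOver` is an assumed method name; the field names are illustrative only.

```java
// Hypothetical, trimmed-down stand-in for ProducerConfig.
final class ProducerConfigSketch {
    final Boolean batchingEnabled;        // null = not provided
    final Long batchingMaxPublishDelayMs; // null = not provided

    ProducerConfigSketch(Boolean batchingEnabled, Long batchingMaxPublishDelayMs) {
        this.batchingEnabled = batchingEnabled;
        this.batchingMaxPublishDelayMs = batchingMaxPublishDelayMs;
    }

    /**
     * Treats this object as the incoming config and returns a new config that
     * uses incoming values where present and the existing values otherwise.
     */
    ProducerConfigSketch mergeOver(ProducerConfigSketch existing) {
        return new ProducerConfigSketch(
            batchingEnabled != null ? batchingEnabled : existing.batchingEnabled,
            batchingMaxPublishDelayMs != null
                ? batchingMaxPublishDelayMs
                : existing.batchingMaxPublishDelayMs);
    }
}
```

With a primitive `boolean`, an omitted value would be indistinguishable from an explicit `false`, so the merge could not tell whether to keep the existing setting.
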
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > To support the additional properties, FunctionConfigUtils
>> (and any
>> > > > > > similar
>> > > > > > > classes) will need to be modified to correctly translate
>> between the
>> > > > > > > ProducerConfig and the protobuf ProducerSpec.
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > The updated ProducerSpec in Function.proto will look like
>> this:
>> > > > > > >
>> > > > > > > message ProducerSpec {
>> > > > > > >     int32 maxPendingMessages = 1;
>> > > > > > >     int32 maxPendingMessagesAcrossPartitions = 2;
>> > > > > > >     bool useThreadLocalProducers = 3;
>> > > > > > >     CryptoSpec cryptoSpec = 4;
>> > > > > > >     string batchBuilder = 5;
>> > > > > > >     bool batchingDisabled = 6;
>> > > > > > >     bool chunkingEnabled = 7;
>> > > > > > >     bool blockIfQueueFullDisabled = 8;
>> > > > > > >     CompressionType compressionType = 9;
>> > > > > > >     HashingScheme hashingScheme = 10;
>> > > > > > >     MessageRoutingMode messageRoutingMode = 11;
>> > > > > > >     int64 batchingMaxPublishDelay = 12;
>> > > > > > > }
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > To support these new parameters during local run and for
>> > > convenience,
>> > > > we
>> > > > > > > should be able to specify them in the Admin CLI and in local
>> run
>> > > > modes.
>> > > > > > >
>> > > > > > > For local run, as function_worker.yml is not used, these
>> parameters
>> > > > will
>> > > > > > > have defaults and be primitive values:
>> > > > > > >
>> > > > > > > @Parameter(names = "--clusterFunctionBatchingDisabled",
>> > > > > > >     description = "Disable the default message batching behavior for functions",
>> > > > > > >     hidden = true)
>> > > > > > > public boolean clusterFunctionBatchingDisabled = false;
>> > > > > > >
>> > > > > > > @Parameter(names = "--clusterFunctionChunkingEnabled",
>> > > > > > >     description = "The default message chunking behavior for functions",
>> > > > > > >     hidden = true)
>> > > > > > > public boolean clusterFunctionChunkingEnabled = false;
>> > > > > > >
>> > > > > > > @Parameter(names = "--clusterFunctionBlockIfQueueFullDisabled",
>> > > > > > >     description = "Disable the default blocking behavior for functions when queue is full",
>> > > > > > >     hidden = true)
>> > > > > > > public boolean clusterFunctionBlockIfQueueFullDisabled = false;
>> > > > > > >
>> > > > > > > @Parameter(names = "--clusterFunctionCompressionTypeDefault",
>> > > > > > >     description = "The default Compression Type for functions",
>> > > > > > >     hidden = true)
>> > > > > > > public String clusterFunctionCompressionTypeDefault = "LZ4";
>> > > > > > >
>> > > > > > > @Parameter(names = "--clusterFunctionHashingSchemeDefault",
>> > > > > > >     description = "The default Hashing Scheme for functions",
>> > > > > > >     hidden = true)
>> > > > > > > public String clusterFunctionHashingSchemeDefault = "Murmur3_32Hash";
>> > > > > > >
>> > > > > > > @Parameter(names = "--clusterFunctionMessageRoutingModeDefault",
>> > > > > > >     description = "The default Message Routing Mode for functions",
>> > > > > > >     hidden = true)
>> > > > > > > public String clusterFunctionMessageRoutingModeDefault = "CustomPartition";
>> > > > > > >
>> > > > > > > @Parameter(names = "--clusterFunctionBatchingMaxPublishDelayDefault",
>> > > > > > >     description = "The default max publish delay (in milliseconds) for functions when message batching is enabled",
>> > > > > > >     hidden = true)
>> > > > > > > public long clusterFunctionBatchingMaxPublishDelayDefault = 10L;
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > For Admin CLI, parameters can be provided in a similar way,
>> but they
>> > > > > > will
>> > > > > > > not have defaults (i.e. Boolean instead of boolean) since
>> parameters
>> > > > > > should
>> > > > > > > be null to ensure non-provided configs are obtained from
>> > > WorkerConfig.
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > To handle the logic of selecting the property from an incoming
>> > > > > > > producerConfig vs the producerDefaults object (from
>> WorkerConfig),
>> > > we
>> > > > > > can
>> > > > > > > use a FunctionDefaultsMediator to handle this. That interface
>> looks
>> > > > like
>> > > > > > > this and gives us flexibility to handle mediation for specific
>> > > > > > properties
>> > > > > > > without needing to convert the entire object.
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > public interface FunctionDefaultsMediator {
>> > > > > > >     boolean isBatchingDisabled();
>> > > > > > >     boolean isChunkingEnabled();
>> > > > > > >     boolean isBlockIfQueueFullDisabled();
>> > > > > > >     CompressionType getCompressionType();
>> > > > > > >     Function.CompressionType getCompressionTypeProto();
>> > > > > > >     HashingScheme getHashingScheme();
>> > > > > > >     Function.HashingScheme getHashingSchemeProto();
>> > > > > > >     MessageRoutingMode getMessageRoutingMode();
>> > > > > > >     Function.MessageRoutingMode getMessageRoutingModeProto();
>> > > > > > >     Long getBatchingMaxPublishDelay();
>> > > > > > > }
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > (Alternatively, we could remove the Proto methods from the
>> mediator
>> > > > and
>> > > > > > > just use FunctionConfigUtils.convert(..) to translate between
>> the
>> > > > > > protobuf
>> > > > > > > ProducerSpec and the Java ProducerConfig.)
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > *Testing*
>> > > > > > >
>> > > > > > > In terms of testing, much of this functionality will be covered by
>> > > > > > > existing tests in the pulsar-functions module. However, we need to
>> > > > > > > test each of the new classes, as well as these behaviors:
>> > > > > > >
>> > > > > > > - functionRegistration (with different provided ProducerConfig
>> > > > > > >   properties)
>> > > > > > > - functionUpdate (for ignoreExistingFunctionDefaults=true and
>> > > > > > >   ignoreExistingFunctionDefaults=false)
>> > > > > > > - loading function_worker.yml into WorkerConfig
>> > > > > > > - throwing exceptions correctly when invalid parameters are provided
>> > > > > > > - converting between ProducerSpec and ProducerConfig
>> > > > > > > - correctly handling precedence when computing the final
>> > > > > > >   ProducerConfig values
>> > > > > > > - creating producers
>> > > > > > > - ensuring functionality is consistent across runtimes
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > I'd like feedback on this proposal.
>> > > > > > >
>> > > > > > > Devin G. Bost
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>>
>
