Would it be better to just ignore WorkerConfig defaults during function
update? We could still update the function producer behavior through the
producerConfig passed as a REST parameter, but we'd avoid the edge case I
mentioned.
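
To make the trade-off concrete, here is a minimal sketch of how one nullable setting would resolve during an update under the two options. This is an illustration, not the code in the PR: the class name, method name, and the useWorkerDefaults flag are hypothetical.

```java
// Hypothetical sketch of the update-time edge case; none of these names come from the PR.
final class UpdateEdgeCase {

    // Resolves one nullable producer setting during a function update.
    // useWorkerDefaults=true mirrors the behavior described for the PR
    // (null values fall back to WorkerConfig); useWorkerDefaults=false
    // mirrors the alternative (WorkerConfig defaults are ignored on update).
    static Boolean resolveOnUpdate(Boolean incoming, Boolean existing,
                                   boolean workerDefault, boolean useWorkerDefaults) {
        if (incoming != null) {
            return incoming; // value passed in the REST request always wins
        }
        if (existing != null) {
            return existing; // value already persisted with the function
        }
        // Both are null: either pick up the cluster default or leave the value unset.
        return useWorkerDefaults ? Boolean.valueOf(workerDefault) : null;
    }

    public static void main(String[] args) {
        Boolean existing = null;      // function registered before the feature existed
        Boolean incoming = null;      // the update does not touch producerConfig
        boolean workerDefault = true; // e.g. batchingDisabled: true in function_worker.yml

        // With WorkerConfig fallback, the unrelated update silently changes the setting.
        System.out.println(resolveOnUpdate(incoming, existing, workerDefault, true));
        // Without the fallback, the setting stays unset.
        System.out.println(resolveOnUpdate(incoming, existing, workerDefault, false));
    }
}
```

In the second case the function keeps whatever behavior it had before the upgrade, and a value supplied in the request still takes effect.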

Devin G. Bost


On Thu, Apr 1, 2021 at 5:42 PM Devin Bost <devin.b...@gmail.com> wrote:

> I just remembered an edge case that motivated me to create that additional
> flow.
> In my PR, if any producerConfig values are null at registration or update,
> we get those values from WorkerConfig instead.
> However, when a user runs an update (on a function) for the first time
> after upgrading to the version of Pulsar with this feature, if someone has
> modified the function_worker.yml file (perhaps without their knowledge),
> those defaults from function_worker.yml will get picked up because the
> existing producerConfig will have null set for those values (because the
> function was registered prior to the existence of this feature.) So,
> the user could be trying to update something unrelated, not realizing that
> their function is going to have its producerConfig updated by the cluster
> defaults. Most users won't be affected by this, but it could be tricky to
> diagnose for the few who would get impacted by it. (Is this description any
> clearer? If not, it might be clearer if I diagram it.)
>
> Devin G. Bost
>
>
> On Thu, Apr 1, 2021 at 4:39 PM Jerry Peng <jerry.boyang.p...@gmail.com>
> wrote:
>
>> Hi Devin,
>>
>> I understand the usefulness of giving cluster admins the ability to
>> specify some defaults for functions at a cluster level.  I think the
>> right time to apply those defaults is during function registration.  I
>> am not sure I understand the usefulness of what you are proposing we do
>> at update time.  If you would like to change the config of an already
>> running function, why not just update it with the config changes that
>> are needed?  I am not sure you need the additional workflow you proposed
>> for the update.
>>
>> Another idea to satisfy your use case is that instead of leveraging a
>> default config mechanism, perhaps we should take a look at implementing
>> interceptors for the function/source/sink API calls.  We already have
>> interceptors implemented for many other Pulsar functionalities. Perhaps
>> we should do it for the Pulsar Functions APIs as well.  With interceptors
>> you will be able to intercept all API operations on functions and
>> customize the requests however you would like.
>>
>> We can have an interface like:
>>
>> interface FunctionInterceptors {
>>
>> void registerFunctionInterceptor(String tenant, String namespace, String
>> functionName, FunctionConfig functionConfig...);
>>
>> ...
>>
>> }
>>
>> for developers to implement. At the beginning of every Function API
>> call the corresponding interceptor method gets called.  For example, the
>> "registerFunctionInterceptor()" method I provided above will be called
>> towards the beginning of the registerFunction and allow you to customize
>> the function config however you want.
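
A minimal sketch of what such an interceptor could look like, assuming the interface shape proposed above. FunctionConfigStub is a stand-in for Pulsar's FunctionConfig reduced to one field, and DefaultBatchingInterceptor is a hypothetical implementation; neither exists in Pulsar.

```java
// Demo entry point; all types in this sketch are illustrative stand-ins.
final class InterceptorDemo {
    public static void main(String[] args) {
        FunctionInterceptors interceptor = new DefaultBatchingInterceptor(false);
        FunctionConfigStub config = new FunctionConfigStub(); // user left batching unset
        interceptor.registerFunctionInterceptor("public", "default", "my-function", config);
        System.out.println(config.batchingEnabled);
    }
}

// Stand-in for FunctionConfig, reduced to a single nullable setting.
final class FunctionConfigStub {
    Boolean batchingEnabled; // null means "not set by the user"
}

// Interface shape taken from the proposal; only one method is shown.
interface FunctionInterceptors {
    void registerFunctionInterceptor(String tenant, String namespace,
                                     String functionName, FunctionConfigStub functionConfig);
}

// Hypothetical interceptor that fills in a cluster-level default at registration time.
final class DefaultBatchingInterceptor implements FunctionInterceptors {
    private final boolean defaultBatchingEnabled;

    DefaultBatchingInterceptor(boolean defaultBatchingEnabled) {
        this.defaultBatchingEnabled = defaultBatchingEnabled;
    }

    @Override
    public void registerFunctionInterceptor(String tenant, String namespace,
                                            String functionName, FunctionConfigStub functionConfig) {
        // Only touch values the user did not provide.
        if (functionConfig.batchingEnabled == null) {
            functionConfig.batchingEnabled = defaultBatchingEnabled;
        }
    }
}
```

Because the interceptor only runs on the registration call in this sketch, an update to an existing function would not pick up the default unless a matching update interceptor were added.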
>>
>> Best,
>>
>> Jerry
>>
>>
>>
>> On Thu, Apr 1, 2021 at 2:08 PM Devin Bost <devin.b...@gmail.com> wrote:
>>
>> > *Cluster-Wide and Function-Specific Producer Defaults*
>> >
>> >
>> >
>> >
>> >    - Status: Proposal
>> >    - Author: Devin Bost (with guidance from Jerry Peng)
>> >    - Pull Request: https://github.com/apache/pulsar/pull/9987
>> >    - Mailing List discussion:
>> >    - Release: 2.8.0
>> >
>> >
>> > *Motivation*
>> >
>> > Pulsar currently provides no way to allow users or operators to change
>> > the producer behavior of Pulsar functions. For organizations that are
>> > making heavy use of functions, the ability to tune messaging behavior of
>> > functions is greatly desired. Moreover, current function messaging
>> > defaults appear to be causing issues for certain workloads. (For example,
>> > some bugs appear to be related to having batching enabled. See #6054.)
>> > Enabling operators to modify these settings will help Pulsar cluster
>> > admins work around issues so that they can upgrade legacy versions of
>> > Pulsar to more recent versions. Moreover, enabling greater tuning of
>> > these settings can provide more opportunity for performance optimization
>> > in production environments. We need the ability to modify these function
>> > producer settings:
>> >
>> >
>> >
>> >    - batchingEnabled
>> >    - chunkingEnabled
>> >    - blockIfQueueFull
>> >    - compressionType
>> >    - hashingScheme
>> >    - messageRoutingMode
>> >    - batchingMaxPublishDelay
>> >
>> >
>> >
>> > *Implementation Strategy*
>> >
>> >
>> >
>> > Ideally, we need a way to set default message producer behavior (for
>> > functions) on a cluster-wide level. However, operators may want specific
>> > settings to be used for certain functions. (We can't assume that
>> > everyone will want all functions in a cluster to have the same producer
>> > settings.) When an operator changes the cluster-wide producer defaults,
>> > they need a way to roll out the changes to functions without
>> > significantly disrupting messaging flows in production environments.
>> > They also need a simple way to roll back changes in case a change to
>> > function producer settings results in undesired behavior. However, not
>> > all users will want function updates to replace existing function
>> > producer settings, especially for functions that have been given custom
>> > producer settings (at the per-function level). Due to this difference in
>> > use cases, there needs to be a way to allow an operator to specify what
>> > update behavior they want. For a given update, the user should be able
>> > to specify if:
>> >
>> >    1. Existing function producer settings will be replaced by
>> >    cluster-wide defaults (unless overridden by producer settings provided
>> >    in the request), or
>> >    2. Existing function producer settings will *not* be replaced by
>> >    cluster-wide defaults (unless overridden by producer settings provided
>> >    in the request).
>> >
>> > So, the two orders of precedence are:
>> >
>> >    1. newProducerConfig > cluster-wide defaults > existingProducerConfig
>> >    2. newProducerConfig > existingProducerConfig > cluster-wide defaults
>> >
>> > We can add a boolean property to UpdateOptions to allow the precedence
>> > to be specified. Since we don't want to introduce unexpected side effects
>> > during an update, the default precedence should prefer the
>> > existingProducerConfig over cluster-wide defaults.
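
The two precedence orders above can be sketched as a single resolution step for one setting. This is a minimal illustration, not code from the PR: the class and method names are hypothetical, and the flag borrows the ignoreExistingFunctionDefaults name used in the Modifications section.

```java
// Hypothetical sketch of the two precedence orders; not code from the PR.
final class ProducerSettingPrecedence {

    // Resolves one nullable setting from the three possible sources.
    // ignoreExistingFunctionDefaults=true:  incoming > cluster default > existing (order 1).
    // ignoreExistingFunctionDefaults=false: incoming > existing > cluster default (order 2).
    static <T> T resolve(T incoming, T existing, T clusterDefault,
                         boolean ignoreExistingFunctionDefaults) {
        if (incoming != null) {
            return incoming; // settings provided in the request always win
        }
        T first = ignoreExistingFunctionDefaults ? clusterDefault : existing;
        T second = ignoreExistingFunctionDefaults ? existing : clusterDefault;
        return first != null ? first : second;
    }

    public static void main(String[] args) {
        // The function was registered with LZ4; the cluster default is now ZLIB.
        System.out.println(resolve(null, "LZ4", "ZLIB", false));  // existing value is kept
        System.out.println(resolve(null, "LZ4", "ZLIB", true));   // cluster default replaces it
        System.out.println(resolve("NONE", "LZ4", "ZLIB", true)); // request value wins
    }
}
```

With the flag left at false, an update that carries no producer settings leaves an explicitly configured function unchanged, which is the side-effect-free default argued for above.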
>> >
>> >
>> >
>> > To ensure that the desired producer settings (after calculating the
>> > settings from defaults + overrides) are available when creating the
>> > producers (on any cluster), we need to add these producer settings to
>> > the function protobuf definition. Also, to ensure that settings can be
>> > properly validated at the time they are submitted, we need validation in
>> > two places:
>> >
>> >    1. When cluster-wide configs are loaded upon broker startup
>> >    2. When a function is registered or updated.
>> >
>> > As it would be impractical to check every registered function definition
>> > during broker startup, most of the validation needs to occur during
>> > function registration or update. However, during broker startup, we can
>> > at least validate that the configurations exist and throw an exception
>> > if they are invalid.
>> >
>> > As any configuration exceptions thrown when the producer is created would
>> > bubble up to the client, we need to ensure that the final configurations
>> > are valid when a function is registered or updated, so that no
>> > exceptions are thrown when the producer is created. This requires
>> > computing the producer settings (according to the desired precedence)
>> > during function registration or update.
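
A minimal sketch of the fail-fast check at registration or update, assuming the merged setting arrives as a string. The enum is a local stand-in for the client's compression enum, and the class and method names are hypothetical; the real validation in the PR may look different.

```java
// Hypothetical sketch of validating one merged setting before it is persisted.
final class ProducerSettingValidation {

    // Local stand-in for the client's compression enum, listed here for illustration.
    enum CompressionType { NONE, LZ4, ZLIB, ZSTD, SNAPPY }

    // Converts the merged string value into an enum, rejecting bad input at
    // registration/update time instead of when the producer is created.
    static CompressionType parseCompressionType(String raw) {
        if (raw == null) {
            throw new IllegalArgumentException(
                    "compressionType is still null after merging defaults");
        }
        try {
            return CompressionType.valueOf(raw);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Invalid compressionType: " + raw, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseCompressionType("ZLIB"));
        try {
            parseCompressionType("GZIP"); // not a supported value in this sketch
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The same kind of check would apply to hashingScheme and messageRoutingMode, so that an invalid value is reported to the caller of the REST API and never reaches producer creation.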
>> >
>> > When a function is registered, existing producer settings do not exist
>> > for it, so we simply need to get cluster-wide defaults and override them
>> > with any specific producer settings provided in the FunctionConfig's
>> > ProducerConfig (submitted as a REST parameter). After the initial
>> > registration, those computed settings will then be persisted in
>> > BookKeeper with the function definition.
>> >
>> > When a function is updated, we also need to address the existing
>> > producerConfig, and for this, we need to check the desired override
>> > precedence (as explained above) when computing the final producer
>> > settings that will be persisted with the function.
>> >
>> >
>> >
>> > *Modifications*
>> >
>> >
>> >
>> > As WorkerConfig is readily available when functions are registered and
>> > updated, we can put new settings into function_worker.yml and can load
>> > and validate them via WorkerConfig.load(..). To prevent introducing
>> > breaking changes, parameters will have defaults assigned in WorkerConfig
>> > that conform to current expected behavior, like this:
>> >
>> >
>> >
>> > @Data
>> >
>> > @Accessors(chain = true)
>> >
>> > public class FunctionDefaultsConfig implements Serializable {
>> >
>> >     @FieldContext(
>> >
>> >             doc = "Disables default message batching between functions"
>> >
>> >     )
>> >
>> >     protected boolean batchingDisabled = false;
>> >
>> >     @FieldContext(
>> >
>> >             doc = "Enables default message chunking between functions"
>> >
>> >     )
>> >
>> >     protected boolean chunkingEnabled = false;
>> >
>> >     @FieldContext(
>> >
>> >             doc = "Disables default behavior to block when message queue is full"
>> >
>> >     )
>> >
>> >     protected boolean blockIfQueueFullDisabled = false;
>> >
>> >     @FieldContext(
>> >
>> >             doc = "Default compression type"
>> >
>> >     )
>> >
>> >     protected String compressionType = "LZ4";
>> >
>> >     @FieldContext(
>> >
>> >             doc = "Default hashing scheme"
>> >
>> >     )
>> >
>> >     protected String hashingScheme = "Murmur3_32Hash";
>> >
>> >     @FieldContext(
>> >
>> >             doc = "Default message routing mode"
>> >
>> >     )
>> >
>> >     protected String messageRoutingMode = "CustomPartition";
>> >
>> >     @FieldContext(
>> >
>> >             doc = "Default max publish delay (in milliseconds) when "
>> >                     + "message batching is enabled"
>> >
>> >     )
>> >
>> >     protected long batchingMaxPublishDelay = 10L;
>> >
>> >
>> >
>> >     public ClusterFunctionProducerDefaults buildProducerDefaults()
>> >             throws InvalidWorkerConfigDefaultException {
>> >
>> >         return new ClusterFunctionProducerDefaults(
>> >                 this.isBatchingDisabled(),
>> >                 this.isChunkingEnabled(),
>> >                 this.isBlockIfQueueFullDisabled(),
>> >                 this.getCompressionType(),
>> >                 this.getHashingScheme(),
>> >                 this.getMessageRoutingMode(),
>> >                 this.getBatchingMaxPublishDelay());
>> >
>> >     }
>> >
>> >
>> >
>> >     public FunctionDefaultsConfig() {
>> >
>> >     }
>> >
>> > }
>> >
>> >
>> >
>> > The additional section in function_worker.yml will look like this:
>> >
>> >
>> >
>> > functionDefaults:
>> >
>> >   batchingDisabled: true
>> >
>> >   chunkingEnabled: false
>> >
>> >   blockIfQueueFullDisabled: false
>> >
>> >   batchingMaxPublishDelay: 12
>> >
>> >   compressionType: ZLIB
>> >
>> >   hashingScheme: JavaStringHash
>> >
>> >   messageRoutingMode: RoundRobinPartition
>> >
>> >
>> >
>> > During function update, we can use a new boolean option,
>> > ignoreExistingFunctionDefaults, to toggle precedence:
>> >
>> >
>> >
>> > @Data
>> >
>> > @NoArgsConstructor
>> >
>> > @ApiModel(value = "UpdateOptions",
>> >         description = "Options while updating function defaults or the sink")
>> >
>> > public class UpdateOptions {
>> >
>> >     @ApiModelProperty(
>> >
>> >         value = "Whether or not to update the auth data",
>> >
>> >         name = "update-auth-data")
>> >
>> >     private boolean updateAuthData = false;
>> >
>> >
>> >
>> >     @ApiModelProperty(
>> >
>> >             value = "Whether or not to ignore any existing function defaults",
>> >
>> >             name = "ignore-existing-function-defaults")
>> >
>> >     private boolean ignoreExistingFunctionDefaults = false;
>> >
>> > }
>> >
>> >
>> >
>> > To ensure that function producer settings don't get modified
>> > unexpectedly by an update, ignoreExistingFunctionDefaults = false by
>> > default.
>> >
>> >
>> >
>> > The updated ProducerConfig will contain the new properties and a method
>> > to merge an incoming ProducerConfig with an existing ProducerConfig. (The
>> > merged ProducerConfig will use the incoming ProducerConfig properties
>> > when they exist and will use existing ProducerConfig properties
>> > otherwise.) So that operators are not forced to provide those
>> > ProducerConfig properties with every create or update, they must be
>> > nullable, not primitive (e.g. Boolean instead of boolean).
>> >
>> >
>> >
>> > To support the additional properties, FunctionConfigUtils (and any
>> > similar classes) will need to be modified to correctly translate between
>> > the ProducerConfig and the protobuf ProducerSpec.
>> >
>> >
>> >
>> > The updated ProducerSpec in Function.proto will look like this:
>> >
>> > message ProducerSpec {
>> >
>> >     int32 maxPendingMessages = 1;
>> >
>> >     int32 maxPendingMessagesAcrossPartitions = 2;
>> >
>> >     bool useThreadLocalProducers = 3;
>> >
>> >     CryptoSpec cryptoSpec = 4;
>> >
>> >     string batchBuilder = 5;
>> >
>> >     bool batchingDisabled = 6;
>> >
>> >     bool chunkingEnabled = 7;
>> >
>> >     bool blockIfQueueFullDisabled = 8;
>> >
>> >     CompressionType compressionType = 9;
>> >
>> >     HashingScheme hashingScheme = 10;
>> >
>> >     MessageRoutingMode messageRoutingMode = 11;
>> >
>> >     int64 batchingMaxPublishDelay = 12;
>> >
>> > }
>> >
>> >
>> >
>> > To support these new parameters during local run and for convenience, we
>> > should be able to specify them in the Admin CLI and in local run modes.
>> >
>> > For local run, as function_worker.yml is not used, these parameters will
>> > have defaults and be primitive values:
>> >
>> > @Parameter(names = "--clusterFunctionBatchingDisabled",
>> >         description = "Disable the default message batching behavior for functions",
>> >         hidden = true)
>> > public boolean clusterFunctionBatchingDisabled = false;
>> >
>> > @Parameter(names = "--clusterFunctionChunkingEnabled",
>> >         description = "The default message chunking behavior for functions",
>> >         hidden = true)
>> > public boolean clusterFunctionChunkingEnabled = false;
>> >
>> > @Parameter(names = "--clusterFunctionBlockIfQueueFullDisabled",
>> >         description = "Disable the default blocking behavior for functions when queue is full",
>> >         hidden = true)
>> > public boolean clusterFunctionBlockIfQueueFullDisabled = false;
>> >
>> > @Parameter(names = "--clusterFunctionCompressionTypeDefault",
>> >         description = "The default Compression Type for functions",
>> >         hidden = true)
>> > public String clusterFunctionCompressionTypeDefault = "LZ4";
>> >
>> > @Parameter(names = "--clusterFunctionHashingSchemeDefault",
>> >         description = "The default Hashing Scheme for functions",
>> >         hidden = true)
>> > public String clusterFunctionHashingSchemeDefault = "Murmur3_32Hash";
>> >
>> > @Parameter(names = "--clusterFunctionMessageRoutingModeDefault",
>> >         description = "The default Message Routing Mode for functions",
>> >         hidden = true)
>> > public String clusterFunctionMessageRoutingModeDefault = "CustomPartition";
>> >
>> > @Parameter(names = "--clusterFunctionBatchingMaxPublishDelayDefault",
>> >         description = "The default max publish delay (in milliseconds) for functions "
>> >                 + "when message batching is enabled",
>> >         hidden = true)
>> > public long clusterFunctionBatchingMaxPublishDelayDefault = 10L;
>> >
>> >
>> >
>> >
>> >
>> > For Admin CLI, parameters can be provided in a similar way, but they
>> > will not have defaults (i.e. Boolean instead of boolean), since parameters
>> > should be null to ensure non-provided configs are obtained from
>> > WorkerConfig.
>> >
>> >
>> >
>> > To handle the logic of selecting the property from an incoming
>> > producerConfig vs the producerDefaults object (from WorkerConfig), we
>> > can use a FunctionDefaultsMediator. That interface looks like this and
>> > gives us flexibility to handle mediation for specific properties without
>> > needing to convert the entire object.
>> >
>> >
>> >
>> > public interface FunctionDefaultsMediator {
>> >
>> >     boolean isBatchingDisabled();
>> >
>> >     boolean isChunkingEnabled();
>> >
>> >     boolean isBlockIfQueueFullDisabled();
>> >
>> >     CompressionType getCompressionType();
>> >
>> >     Function.CompressionType getCompressionTypeProto();
>> >
>> >     HashingScheme getHashingScheme();
>> >
>> >     Function.HashingScheme getHashingSchemeProto();
>> >
>> >     MessageRoutingMode getMessageRoutingMode();
>> >
>> >     Function.MessageRoutingMode getMessageRoutingModeProto();
>> >
>> >     Long getBatchingMaxPublishDelay();
>> >
>> > }
>> >
>> >
>> >
>> > (Alternatively, we could remove the Proto methods from the mediator and
>> > just use FunctionConfigUtils.convert(..) to translate between the
>> > protobuf ProducerSpec and the Java ProducerConfig.)
>> >
>> >
>> >
>> >
>> > *Testing*
>> >
>> > In terms of testing, much of this functionality will be covered by
>> > existing tests in the pulsar-functions module. However, we need to test
>> > each of the new classes, as well as these behaviors:
>> >
>> >    - functionRegistration (with different provided ProducerConfig
>> >    properties)
>> >    - functionUpdate (for ignoreExistingFunctionDefaults=true and
>> >    ignoreExistingFunctionDefaults=false)
>> >    - loading function_worker.yml into WorkerConfig
>> >    - throwing exceptions correctly when invalid parameters are provided
>> >    - converting between ProducerSpec and ProducerConfig
>> >    - correctly handling precedence when computing the final
>> >    ProducerConfig values
>> >    - creating producers
>> >    - ensuring functionality is consistent across runtimes
>> >
>> >
>> >
>> > I'd like feedback on this proposal.
>> >
>> > Devin G. Bost
>> >
>>
>
