What would be some of the additional benefits of using the interceptor approach?
-- Devin G. Bost

On Thu, Apr 1, 2021, 6:03 PM Jerry Peng <jerry.boyang.p...@gmail.com> wrote:

> Devin,
>
> Interceptors are located on the server side (broker/worker). Cluster
> admins would create the plugins based on the interfaces I described and
> install them on the server side. Regular function developers will not
> implement or interact directly with the interceptor.
>
> > Would it be better to just ignore WorkerConfig defaults during function
> > update?
>
> Yes.
>
> If the cluster admin changes the function config defaults and wants
> existing functions to utilize those configs that have changed, the admin
> can just update those configs of the functions through the regular update
> mechanism for functions we have today.
>
> On Thu, Apr 1, 2021 at 4:51 PM Devin Bost <devin.b...@gmail.com> wrote:
>
> > Would it be better to just ignore WorkerConfig defaults during function
> > update? We could still update the function producer behavior through the
> > producerConfig passed as a REST parameter, but we'd avoid the edge case I
> > mentioned.
> >
> > Devin G. Bost
> >
> > On Thu, Apr 1, 2021 at 5:42 PM Devin Bost <devin.b...@gmail.com> wrote:
> >
> > > I just remembered an edge case that motivated me to create that
> > > additional flow.
> > > In my PR, if any producerConfig values are null at registration or
> > > update, we get those values from WorkerConfig instead.
> > > However, when a user runs an update (on a function) for the first time
> > > after upgrading to the version of Pulsar with this feature, if someone
> > > has modified the function_worker.yml file (perhaps without their
> > > knowledge), those defaults from function_worker.yml will get picked up
> > > because the existing producerConfig will have null set for those values
> > > (because the function was registered prior to the existence of this
> > > feature.)
> > > So, the user could be trying to update something unrelated, not
> > > realizing that their function is going to have its producerConfig
> > > updated by the cluster defaults. Most users won't be affected by this,
> > > but it could be tricky to diagnose for the few who would get impacted
> > > by it. (Is this description any clearer? If not, it might be clearer
> > > if I diagram it.)
> > >
> > > Devin G. Bost
> > >
> > > On Thu, Apr 1, 2021 at 4:39 PM Jerry Peng <jerry.boyang.p...@gmail.com>
> > > wrote:
> > >
> > >> Hi Devin,
> > >>
> > >> I understand the usefulness of giving cluster admins the ability to
> > >> specify some defaults for functions at a cluster level. I think the
> > >> right time to apply those defaults is during function registration
> > >> time. I am not sure I understand the usefulness of what you are
> > >> proposing we do during update time. If you would like to change a
> > >> config of an already running function, why not just update it with
> > >> the config changes that are needed? I am not sure you need the
> > >> additional workflow you proposed for the update.
> > >>
> > >> Another idea to satisfy your use case is that instead of leveraging a
> > >> default config mechanism perhaps we should take a look at implementing
> > >> interceptors for the function/sources/sink API calls. We already have
> > >> interceptors implemented for many other Pulsar functionalities.
> > >> Perhaps we should do it for Pulsar Functions APIs as well. With
> > >> interceptors you will be able to intercept all API operations to
> > >> functions and customize the requests however you would like.
> > >>
> > >> We can have an interface like:
> > >>
> > >> interface FunctionInterceptors {
> > >>
> > >>     void registerFunctionInterceptor(String tenant, String namespace,
> > >>         String functionName, FunctionConfig functionConfig...);
> > >>
> > >>     ...
> > >>
> > >> }
> > >>
> > >> for developers to implement. At the beginning of every Function API
> > >> call the corresponding interceptor method gets called. For example,
> > >> the "registerFunctionInterceptor()" method I provided above will be
> > >> called towards the beginning of the registerFunction and allow you to
> > >> customize the function config however you want.
> > >>
> > >> Best,
> > >>
> > >> Jerry
> > >>
> > >> On Thu, Apr 1, 2021 at 2:08 PM Devin Bost <devin.b...@gmail.com> wrote:
> > >>
> > >> > *Cluster-Wide and Function-Specific Producer Defaults*
> > >> >
> > >> > - Status: Proposal
> > >> > - Author: Devin Bost (with guidance from Jerry Peng)
> > >> > - Pull Request: https://github.com/apache/pulsar/pull/9987
> > >> > - Mailing List discussion:
> > >> > - Release: 2.8.0
> > >> >
> > >> > *Motivation*
> > >> >
> > >> > Pulsar currently provides no way to allow users or operators to
> > >> > change the producer behavior of Pulsar functions. For organizations
> > >> > that are making heavy use of functions, the ability to tune
> > >> > messaging behavior of functions is greatly desired. Moreover,
> > >> > current function messaging defaults appear to be causing issues for
> > >> > certain workloads. (For example, some bugs appear to be related to
> > >> > having batching enabled. See #6054.) Enabling operators to modify
> > >> > these settings will help Pulsar cluster admins work around issues
> > >> > so that they can upgrade legacy versions of Pulsar to more recent
> > >> > versions. Moreover, enabling greater tuning of these settings can
> > >> > provide more opportunity for performance optimization in production
> > >> > environments.
> > >> > We need the ability to modify these function producer settings:
> > >> >
> > >> >    - batchingEnabled
> > >> >    - chunkingEnabled
> > >> >    - blockIfQueueFull
> > >> >    - compressionType
> > >> >    - hashingScheme
> > >> >    - messageRoutingMode
> > >> >    - batchingMaxPublishDelay
> > >> >
> > >> > *Implementation Strategy*
> > >> >
> > >> > Ideally, we need a way to set default message producer behavior (for
> > >> > functions) on a cluster-wide level. However, operators may want
> > >> > specific settings to be used for certain functions. (We can't assume
> > >> > that everyone will want all functions in a cluster to have the same
> > >> > producer settings.) When an operator changes the cluster-wide
> > >> > producer defaults, they need a way to be able to roll out the
> > >> > changes to functions without significantly disrupting messaging
> > >> > flows in production environments. They also need a simple way to
> > >> > roll back changes in case a change to function producer settings
> > >> > results in undesired behavior. However, not all users will want
> > >> > function updates to replace existing function producer settings,
> > >> > especially for functions that have been given custom producer
> > >> > settings (at the per-function level.) Due to this difference in use
> > >> > cases, there needs to be a way to allow an operator to specify what
> > >> > update behavior they want. For a given update, the user should be
> > >> > able to specify if:
> > >> >
> > >> >    1. Existing function producer settings will be replaced by
> > >> >       cluster-wide defaults (unless overridden by producer settings
> > >> >       provided in the request), or
> > >> >    2. Existing function producer settings will *not* be replaced by
> > >> >       cluster-wide defaults (unless overridden by producer settings
> > >> >       provided in the request).
> > >> >
> > >> > So, the two orders of precedence are:
> > >> >
> > >> >    1. newProducerConfig > cluster-wide defaults > existingProducerConfig
> > >> >    2. newProducerConfig > existingProducerConfig > cluster-wide defaults
> > >> >
> > >> > We can add a boolean property to UpdateOptions to allow the
> > >> > precedence to be specified. Since we don't want to introduce
> > >> > unexpected side effects during an update, the default precedence
> > >> > should prefer the existingProducerConfig over cluster-wide defaults.
> > >> >
> > >> > To ensure that the desired producer settings (after calculating the
> > >> > settings from defaults + overrides) are available when creating the
> > >> > producers (on any cluster), we need to add these producer settings
> > >> > to the function protobuf definition. Also, to ensure that settings
> > >> > can be properly validated at the time they are submitted, we need
> > >> > validation in two places:
> > >> >
> > >> >    1. When cluster-wide configs are loaded upon broker startup
> > >> >    2. When a function is registered or updated.
> > >> >
> > >> > As it would be impractical to check every registered function
> > >> > definition during broker startup, most of the validation needs to
> > >> > occur during function registration or update. However, during broker
> > >> > startup, we can at least validate that the configurations exist and
> > >> > throw an exception if they are invalid.
> > >> >
> > >> > As any configuration exceptions thrown when the producer is created
> > >> > would bubble up to the client, we need to ensure that the final
> > >> > configurations are valid when a function is registered or updated,
> > >> > so that there are no exceptions when the producer is created. This
> > >> > requires computing the producer settings (according to the desired
> > >> > precedence) during function registration or update.
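
The two precedence orders above can be sketched for a single setting as follows. This is only an illustration under stated assumptions: the class name PrecedenceSketch, the method resolve, and the flag preferClusterDefaults are hypothetical names and are not taken from the PR, and null stands for "not provided".

```java
public class PrecedenceSketch {

    /**
     * Resolves one producer setting from three candidate values.
     * All names here are hypothetical; null means "not provided".
     *
     * preferClusterDefaults = true  -> new > cluster-wide defaults > existing
     * preferClusterDefaults = false -> new > existing > cluster-wide defaults
     */
    public static <T> T resolve(T incoming, T existing, T clusterDefault,
                                boolean preferClusterDefaults) {
        if (incoming != null) {
            // A value supplied in the request wins in both orders.
            return incoming;
        }
        if (preferClusterDefaults) {
            // Order 1: cluster-wide defaults replace the existing value.
            return clusterDefault != null ? clusterDefault : existing;
        }
        // Order 2: the existing value is kept; defaults only fill gaps.
        return existing != null ? existing : clusterDefault;
    }

    public static void main(String[] args) {
        Boolean incoming = null;               // not provided in the update
        Boolean existing = Boolean.TRUE;       // stored with the function
        Boolean clusterDefault = Boolean.FALSE;

        // Order 2 keeps the existing value.
        System.out.println(resolve(incoming, existing, clusterDefault, false)); // prints "true"
        // Order 1 takes the cluster-wide default.
        System.out.println(resolve(incoming, existing, clusterDefault, true));  // prints "false"
        // A value in the request wins regardless of the flag.
        System.out.println(resolve("ZLIB", "LZ4", "NONE", true));               // prints "ZLIB"
    }
}
```

With order 2, a null existing value still falls through to the cluster-wide default, which is the upgrade edge case discussed earlier in this thread.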
> > >> >
> > >> > When a function is registered, existing producer settings do not
> > >> > exist for it, so we simply need to get cluster-wide defaults and
> > >> > override them with any specific producer settings provided in the
> > >> > FunctionConfig's ProducerConfig (submitted as a REST parameter.)
> > >> > After the initial registration, those computed settings will then be
> > >> > persisted in BookKeeper with the function definition.
> > >> >
> > >> > When a function is updated, we also need to address the existing
> > >> > producerConfig, and for this, we need to check the desired override
> > >> > precedence (as explained above) when computing the final producer
> > >> > settings that will be persisted with the function.
> > >> >
> > >> > *Modifications*
> > >> >
> > >> > As WorkerConfig is readily available when functions are registered
> > >> > and updated, we can put new settings into function_worker.yml and
> > >> > can load and validate them via WorkerConfig.load(..).
> > >> > To prevent introducing breaking changes, parameters will have
> > >> > defaults assigned in WorkerConfig that conform to current expected
> > >> > behavior, like this:
> > >> >
> > >> > @Data
> > >> > @Accessors(chain = true)
> > >> > public class FunctionDefaultsConfig implements Serializable {
> > >> >
> > >> >     @FieldContext(
> > >> >         doc = "Disables default message batching between functions"
> > >> >     )
> > >> >     protected boolean batchingDisabled = false;
> > >> >
> > >> >     @FieldContext(
> > >> >         doc = "Enables default message chunking between functions"
> > >> >     )
> > >> >     protected boolean chunkingEnabled = false;
> > >> >
> > >> >     @FieldContext(
> > >> >         doc = "Disables default behavior to block when message queue is full"
> > >> >     )
> > >> >     protected boolean blockIfQueueFullDisabled = false;
> > >> >
> > >> >     @FieldContext(
> > >> >         doc = "Default compression type"
> > >> >     )
> > >> >     protected String compressionType = "LZ4";
> > >> >
> > >> >     @FieldContext(
> > >> >         doc = "Default hashing scheme"
> > >> >     )
> > >> >     protected String hashingScheme = "Murmur3_32Hash";
> > >> >
> > >> >     @FieldContext(
> > >> >         doc = "Default message routing mode"
> > >> >     )
> > >> >     protected String messageRoutingMode = "CustomPartition";
> > >> >
> > >> >     @FieldContext(
> > >> >         doc = "Default max publish delay (in milliseconds) when message batching is enabled"
> > >> >     )
> > >> >     protected long batchingMaxPublishDelay = 10L;
> > >> >
> > >> >     public ClusterFunctionProducerDefaults buildProducerDefaults()
> > >> >             throws InvalidWorkerConfigDefaultException {
> > >> >         return new ClusterFunctionProducerDefaults(
> > >> >                 this.isBatchingDisabled(),
> > >> >                 this.isChunkingEnabled(),
> > >> >                 this.isBlockIfQueueFullDisabled(),
> > >> >                 this.getCompressionType(),
> > >> >                 this.getHashingScheme(),
> > >> >                 this.getMessageRoutingMode(),
> > >> >                 this.getBatchingMaxPublishDelay());
> > >> >     }
> > >> >
> > >> >     public FunctionDefaultsConfig() {
> > >> >     }
> > >> > }
> > >> >
> > >> > The additional section in function_worker.yml will look like this:
> > >> >
> > >> > functionDefaults:
> > >> >   batchingDisabled: true
> > >> >   chunkingEnabled: false
> > >> >   blockIfQueueFullDisabled: false
> > >> >   batchingMaxPublishDelay: 12
> > >> >   compressionType: ZLIB
> > >> >   hashingScheme: JavaStringHash
> > >> >   messageRoutingMode: RoundRobinPartition
> > >> >
> > >> > During function update, we can use a new boolean option,
> > >> > ignoreExistingFunctionDefaults, to toggle precedence:
> > >> >
> > >> > @Data
> > >> > @NoArgsConstructor
> > >> > @ApiModel(value = "UpdateOptions",
> > >> >     description = "Options while updating function defaults or the sink")
> > >> > public class UpdateOptions {
> > >> >
> > >> >     @ApiModelProperty(
> > >> >         value = "Whether or not to update the auth data",
> > >> >         name = "update-auth-data")
> > >> >     private boolean updateAuthData = false;
> > >> >
> > >> >     @ApiModelProperty(
> > >> >         value = "Whether or not to ignore any existing function defaults",
> > >> >         name = "ignore-existing-function-defaults")
> > >> >     private boolean ignoreExistingFunctionDefaults = false;
> > >> > }
> > >> >
> > >> > To ensure that function producer settings don't get modified
> > >> > unexpectedly by an update, ignoreExistingFunctionDefaults = false by
> > >> > default.
> > >> >
> > >> > The updated ProducerConfig will contain the new properties and a
> > >> > method to merge an incoming ProducerConfig with an existing
> > >> > ProducerConfig. (The merged ProducerConfig will use the incoming
> > >> > ProducerConfig properties when they exist and will use existing
> > >> > ProducerConfig properties otherwise.) To ensure we don't need to
> > >> > force operators to provide those ProducerConfig properties with
> > >> > every create or update, they must be nullable, not primitive (e.g.
> > >> > Boolean instead of boolean.)
> > >> >
> > >> > To support the additional properties, FunctionConfigUtils (and any
> > >> > similar classes) will need to be modified to correctly translate
> > >> > between the ProducerConfig and the protobuf ProducerSpec.
> > >> >
> > >> > The updated ProducerSpec in Function.proto will look like this:
> > >> >
> > >> > message ProducerSpec {
> > >> >     int32 maxPendingMessages = 1;
> > >> >     int32 maxPendingMessagesAcrossPartitions = 2;
> > >> >     bool useThreadLocalProducers = 3;
> > >> >     CryptoSpec cryptoSpec = 4;
> > >> >     string batchBuilder = 5;
> > >> >     bool batchingDisabled = 6;
> > >> >     bool chunkingEnabled = 7;
> > >> >     bool blockIfQueueFullDisabled = 8;
> > >> >     CompressionType compressionType = 9;
> > >> >     HashingScheme hashingScheme = 10;
> > >> >     MessageRoutingMode messageRoutingMode = 11;
> > >> >     int64 batchingMaxPublishDelay = 12;
> > >> > }
> > >> >
> > >> > To support these new parameters during local run and for
> > >> > convenience, we should be able to specify them in the Admin CLI and
> > >> > in local run modes.
> > >> >
> > >> > For local run, as function_worker.yml is not used, these parameters
> > >> > will have defaults and be primitive values:
> > >> >
> > >> > @Parameter(names = "--clusterFunctionBatchingDisabled",
> > >> >     description = "Disable the default message batching behavior for functions",
> > >> >     hidden = true)
> > >> > public boolean clusterFunctionBatchingDisabled = false;
> > >> >
> > >> > @Parameter(names = "--clusterFunctionChunkingEnabled",
> > >> >     description = "The default message chunking behavior for functions",
> > >> >     hidden = true)
> > >> > public boolean clusterFunctionChunkingEnabled = false;
> > >> >
> > >> > @Parameter(names = "--clusterFunctionBlockIfQueueFullDisabled",
> > >> >     description = "Disable the default blocking behavior for functions when queue is full",
> > >> >     hidden = true)
> > >> > public boolean clusterFunctionBlockIfQueueFullDisabled = false;
> > >> >
> > >> > @Parameter(names = "--clusterFunctionCompressionTypeDefault",
> > >> >     description = "The default Compression Type for functions",
> > >> >     hidden = true)
> > >> > public String clusterFunctionCompressionTypeDefault = "LZ4";
> > >> >
> > >> > @Parameter(names = "--clusterFunctionHashingSchemeDefault",
> > >> >     description = "The default Hashing Scheme for functions",
> > >> >     hidden = true)
> > >> > public String clusterFunctionHashingSchemeDefault = "Murmur3_32Hash";
> > >> >
> > >> > @Parameter(names = "--clusterFunctionMessageRoutingModeDefault",
> > >> >     description = "The default Message Routing Mode for functions",
> > >> >     hidden = true)
> > >> > public String clusterFunctionMessageRoutingModeDefault = "CustomPartition";
> > >> >
> > >> > @Parameter(names = "--clusterFunctionBatchingMaxPublishDelayDefault",
> > >> >     description = "The default max publish delay (in milliseconds) for functions when message batching is enabled",
> > >> >     hidden = true)
> > >> > public long clusterFunctionBatchingMaxPublishDelayDefault = 10L;
> > >> >
> > >> > For Admin CLI, parameters can be provided in a similar way, but they
> > >> > will not have defaults (i.e. Boolean instead of boolean) since
> > >> > parameters should be null to ensure non-provided configs are
> > >> > obtained from WorkerConfig.
> > >> >
> > >> > To select each property from either the incoming producerConfig or
> > >> > the producerDefaults object (from WorkerConfig), we can use a
> > >> > FunctionDefaultsMediator. That interface looks like this and gives
> > >> > us flexibility to handle mediation for specific properties without
> > >> > needing to convert the entire object.
> > >> >
> > >> > public interface FunctionDefaultsMediator {
> > >> >     boolean isBatchingDisabled();
> > >> >     boolean isChunkingEnabled();
> > >> >     boolean isBlockIfQueueFullDisabled();
> > >> >     CompressionType getCompressionType();
> > >> >     Function.CompressionType getCompressionTypeProto();
> > >> >     HashingScheme getHashingScheme();
> > >> >     Function.HashingScheme getHashingSchemeProto();
> > >> >     MessageRoutingMode getMessageRoutingMode();
> > >> >     Function.MessageRoutingMode getMessageRoutingModeProto();
> > >> >     Long getBatchingMaxPublishDelay();
> > >> > }
> > >> >
> > >> > (Alternatively, we could remove the Proto methods from the mediator
> > >> > and just use FunctionConfigUtils.convert(..) to translate between
> > >> > the protobuf ProducerSpec and the Java ProducerConfig.)
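
One possible shape for such a mediator is sketched below, covering only three of the properties. It is a self-contained illustration, not the PR's implementation: ProducerConfigStub, ClusterDefaultsStub, and SimpleMediator are hypothetical stand-ins for ProducerConfig, ClusterFunctionProducerDefaults, and an implementation of FunctionDefaultsMediator, and compressionType is kept as a plain String to avoid depending on Pulsar types.

```java
public class MediatorSketch {

    /** Hypothetical stand-in for the per-function ProducerConfig (nullable fields). */
    public static class ProducerConfigStub {
        public Boolean batchingDisabled;      // null means "not provided"
        public String compressionType;        // null means "not provided"
        public Long batchingMaxPublishDelay;  // null means "not provided"
    }

    /** Hypothetical stand-in for the cluster-wide defaults from WorkerConfig. */
    public static class ClusterDefaultsStub {
        public final boolean batchingDisabled;
        public final String compressionType;
        public final long batchingMaxPublishDelay;

        public ClusterDefaultsStub(boolean batchingDisabled, String compressionType,
                                   long batchingMaxPublishDelay) {
            this.batchingDisabled = batchingDisabled;
            this.compressionType = compressionType;
            this.batchingMaxPublishDelay = batchingMaxPublishDelay;
        }
    }

    /** Returns the function's own value when set, otherwise the cluster default. */
    public static class SimpleMediator {
        private final ProducerConfigStub config;
        private final ClusterDefaultsStub defaults;

        public SimpleMediator(ProducerConfigStub config, ClusterDefaultsStub defaults) {
            this.config = config;
            this.defaults = defaults;
        }

        public boolean isBatchingDisabled() {
            return config.batchingDisabled != null
                    ? config.batchingDisabled : defaults.batchingDisabled;
        }

        public String getCompressionType() {
            return config.compressionType != null
                    ? config.compressionType : defaults.compressionType;
        }

        public long getBatchingMaxPublishDelay() {
            return config.batchingMaxPublishDelay != null
                    ? config.batchingMaxPublishDelay : defaults.batchingMaxPublishDelay;
        }
    }

    public static void main(String[] args) {
        ProducerConfigStub config = new ProducerConfigStub();
        config.compressionType = "ZLIB"; // only this property is provided

        SimpleMediator mediator =
                new SimpleMediator(config, new ClusterDefaultsStub(false, "LZ4", 10L));

        System.out.println(mediator.getCompressionType());          // prints "ZLIB"
        System.out.println(mediator.isBatchingDisabled());          // prints "false"
        System.out.println(mediator.getBatchingMaxPublishDelay());  // prints "10"
    }
}
```

Mediating per property this way means callers never need to build a fully merged config object just to read one setting.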
> > >> >
> > >> > *Testing*
> > >> >
> > >> > In terms of testing, much of this functionality will be covered by
> > >> > existing tests in the pulsar-functions module. However, we need to
> > >> > test each of the new classes, as well as these behaviors:
> > >> >
> > >> >    - functionRegistration (with different provided ProducerConfig
> > >> >      properties)
> > >> >    - functionUpdate (for ignoreExistingFunctionDefaults=true and
> > >> >      ignoreExistingFunctionDefaults=false)
> > >> >    - loading function_worker.yml into WorkerConfig
> > >> >    - throwing exceptions correctly when invalid parameters are
> > >> >      provided
> > >> >    - converting between ProducerSpec and ProducerConfig
> > >> >    - correctly handling precedence when computing the final
> > >> >      ProducerConfig values
> > >> >    - creating producers
> > >> >    - ensuring functionality is consistent across runtimes
> > >> >
> > >> > I'd like feedback on this proposal.
> > >> >
> > >> > Devin G. Bost