Devin, Interceptors are located on the server side (broker/worker). Cluster admins would create the plugins based on the interfaces I described and install them on the server side. Regular function developers will not implement or interact directly with the interceptor.
> Would it be better to just ignore WorkerConfig defaults during function update? Yes. If the cluster admin changes the function config defaults and wants existing functions to utilize those configs that have changed, the admin can just update those configs of the functions through the regular update mechanism for functions we have today. On Thu, Apr 1, 2021 at 4:51 PM Devin Bost <devin.b...@gmail.com> wrote: > Would it be better to just ignore WorkerConfig defaults during function > update? We could still update the function producer behavior through the > producerConfig passed as a REST parameter, but we'd avoid the edge case I > mentioned. > > Devin G. Bost > > > On Thu, Apr 1, 2021 at 5:42 PM Devin Bost <devin.b...@gmail.com> wrote: > > > I just remembered an edge case that motivated me to create that > additional > > flow. > > In my PR, if any producerConfig values are null at registration or > update, > > we get those values from WorkerConfig instead. > > However, when a user runs an update (on a function) for the first time > > after upgrading to the version of Pulsar with this feature, if someone > has > > modified the function_worker.yml file (perhaps without their knowledge), > > those defaults from function_worker.yml will get picked up because the > > existing producerConfig will have null set for those values (because the > > function was registered prior to the existence of this feature.) So, > > the user could be trying to update something unrelated, not realizing > that > > their function is going to have its producerConfig updated by the cluster > > defaults. Most users won't be affected by this, but it could be tricky to > > diagnose for the few who would get impacted by it. (Is this description > any > > clearer? If not, it might be clearer if I diagram it.) > > > > Devin G. Bost > > > > > > On Thu, Apr 1, 2021 at 4:39 PM Jerry Peng <jerry.boyang.p...@gmail.com> > > wrote: > > > >> Hi Devin, > >> > >> I understand the usefulness of giving cluster admins the ability to > >> specify > >> some defaults for functions at a cluster level. I think the right time > to > >> apply those defaults is during function registration time. I am not > sure > >> I > >> understand the usefulness of what you are proposing we do during update > >> time. If you would like to change a config of an already running > function > >> why not just update it with config changes that are needed. I am not > sure > >> you need the additional workflow you proposed for the update. > >> > >> Another idea to satisfy your use case is that instead of leveraging a > >> default config mechanism perhaps we should take a look at implementing > >> interceptors for the function/sources/sink API calls. We already have > >> interceptors implemented for many other pulsar functionalities. Perhaps > we > >> should do it for Pulsar Functions APIs as well. With interceptors you > >> will > >> be able to intercept all API operations to functions and customize the > >> requests however you would like. > >> > >> We can have a interface like: > >> > >> interface FunctionInterceptors { > >> > >> void regsterFunctionInterceptor (String tenant, String namespace, String > >> functionName, FunctionConfig functionConfig...); > >> > >> ... > >> > >> } > >> > >> for developers to implement. During the beginning of every Function API > >> call the corresponding interceptor method gets called. For example, the > >> "regsterFunctionInterceptor()" method I provided above will be called > >> towards the beginning of the registerFunction and allow you to customize > >> the function config however you want. > >> > >> Best, > >> > >> Jerry > >> > >> > >> > >> On Thu, Apr 1, 2021 at 2:08 PM Devin Bost <devin.b...@gmail.com> wrote: > >> > >> > *Cluster-Wide and Function-Specific Producer Defaults* > >> > > >> > > >> > > >> > > >> > * - Status: Proposal- Author: Devin Bost (with guidance from Jerry > >> Peng)- > >> > Pull Request: https://github.com/apache/pulsar/pull/9987 > >> > <https://github.com/apache/pulsar/pull/9987>- Mailing List > discussion: > >> - > >> > Release: 2.8.0* > >> > > >> > > >> > *Motivation* > >> > > >> > Pulsar currently provides no way to allow users or operators to change > >> the > >> > producer behavior of Pulsar functions. For organizations that are > making > >> > heavy use of functions, the ability to tune messaging behavior of > >> functions > >> > is greatly desired. Moreover, current function messaging defaults > >> appear to > >> > be causing issues for certain workloads. (For example, some bugs > appear > >> to > >> > be related to having batching enabled. See #6054.) Enabling operators > to > >> > modify these settings will help Pulsar cluster admins workaround > issues > >> to > >> > enable them to upgrade legacy versions of Pulsar to more recent > >> versions. > >> > Moreover, enabling greater tuning of these settings can provide more > >> > opportunity for performance optimization in production environments. > We > >> > need the ability to modify these function producer settings: > >> > > >> > > >> > > >> > - batchingEnabled > >> > - chunkingEnabled > >> > - blockIfQueueFull > >> > - compressionType > >> > - hashingScheme > >> > - messageRoutingMode > >> > - batchingMaxPublishDelay > >> > > >> > > >> > > >> > *Implementation Strategy* > >> > > >> > > >> > > >> > Ideally, we need a way to set default message producer behavior (for > >> > functions) on a cluster-wide level. However, operators may want > specific > >> > settings to be used for certain functions. (We can't assume that > >> everyone > >> > will want all functions in a cluster to have the same producer > >> settings.) > >> > When an operator changes the cluster-wide producer defaults, they > need a > >> > way to be able to roll out the changes to functions without > >> significantly > >> > disrupting messaging flows in production environments. They also need > a > >> > simple way to rollback changes in case a change to function producer > >> > settings results in undesired behavior. However, not all users will > want > >> > function updates to replace existing function producer settings, > >> especially > >> > for functions that have been given custom producer settings (at the > >> > per-function level.) Due to this difference in use cases, there needs > >> to be > >> > a way to allow an operator to specify what update behavior they want. > >> For a > >> > given update, the user should be able to specify if: > >> > > >> > 1. Existing function producer settings will be replaced by > >> cluster-wide > >> > defaults (unless overridden by producer settings provided in the > >> > request), > >> > or > >> > 2. Existing function producer settings will *not* be replaced by > >> > cluster-wide defaults (unless overridden by producer settings > >> provided > >> > in > >> > the request). > >> > > >> > So, the two orders of precedence are: > >> > > >> > 1. newProducerConfig > cluster-wide defaults > > existingProducerConfig > >> > 2. newProducerConfig > existingProducerConfig > cluster-wide > defaults > >> > > >> > We can add a boolean property to UpdateOptions to allow the precedence > >> to > >> > be specified. Since we don't want to introduce unexpected side effects > >> > during an update, the default precedence should prefer the > >> > existingProducerConfig over cluster-wide defaults. > >> > > >> > > >> > > >> > To ensure that the desired producer settings (after calculating the > >> > settings from defaults + overrides) are available when creating the > >> > producers (on any cluster), we need to add these producer settings to > >> the > >> > function protobuf definition. Also, to ensure that settings can be > >> properly > >> > validated at the time they are submitted, we need validation in two > >> places: > >> > > >> > 1. When cluster-wide configs are loaded upon broker startup > >> > 2. When a function is registered or updated. > >> > > >> > As it would be impractical to check every registered function > definition > >> > during broker startup, most of the validation needs to occur during > >> > function registration or update. However, during broker startup, we > can > >> at > >> > least validate that the configurations exist and throw an exception if > >> they > >> > are invalid. > >> > > >> > As any configuration exceptions thrown when the producer is created > >> would > >> > bubble up to the client, we need to ensure that the final > configurations > >> > are valid when a function is registered or updated to ensure there are > >> > exceptions when the producer is created. This requires computing the > >> > producer settings (according to the desired precedence) during > function > >> > registration or update. > >> > > >> > When a function is registered, existing producer settings do not exist > >> for > >> > it, so we simply need to get cluster-wide defaults and override them > >> with > >> > any specific producer settings provided in the FunctionConfig's > >> > ProducerConfig (submitted as a REST parameter.) After the initial > >> > registration, those computed settings will then be persisted in > >> bookkeeper > >> > with the function definition. > >> > > >> > When a function is updated, we also need to address the existing > >> > producerConfig, and for this, we need to check the desired override > >> > precedence (as explained above) when computing the final producer > >> settings > >> > that will be persisted with the function. > >> > > >> > > >> > > >> > *Modifications* > >> > > >> > > >> > > >> > As WorkerConfig is readily available when functions are registered and > >> > updated, we can put new settings into function_worker.yml and can load > >> and > >> > validate them via WorkerConfig.load(..). To prevent introducing > breaking > >> > changes, parameters will have defaults assigned in WorkerConfig that > >> > conform to current expected behavior, like this: > >> > > >> > > >> > > >> > @Data > >> > > >> > @Accessors(chain = true) > >> > > >> > public class FunctionDefaultsConfig implements Serializable { > >> > > >> > @FieldContext( > >> > > >> > doc = "Disables default message batching between > functions" > >> > > >> > ) > >> > > >> > protected boolean batchingDisabled = false; > >> > > >> > @FieldContext( > >> > > >> > doc = "Enables default message chunking between functions" > >> > > >> > ) > >> > > >> > protected boolean chunkingEnabled = false; > >> > > >> > @FieldContext( > >> > > >> > doc = "Disables default behavior to block when message > >> queue is > >> > full" > >> > > >> > ) > >> > > >> > protected boolean blockIfQueueFullDisabled = false; > >> > > >> > @FieldContext( > >> > > >> > doc = "Default compression type" > >> > > >> > ) > >> > > >> > protected String compressionType = "LZ4"; > >> > > >> > @FieldContext( > >> > > >> > doc = "Default hashing scheme" > >> > > >> > ) > >> > > >> > protected String hashingScheme = "Murmur3_32Hash"; > >> > > >> > @FieldContext( > >> > > >> > doc = "Default hashing scheme" > >> > > >> > ) > >> > > >> > protected String messageRoutingMode = "CustomPartition"; > >> > > >> > @FieldContext( > >> > > >> > doc = "Default max publish delay (in milliseconds) when > >> message > >> > batching is enabled" > >> > > >> > ) > >> > > >> > protected long batchingMaxPublishDelay = 10L; > >> > > >> > > >> > > >> > public ClusterFunctionProducerDefaults buildProducerDefaults() > >> > throws InvalidWorkerConfigDefaultException > >> > { > >> > > >> > return new > >> > ClusterFunctionProducerDefaults(this.isBatchingDisabled() > >> > , > >> > > >> > this.isChunkingEnabled(), > >> > this.isBlockIfQueueFullDisabled(), > >> > this.getCompressionType(), > >> > > >> > this.getHashingScheme(), this.getMessageRoutingMode(), > >> this > >> > .getBatchingMaxPublishDelay()); > >> > > >> > } > >> > > >> > > >> > > >> > public FunctionDefaultsConfig() { > >> > > >> > } > >> > > >> > } > >> > > >> > > >> > > >> > The additional section in function_worker.yml will look like this: > >> > > >> > > >> > > >> > functionDefaults: > >> > > >> > batchingDisabled: true > >> > > >> > chunkingEnabled: false > >> > > >> > blockIfQueueFullDisabled: false > >> > > >> > batchingMaxPublishDelay: 12 > >> > > >> > compressionType: ZLIB > >> > > >> > hashingScheme: JavaStringHash > >> > > >> > messageRoutingMode: RoundRobinPartition > >> > > >> > > >> > > >> > During function update, we can use a new boolean option, > >> > ignoreExistingFunctionDefaults, to toggle precedence: > >> > > >> > > >> > > >> > @Data > >> > > >> > @NoArgsConstructor > >> > > >> > @ApiModel(value = "UpdateOptions", description = "Options while > updating > >> > function defaults or the sink") > >> > > >> > public class UpdateOptions { > >> > > >> > @ApiModelProperty( > >> > > >> > value = "Whether or not to update the auth data", > >> > > >> > name = "update-auth-data") > >> > > >> > private boolean updateAuthData = false; > >> > > >> > > >> > > >> > @ApiModelProperty( > >> > > >> > value="Whether or not to ignore any existing function > >> > defaults", > >> > > >> > name ="ignore-existing-function-defaults") > >> > > >> > private boolean ignoreExistingFunctionDefaults = false; > >> > > >> > } > >> > > >> > > >> > > >> > To ensure that function producer settings don't get modified > >> unexpectedly > >> > by an update, ignoreExistingFunctionDefaults = false by default. > >> > > >> > > >> > > >> > The updated ProducerConfig will contain the new properties and a > method > >> to > >> > merge an incoming ProducerConfig with an existing ProducerConfig. (The > >> > merged ProducerConfig will use the incoming ProducerConfig properties > >> when > >> > they exist and will use existing ProducerConfig properties otherwise.) > >> To > >> > ensure we don't need to force operators to provide those > ProducerConfig > >> > properties with every create or update, they must be nullable, not > >> > primitive (e.g. Boolean instead of boolean.) > >> > > >> > > >> > > >> > To support the additional properties, FunctionConfigUtils (and any > >> similar > >> > classes) will need to be modified to correctly translate between the > >> > ProducerConfig and the protobuf ProducerSpec. > >> > > >> > > >> > > >> > The updated ProducerSpec in Function.proto will look like this: > >> > > >> > message ProducerSpec { > >> > > >> > int32 maxPendingMessages = 1; > >> > > >> > int32 maxPendingMessagesAcrossPartitions = 2; > >> > > >> > bool useThreadLocalProducers = 3; > >> > > >> > CryptoSpec cryptoSpec = 4; > >> > > >> > string batchBuilder = 5; > >> > > >> > bool batchingDisabled = 6; > >> > > >> > bool chunkingEnabled = 7; > >> > > >> > bool blockIfQueueFullDisabled = 8; > >> > > >> > CompressionType compressionType = 9; > >> > > >> > HashingScheme hashingScheme = 10; > >> > > >> > MessageRoutingMode messageRoutingMode = 11; > >> > > >> > int64 batchingMaxPublishDelay = 12; > >> > > >> > } > >> > > >> > > >> > > >> > To support these new parameters during local run and for convenience, > we > >> > should be able to specify them in the Admin CLI and in local run > modes. > >> > > >> > For local run, as function_worker.yml is not used, these parameters > will > >> > have defaults and be primitive values: > >> > > >> > @Parameter(names = "--clusterFunctionBatchingDisabled", description = > >> > "Disable > >> > the default message batching behavior for functions", hidden = true) > >> > > >> > public boolean clusterFunctionBatchingDisabled = false; > >> > > >> > > >> > > >> > @Parameter(names = "--clusterFunctionChunkingEnabled", description = > >> "The > >> > default message chunking behavior for functions", hidden = true) > >> > > >> > public boolean clusterFunctionChunkingEnabled = false; > >> > > >> > > >> > > >> > @Parameter(names = "--clusterFunctionBlockIfQueueFullDisabled", > >> description > >> > = "Disable the default blocking behavior for functions when queue is > >> > full", hidden > >> > = true) > >> > > >> > public boolean clusterFunctionBlockIfQueueFullDisabled = false; > >> > > >> > > >> > > >> > @Parameter(names = "--clusterFunctionCompressionTypeDefault", > >> > description = "The > >> > default Compression Type for functions", hidden = true) > >> > > >> > public String clusterFunctionCompressionTypeDefault = "LZ4"; > >> > > >> > > >> > > >> > @Parameter(names = "--clusterFunctionHashingSchemeDefault", > description > >> = > >> > "The > >> > default Hashing Scheme for functions", hidden = true) > >> > > >> > public String clusterFunctionHashingSchemeDefault = "Murmur3_32Hash"; > >> > > >> > > >> > > >> > @Parameter(names = "--clusterFunctionMessageRoutingModeDefault", > >> > description > >> > = "The default Message Routing Mode for functions", hidden = true) > >> > > >> > public String clusterFunctionMessageRoutingModeDefault = > >> "CustomPartition"; > >> > > >> > > >> > > >> > @Parameter(names = "--clusterFunctionBatchingMaxPublishDelayDefault", > >> > description > >> > = "The default max publish delay (in milliseconds) for functions when > >> > message batching is enabled", hidden = true) > >> > > >> > public long clusterFunctionBatchingMaxPublishDelayDefault = 10L; > >> > > >> > > >> > > >> > > >> > > >> > For Admin CLI, parameters can be provided in a similar way, but they > >> will > >> > not have defaults (i.e. Boolean instead of boolean) since parameters > >> should > >> > be null to ensure non-provided configs are obtained from WorkerConfig. > >> > > >> > > >> > > >> > To handle the logic of selecting the property from an incoming > >> > producerConfig vs the producerDefaults object (from WorkerConfig), we > >> can > >> > use a FunctionDefaultsMediator to handle this. That interface looks > like > >> > this and gives us flexibility to handle mediation for specific > >> properties > >> > without needing to convert the entire object. > >> > > >> > > >> > > >> > public interface FunctionDefaultsMediator { > >> > > >> > boolean isBatchingDisabled(); > >> > > >> > boolean isChunkingEnabled(); > >> > > >> > boolean isBlockIfQueueFullDisabled(); > >> > > >> > CompressionType getCompressionType(); > >> > > >> > Function.CompressionType getCompressionTypeProto(); > >> > > >> > HashingScheme getHashingScheme(); > >> > > >> > Function.HashingScheme getHashingSchemeProto(); > >> > > >> > MessageRoutingMode getMessageRoutingMode(); > >> > > >> > Function.MessageRoutingMode getMessageRoutingModeProto(); > >> > > >> > Long getBatchingMaxPublishDelay(); > >> > > >> > } > >> > > >> > > >> > > >> > (Alternatively, we could remove the Proto methods from the mediator > and > >> > just use FunctionConfigUtils.convert(..) to translate between the > >> protobuf > >> > ProducerSpec and the Java ProducerConfig.) > >> > > >> > > >> > > >> > > >> > *Testing* > >> > > >> > In terms of testing, many of this functionality will be covered by > >> existing > >> > tests in the pulsar-functions module. However, we need to test each of > >> the > >> > new classes, as well as these behaviors: > >> > > >> > - functionRegistration (with different provided ProducerConfig > >> > properties) > >> > - functionUpdate (for ignoreExistingFunctionDefaults=true and > >> > ignoreExistingFunctionDefaults=false) > >> > - loading function_worker.yml into WorkerConfig > >> > - throwing exceptions correctly when invalid parameters are > provided > >> > - converting between ProducerSpec and ProducerConfig > >> > - correctly handling precedence when computing the final > >> ProducerConfig > >> > values > >> > - creating producers > >> > - ensuring functionality is consistent across runtimes > >> > > >> > > >> > > >> > I'd like feedback on this proposal. > >> > > >> > Devin G. Bost > >> > > >> > > >