Would it be better to just ignore WorkerConfig defaults during function update? We could still update the function producer behavior through the producerConfig passed as a REST parameter, but we'd avoid the edge case I mentioned.
Devin G. Bost

On Thu, Apr 1, 2021 at 5:42 PM Devin Bost <devin.b...@gmail.com> wrote:

I just remembered an edge case that motivated me to create that additional flow.

In my PR, if any producerConfig values are null at registration or update, we get those values from WorkerConfig instead. However, when a user runs an update (on a function) for the first time after upgrading to the version of Pulsar with this feature, and someone has modified the function_worker.yml file (perhaps without the user's knowledge), the defaults from function_worker.yml will get picked up, because the existing producerConfig will have null set for those values (the function was registered before this feature existed). So the user could be trying to update something unrelated, not realizing that their function's producerConfig is about to be updated with the cluster defaults. Most users won't be affected by this, but it could be tricky to diagnose for the few who are. (Is this description any clearer? If not, it might be clearer if I diagram it.)

Devin G. Bost

On Thu, Apr 1, 2021 at 4:39 PM Jerry Peng <jerry.boyang.p...@gmail.com> wrote:

Hi Devin,

I understand the usefulness of giving cluster admins the ability to specify some defaults for functions at a cluster level. I think the right time to apply those defaults is during function registration. I am not sure I understand the usefulness of what you are proposing we do during update time. If you would like to change a config of an already running function, why not just update it with the config changes that are needed? I am not sure you need the additional workflow you proposed for the update.

Another idea to satisfy your use case: instead of leveraging a default-config mechanism, perhaps we should look at implementing interceptors for the function/source/sink API calls. We already have interceptors implemented for many other Pulsar functionalities; perhaps we should do it for the Pulsar Functions APIs as well. With interceptors you will be able to intercept all API operations on functions and customize the requests however you would like.

We can have an interface like:

interface FunctionInterceptors {

    void registerFunctionInterceptor(String tenant, String namespace, String functionName, FunctionConfig functionConfig...);

    ...

}

for developers to implement. At the beginning of every Functions API call, the corresponding interceptor method gets called. For example, the registerFunctionInterceptor() method above would be called toward the beginning of registerFunction and would allow you to customize the function config however you want.

Best,

Jerry

On Thu, Apr 1, 2021 at 2:08 PM Devin Bost <devin.b...@gmail.com> wrote:

*Cluster-Wide and Function-Specific Producer Defaults*

- Status: Proposal
- Author: Devin Bost (with guidance from Jerry Peng)
- Pull Request: https://github.com/apache/pulsar/pull/9987
- Mailing List discussion:
- Release: 2.8.0

*Motivation*

Pulsar currently provides no way for users or operators to change the producer behavior of Pulsar functions.
For organizations that make heavy use of functions, the ability to tune the messaging behavior of functions is highly desirable. Moreover, the current function messaging defaults appear to be causing issues for certain workloads. (For example, some bugs appear to be related to having batching enabled; see #6054.) Enabling operators to modify these settings will help Pulsar cluster admins work around issues so they can upgrade from legacy versions of Pulsar to more recent versions. Greater tunability of these settings also provides more opportunity for performance optimization in production environments. We need the ability to modify these function producer settings:

- batchingEnabled
- chunkingEnabled
- blockIfQueueFull
- compressionType
- hashingScheme
- messageRoutingMode
- batchingMaxPublishDelay

*Implementation Strategy*

Ideally, we need a way to set default message producer behavior (for functions) at a cluster-wide level. However, operators may want specific settings for certain functions. (We can't assume that everyone will want all functions in a cluster to share the same producer settings.) When an operator changes the cluster-wide producer defaults, they need a way to roll out the changes to functions without significantly disrupting messaging flows in production environments. They also need a simple way to roll back changes in case a change to function producer settings results in undesired behavior. However, not all users will want function updates to replace existing function producer settings, especially for functions that have been given custom producer settings at the per-function level. Due to this difference in use cases, there needs to be a way for an operator to specify which update behavior they want. For a given update, the user should be able to specify whether:

1. Existing function producer settings will be replaced by cluster-wide defaults (unless overridden by producer settings provided in the request), or
2. Existing function producer settings will *not* be replaced by cluster-wide defaults (unless overridden by producer settings provided in the request).

So, the two orders of precedence are:

1. newProducerConfig > cluster-wide defaults > existingProducerConfig
2. newProducerConfig > existingProducerConfig > cluster-wide defaults

We can add a boolean property to UpdateOptions to allow the precedence to be specified. Since we don't want to introduce unexpected side effects during an update, the default precedence should prefer the existingProducerConfig over cluster-wide defaults.
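For illustration, here is a minimal sketch of how that precedence could be resolved per property, assuming the nullable ProducerConfig fields described under Modifications below; the boolean corresponds to the UpdateOptions flag introduced there, and the helper name and structure are illustrative, not the exact code in the PR:

public class ProducerDefaultsResolver {

    // Resolves one producer property according to the selected precedence.
    // A null value means "not set" at that level. At registration time there is
    // no existing config, so fromExistingConfig would simply be null.
    public static <T> T resolve(T fromRequest, T fromExistingConfig, T clusterDefault,
                                boolean ignoreExistingFunctionDefaults) {
        if (fromRequest != null) {
            return fromRequest; // newProducerConfig always wins
        }
        if (ignoreExistingFunctionDefaults) {
            // Precedence 1: cluster-wide defaults > existingProducerConfig
            return clusterDefault != null ? clusterDefault : fromExistingConfig;
        }
        // Precedence 2 (the default): existingProducerConfig > cluster-wide defaults
        return fromExistingConfig != null ? fromExistingConfig : clusterDefault;
    }
}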
To ensure that the desired producer settings (after computing them from defaults plus overrides) are available when creating the producers (on any cluster), we need to add these producer settings to the function protobuf definition. Also, to ensure that settings can be properly validated at the time they are submitted, we need validation in two places:

1. When cluster-wide configs are loaded at broker startup
2. When a function is registered or updated

As it would be impractical to check every registered function definition during broker startup, most of the validation needs to occur during function registration or update. However, during broker startup, we can at least validate that the configurations exist and throw an exception if they are invalid.

As any configuration exceptions thrown when the producer is created would bubble up to the client, we need to ensure that the final configurations are valid when a function is registered or updated, so that no exceptions are thrown when the producer is created. This requires computing the producer settings (according to the desired precedence) during function registration or update.

When a function is registered, no existing producer settings exist for it, so we simply get the cluster-wide defaults and override them with any specific producer settings provided in the FunctionConfig's ProducerConfig (submitted as a REST parameter). After the initial registration, those computed settings are persisted in BookKeeper with the function definition.

When a function is updated, we also need to account for the existing producerConfig, and for this we need to check the desired override precedence (as explained above) when computing the final producer settings that will be persisted with the function.

*Modifications*

As WorkerConfig is readily available when functions are registered and updated, we can put new settings into function_worker.yml and load and validate them via WorkerConfig.load(..). To avoid introducing breaking changes, parameters will have defaults assigned in WorkerConfig that conform to the current expected behavior, like this:

@Data
@Accessors(chain = true)
public class FunctionDefaultsConfig implements Serializable {

    @FieldContext(
        doc = "Disables default message batching between functions"
    )
    protected boolean batchingDisabled = false;

    @FieldContext(
        doc = "Enables default message chunking between functions"
    )
    protected boolean chunkingEnabled = false;

    @FieldContext(
        doc = "Disables default behavior to block when message queue is full"
    )
    protected boolean blockIfQueueFullDisabled = false;

    @FieldContext(
        doc = "Default compression type"
    )
    protected String compressionType = "LZ4";

    @FieldContext(
        doc = "Default hashing scheme"
    )
    protected String hashingScheme = "Murmur3_32Hash";

    @FieldContext(
        doc = "Default message routing mode"
    )
    protected String messageRoutingMode = "CustomPartition";

    @FieldContext(
        doc = "Default max publish delay (in milliseconds) when message batching is enabled"
    )
    protected long batchingMaxPublishDelay = 10L;

    public ClusterFunctionProducerDefaults buildProducerDefaults()
            throws InvalidWorkerConfigDefaultException {
        return new ClusterFunctionProducerDefaults(this.isBatchingDisabled(),
                this.isChunkingEnabled(), this.isBlockIfQueueFullDisabled(),
                this.getCompressionType(), this.getHashingScheme(),
                this.getMessageRoutingMode(), this.getBatchingMaxPublishDelay());
    }

    public FunctionDefaultsConfig() {
    }
}
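To make the startup-time check concrete, here is a minimal sketch of the kind of string-to-enum validation buildProducerDefaults() could perform for one field. The helper class and the String constructor on InvalidWorkerConfigDefaultException are assumptions; the actual checks in the PR may differ:

import org.apache.pulsar.client.api.CompressionType;

public class ProducerDefaultsValidation {

    // Illustrative helper: parse the configured compressionType string into the
    // client enum, failing fast at broker startup if the value is not recognized.
    // hashingScheme and messageRoutingMode could be validated the same way.
    // InvalidWorkerConfigDefaultException is the exception type referenced above;
    // a String constructor is assumed here.
    public static CompressionType parseCompressionType(String value)
            throws InvalidWorkerConfigDefaultException {
        try {
            return CompressionType.valueOf(value);
        } catch (IllegalArgumentException | NullPointerException e) {
            throw new InvalidWorkerConfigDefaultException(
                    "functionDefaults.compressionType '" + value
                            + "' is not a valid CompressionType");
        }
    }
}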
The additional section in function_worker.yml will look like this:

functionDefaults:
  batchingDisabled: true
  chunkingEnabled: false
  blockIfQueueFullDisabled: false
  batchingMaxPublishDelay: 12
  compressionType: ZLIB
  hashingScheme: JavaStringHash
  messageRoutingMode: RoundRobinPartition

During function update, we can use a new boolean option, ignoreExistingFunctionDefaults, to toggle the precedence:

@Data
@NoArgsConstructor
@ApiModel(value = "UpdateOptions", description = "Options while updating function defaults or the sink")
public class UpdateOptions {

    @ApiModelProperty(
        value = "Whether or not to update the auth data",
        name = "update-auth-data")
    private boolean updateAuthData = false;

    @ApiModelProperty(
        value = "Whether or not to ignore any existing function defaults",
        name = "ignore-existing-function-defaults")
    private boolean ignoreExistingFunctionDefaults = false;
}

To ensure that function producer settings don't get modified unexpectedly by an update, ignoreExistingFunctionDefaults is false by default.

The updated ProducerConfig will contain the new properties and a method to merge an incoming ProducerConfig with an existing ProducerConfig. (The merged ProducerConfig will use the incoming ProducerConfig properties when they exist and fall back to the existing ProducerConfig properties otherwise.) To avoid forcing operators to provide these ProducerConfig properties with every create or update, they must be nullable rather than primitive (e.g. Boolean instead of boolean).

To support the additional properties, FunctionConfigUtils (and any similar classes) will need to be modified to correctly translate between the ProducerConfig and the protobuf ProducerSpec.

The updated ProducerSpec in Function.proto will look like this:

message ProducerSpec {
    int32 maxPendingMessages = 1;
    int32 maxPendingMessagesAcrossPartitions = 2;
    bool useThreadLocalProducers = 3;
    CryptoSpec cryptoSpec = 4;
    string batchBuilder = 5;
    bool batchingDisabled = 6;
    bool chunkingEnabled = 7;
    bool blockIfQueueFullDisabled = 8;
    CompressionType compressionType = 9;
    HashingScheme hashingScheme = 10;
    MessageRoutingMode messageRoutingMode = 11;
    int64 batchingMaxPublishDelay = 12;
}
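To show the direction of that translation, here is a minimal sketch of how two of the new fields might be copied from the Java ProducerConfig onto the generated ProducerSpec builder. The method and getter names are assumptions based on the fields proposed above; the real FunctionConfigUtils.convert(..) handles many more fields:

import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.functions.proto.Function;

public class ProducerSpecConversionSketch {

    // Copies only the fields that were explicitly set (non-null) so that unset
    // fields can still be filled from cluster-wide defaults later.
    public static void copyProducerDefaults(ProducerConfig config,
                                            Function.ProducerSpec.Builder builder) {
        if (config.getBatchingDisabled() != null) {
            builder.setBatchingDisabled(config.getBatchingDisabled());
        }
        if (config.getBatchingMaxPublishDelay() != null) {
            builder.setBatchingMaxPublishDelay(config.getBatchingMaxPublishDelay());
        }
        // chunkingEnabled, blockIfQueueFullDisabled, compressionType, hashingScheme,
        // and messageRoutingMode would be mapped the same way.
    }
}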
For convenience, and to support these new parameters during local run, we should be able to specify them in the Admin CLI and in local run mode.

For local run, as function_worker.yml is not used, these parameters will have defaults and be primitive values:

@Parameter(names = "--clusterFunctionBatchingDisabled", description = "Disable the default message batching behavior for functions", hidden = true)
public boolean clusterFunctionBatchingDisabled = false;

@Parameter(names = "--clusterFunctionChunkingEnabled", description = "The default message chunking behavior for functions", hidden = true)
public boolean clusterFunctionChunkingEnabled = false;

@Parameter(names = "--clusterFunctionBlockIfQueueFullDisabled", description = "Disable the default blocking behavior for functions when the queue is full", hidden = true)
public boolean clusterFunctionBlockIfQueueFullDisabled = false;

@Parameter(names = "--clusterFunctionCompressionTypeDefault", description = "The default compression type for functions", hidden = true)
public String clusterFunctionCompressionTypeDefault = "LZ4";

@Parameter(names = "--clusterFunctionHashingSchemeDefault", description = "The default hashing scheme for functions", hidden = true)
public String clusterFunctionHashingSchemeDefault = "Murmur3_32Hash";

@Parameter(names = "--clusterFunctionMessageRoutingModeDefault", description = "The default message routing mode for functions", hidden = true)
public String clusterFunctionMessageRoutingModeDefault = "CustomPartition";

@Parameter(names = "--clusterFunctionBatchingMaxPublishDelayDefault", description = "The default max publish delay (in milliseconds) for functions when message batching is enabled", hidden = true)
public long clusterFunctionBatchingMaxPublishDelayDefault = 10L;

For the Admin CLI, parameters can be provided in a similar way, but they will not have defaults (i.e. Boolean instead of boolean), since parameters should be null to ensure that non-provided configs are obtained from WorkerConfig.
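For example, an Admin CLI counterpart might be declared like the following sketch. The class and flag name are hypothetical; the point is only the nullable Boolean, so a flag the user never passes stays null and the cluster-wide default from WorkerConfig applies:

import com.beust.jcommander.Parameter;

public class FunctionProducerCliOptions {

    // Hypothetical flag: null means "not provided on the CLI", so the
    // cluster-wide default from function_worker.yml is used instead.
    @Parameter(names = "--batching-disabled", arity = 1,
            description = "Disable message batching for the function's producers")
    public Boolean batchingDisabled;
}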
However, we need to test each of >> the >> > new classes, as well as these behaviors: >> > >> > - functionRegistration (with different provided ProducerConfig >> > properties) >> > - functionUpdate (for ignoreExistingFunctionDefaults=true and >> > ignoreExistingFunctionDefaults=false) >> > - loading function_worker.yml into WorkerConfig >> > - throwing exceptions correctly when invalid parameters are provided >> > - converting between ProducerSpec and ProducerConfig >> > - correctly handling precedence when computing the final >> ProducerConfig >> > values >> > - creating producers >> > - ensuring functionality is consistent across runtimes >> > >> > >> > >> > I'd like feedback on this proposal. >> > >> > Devin G. Bost >> > >> >