> The interceptor would be loaded at function registration.

The interceptor classes should be loaded at broker/worker startup time.

> In order for an interceptor to be picked up, a jar would need to be compiled
> with the class that implements the interface, and the jar would need to be
> deployed into a directory that matches the directory provided somewhere.
> (Would this directory be specified in function_worker.yml? Or, would we need
> to reuse one of the existing interceptor directories and filter classes at
> runtime?)

The JAR with the interceptor classes can simply be placed in the lib/ directory of the Pulsar project. An additional config may need to be added to function_worker.yml so that the class name of the interceptor implementation can be provided.

> In order for changes to the FunctionConfig values to reach where the
> Function producers are created, we'd still need to add the producer
> properties to the Function.proto and FunctionConfig.ProducerConfig
> definitions. We'd also still need to modify CLI parameters to ensure these
> values could be modified on a per-function basis.

For your use case of disabling batching, you will still need to add a field to Function.proto so that it can be configured. I would recommend doing that work separately, since it is orthogonal to interceptors.

> When a function is registered, instead of retrieving cluster-wide function
> producer defaults from WorkerConfig, it would load the interceptor, which
> would modify the FunctionConfig's ProducerConfig's property values to
> provide the cluster-wide defaults for any value that wasn't provided by the
> REST call.

Yes.

> If an interceptor wasn't provided, during conversion of the ProducerConfig
> to the protobuf ProducerSpec, the protobuf defaults will take effect (since
> protobuf doesn't allow null primitives), which will then be picked up when
> the producer is created.

If an interceptor implementation is not provided, the APIs should behave the same way they do today.
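Here is a minimal sketch of that startup-time loading. It assumes a hypothetical `functionInterceptorClassName` entry in function_worker.yml and a hypothetical `FunctionInterceptor` interface, and neither is an existing Pulsar API. The worker instantiates the configured class once, by reflection, and falls back to a no-op interceptor when nothing is configured, so API behavior stays unchanged. A plain `Map` stands in for `FunctionConfig` so the sketch compiles on its own.

```java
import java.util.Map;

// Hypothetical interceptor contract (not an existing Pulsar API).
// A Map stands in for FunctionConfig to keep the sketch self-contained.
interface FunctionInterceptor {
    // Called at the start of function registration; may modify the config in place.
    void beforeRegister(String tenant, String namespace, String functionName,
                        Map<String, Object> functionConfig);
}

// Used when no interceptor class is configured, so the APIs behave as they do today.
final class NoOpFunctionInterceptor implements FunctionInterceptor {
    @Override
    public void beforeRegister(String tenant, String namespace, String functionName,
                               Map<String, Object> functionConfig) {
        // Intentionally leaves the request unchanged.
    }
}

final class FunctionInterceptorLoader {
    private FunctionInterceptorLoader() {
    }

    // Called once at worker startup. className would come from a hypothetical
    // function_worker.yml key such as "functionInterceptorClassName"; the JAR holding
    // the class is assumed to already be on the worker classpath (e.g. via lib/).
    static FunctionInterceptor load(String className) {
        if (className == null || className.trim().isEmpty()) {
            return new NoOpFunctionInterceptor();
        }
        final Class<?> clazz;
        try {
            clazz = Class.forName(className.trim());
        } catch (ClassNotFoundException e) {
            // Fail fast at startup instead of on the first API call.
            throw new IllegalStateException("Interceptor class not found: " + className, e);
        }
        if (!FunctionInterceptor.class.isAssignableFrom(clazz)) {
            throw new IllegalArgumentException(className + " does not implement FunctionInterceptor");
        }
        try {
            return (FunctionInterceptor) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate interceptor " + className, e);
        }
    }
}
```

Loading at startup rather than per request means a misconfigured class name surfaces as a worker startup failure, not as an error on some later registration call.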

Back at Streamlio, I actually created an interceptor mechanism. Please take a look at this old PR: https://github.com/apache/pulsar/pull/7159, or more specifically at what was done for functions here: https://github.com/apache/pulsar/pull/7159/files#diff-427711b7c1c3b9f17cd1c02bb9ceb942af454676dd97902a43e910f645febc6d

Perhaps you can help revive this work!

Best,
Jerry

On Mon, Apr 5, 2021 at 11:46 AM Devin Bost <devin.b...@gmail.com> wrote:

Here's my understanding of what I'd need to do to implement the interceptor approach.

The interceptor would be loaded at function registration. In order for an interceptor to be picked up, a jar would need to be compiled with the class that implements the interface, and the jar would need to be deployed into a directory that matches the directory provided somewhere. (Would this directory be specified in function_worker.yml? Or, would we need to reuse one of the existing interceptor directories and filter classes at runtime?)

In order for changes to the FunctionConfig values to reach where the Function producers are created, we'd still need to add the producer properties to the Function.proto and FunctionConfig.ProducerConfig definitions. We'd also still need to modify CLI parameters to ensure these values could be modified on a per-function basis.

When a function is registered, instead of retrieving cluster-wide function producer defaults from WorkerConfig, it would load the interceptor, which would modify the FunctionConfig's ProducerConfig's property values to provide the cluster-wide defaults for any value that wasn't provided by the REST call.

If an interceptor wasn't provided, during conversion of the ProducerConfig to the protobuf ProducerSpec, the protobuf defaults will take effect (since protobuf doesn't allow null primitives), which will then be picked up when the producer is created.

Is that right?

Devin G. Bost

On Mon, Apr 5, 2021 at 11:56 AM Devin Bost <devin.b...@gmail.com> wrote:

If we use the interceptor approach, does that mean a cluster admin would need to write custom code in order to override the function producer settings? I worry about adding an additional burden to cluster administration.

--
Devin G. Bost

On Sat, Apr 3, 2021, 8:26 AM Devin Bost <devin.b...@gmail.com> wrote:

Hi Rui,

Thanks for the feedback. My understanding of the interceptor is that it's not something that would be touched by users but would only be modified by cluster admins. So, I think we'd still need to include the parameters on the ProducerConfig.
I already have the other approach coded, so changing the design will require some rework.
I'll look at that example you mentioned.

--
Devin G. Bost

On Fri, Apr 2, 2021, 10:18 PM Rui Fu <f...@rui.sh> wrote:

Hi Devin,

Great proposal for giving functions such a useful feature, and thanks for your detailed write-up. After going through the history of this discussion, I would like to +1 Jerry's suggestion. With the interceptor, admins and users may have maximum flexibility to customize their producer configs. It may also reduce the number of parameters we put into the configs and pulsar-admin. Also, we already have some interceptors in Pulsar Functions, like the `RuntimeCustomizer` interface, so it is not a new technique that users would find hard to adopt.

Besides, if we add new fields to `Function.proto`, I would suggest covering these changes in the Python and Go runtimes as well.

Best,

Rui

On Apr 2, 2021, 10:13 AM +0800, Devin Bost <devin.b...@gmail.com> wrote:

What would be some of the additional benefits of using the interceptor approach?

--
Devin G. Bost

On Thu, Apr 1, 2021, 6:03 PM Jerry Peng <jerry.boyang.p...@gmail.com> wrote:

Devin,

Interceptors are located on the server side (broker/worker). Cluster admins would create the plugins based on the interfaces I described and install them on the server side. Regular function developers will not implement or interact directly with the interceptor.

> Would it be better to just ignore WorkerConfig defaults during function update?

Yes.

If the cluster admin changes the function config defaults and wants existing functions to use the changed configs, the admin can just update the configs of those functions through the regular function update mechanism we have today.

On Thu, Apr 1, 2021 at 4:51 PM Devin Bost <devin.b...@gmail.com> wrote:

Would it be better to just ignore WorkerConfig defaults during function update? We could still update the function producer behavior through the producerConfig passed as a REST parameter, but we'd avoid the edge case I mentioned.

Devin G. Bost

On Thu, Apr 1, 2021 at 5:42 PM Devin Bost <devin.b...@gmail.com> wrote:

I just remembered an edge case that motivated me to create that additional flow.
In my PR, if any producerConfig values are null at registration or update, we get those values from WorkerConfig instead.
However, consider a user who runs an update on a function for the first time after upgrading to the version of Pulsar with this feature. If someone has modified the function_worker.yml file (perhaps without the user's knowledge), the defaults from function_worker.yml will get picked up, because the existing producerConfig will have null set for those values (the function was registered before this feature existed). So the user could be trying to update something unrelated, not realizing that their function's producerConfig is about to be updated with the cluster defaults. Most users won't be affected by this, but it could be tricky to diagnose for the few who are. (Is this description any clearer? If not, it might be clearer if I diagram it.)

Devin G. Bost

On Thu, Apr 1, 2021 at 4:39 PM Jerry Peng <jerry.boyang.p...@gmail.com> wrote:

Hi Devin,

I understand the usefulness of giving cluster admins the ability to specify some defaults for functions at the cluster level. I think the right time to apply those defaults is at function registration time. I am not sure I understand the usefulness of what you are proposing we do at update time. If you would like to change a config of an already running function, why not just update it with the config changes that are needed?
I am not sure you need the additional workflow you proposed for the update.

Here is another idea to satisfy your use case. Instead of leveraging a default config mechanism, perhaps we should look at implementing interceptors for the function/source/sink API calls. We already have interceptors implemented for many other Pulsar features, so perhaps we should do the same for the Pulsar Functions APIs. With interceptors, you will be able to intercept all API operations on functions and customize the requests however you would like.

We can have an interface like:

    interface FunctionInterceptors {

        void registerFunctionInterceptor(String tenant, String namespace,
                String functionName, FunctionConfig functionConfig...);

        ...

    }

for developers to implement. At the beginning of every Function API call, the corresponding interceptor method gets called. For example, the "registerFunctionInterceptor()" method above will be called at the beginning of registerFunction and lets you customize the function config however you want.
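Here is a rough sketch of how a cluster admin might use such a registration hook, `registerFunctionInterceptor`, modeled on the interface above, to supply cluster-wide producer defaults. `ProducerSettings`, `ClusterDefaultsInterceptor` and `FunctionRegistrationSketch` are hypothetical names. The hook also takes a simplified producer config instead of the full `FunctionConfig`. Values the request sets explicitly are left untouched.

```java
import java.util.Objects;

// Simplified, hypothetical stand-in for FunctionConfig.ProducerConfig:
// a null field means "not provided by the REST call".
final class ProducerSettings {
    Boolean batchingDisabled;
    String compressionType;
    Long batchingMaxPublishDelay;
}

// Shape of the interceptor described above; only the registration hook is sketched.
interface FunctionInterceptors {
    void registerFunctionInterceptor(String tenant, String namespace, String functionName,
                                     ProducerSettings producerConfig);
}

// Example admin-written plugin: fills in cluster-wide defaults only for values the
// request left unset. The values mirror the function_worker.yml example in the proposal.
final class ClusterDefaultsInterceptor implements FunctionInterceptors {
    @Override
    public void registerFunctionInterceptor(String tenant, String namespace, String functionName,
                                            ProducerSettings producerConfig) {
        if (producerConfig.batchingDisabled == null) {
            producerConfig.batchingDisabled = Boolean.TRUE;
        }
        if (producerConfig.compressionType == null) {
            producerConfig.compressionType = "ZLIB";
        }
        if (producerConfig.batchingMaxPublishDelay == null) {
            producerConfig.batchingMaxPublishDelay = 12L;
        }
    }
}

// Hypothetical registration path: the interceptor runs first, before validation and persistence.
final class FunctionRegistrationSketch {
    private final FunctionInterceptors interceptor;

    FunctionRegistrationSketch(FunctionInterceptors interceptor) {
        this.interceptor = Objects.requireNonNull(interceptor, "interceptor");
    }

    ProducerSettings registerFunction(String tenant, String namespace, String functionName,
                                      ProducerSettings requested) {
        interceptor.registerFunctionInterceptor(tenant, namespace, functionName, requested);
        // Validation and persistence of the function definition would follow here (omitted).
        return requested;
    }
}
```

In this design the cluster-wide defaults live in admin-owned code rather than in WorkerConfig, which is the trade-off raised later in the thread about the extra burden on cluster administrators.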

Best,

Jerry

On Thu, Apr 1, 2021 at 2:08 PM Devin Bost <devin.b...@gmail.com> wrote:

*Cluster-Wide and Function-Specific Producer Defaults*

- Status: Proposal
- Author: Devin Bost (with guidance from Jerry Peng)
- Pull Request: https://github.com/apache/pulsar/pull/9987
- Mailing List discussion:
- Release: 2.8.0

*Motivation*

Pulsar currently provides no way for users or operators to change the producer behavior of Pulsar Functions. Organizations that make heavy use of functions strongly want the ability to tune the messaging behavior of their functions. Moreover, the current function messaging defaults appear to be causing issues for certain workloads. (For example, some bugs appear to be related to having batching enabled. See #6054.) Letting operators modify these settings will help Pulsar cluster admins work around such issues, so that they can upgrade legacy versions of Pulsar to more recent ones. Finer tuning of these settings also provides more opportunity for performance optimization in production environments.

We need the ability to modify these function producer settings:

- batchingEnabled
- chunkingEnabled
- blockIfQueueFull
- compressionType
- hashingScheme
- messageRoutingMode
- batchingMaxPublishDelay

*Implementation Strategy*

Ideally, we need a way to set default message producer behavior (for functions) at the cluster-wide level. However, operators may want specific settings to be used for certain functions. (We can't assume that everyone will want all functions in a cluster to have the same producer settings.) When an operator changes the cluster-wide producer defaults, they need a way to roll out the changes to functions without significantly disrupting messaging flows in production environments. They also need a simple way to roll back changes in case a change to function producer settings results in undesired behavior. However, not all users will want function updates to replace existing function producer settings, especially for functions that have been given custom producer settings (at the per-function level). Because of this difference in use cases, there needs to be a way for an operator to specify what update behavior they want.

For a given update, the user should be able to specify whether:

1. Existing function producer settings will be replaced by cluster-wide defaults (unless overridden by producer settings provided in the request), or
2. Existing function producer settings will *not* be replaced by cluster-wide defaults (unless overridden by producer settings provided in the request).

So, the two orders of precedence are:

1. newProducerConfig > cluster-wide defaults > existingProducerConfig
2. newProducerConfig > existingProducerConfig > cluster-wide defaults

We can add a boolean property to UpdateOptions to allow the precedence to be specified. Since we don't want to introduce unexpected side effects during an update, the default precedence should prefer the existingProducerConfig over cluster-wide defaults.

To ensure that the desired producer settings (after computing them from defaults + overrides) are available when the producers are created (on any cluster), we need to add these producer settings to the function protobuf definition. Also, to ensure that settings can be properly validated at the time they are submitted, we need validation in two places:

1. When cluster-wide configs are loaded upon broker startup
2. When a function is registered or updated

As it would be impractical to check every registered function definition during broker startup, most of the validation needs to occur during function registration or update. However, during broker startup, we can at least validate that the configurations exist and throw an exception if they are invalid.

As any configuration exceptions thrown when the producer is created would bubble up to the client, we need to ensure that the final configurations are valid when a function is registered or updated, so that no exceptions are thrown when the producer is created. This requires computing the producer settings (according to the desired precedence) during function registration or update.

When a function is registered, it has no existing producer settings, so we simply need to get the cluster-wide defaults and override them with any specific producer settings provided in the FunctionConfig's ProducerConfig (submitted as a REST parameter). After the initial registration, those computed settings will then be persisted in BookKeeper with the function definition.
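Here is one way to picture the registration step, as a sketch under stated assumptions. The names below are hypothetical and only three settings are shown. `CompressionKind` is a local enum modeled on the values of the Pulsar client's `CompressionType`, so the block compiles without Pulsar. Cluster defaults are overridden by any non-null request value. The result is then validated immediately, so an invalid setting fails the registration call rather than producer creation.

```java
import java.util.Locale;

// Local stand-in modeled on the Pulsar client's CompressionType values.
enum CompressionKind { NONE, LZ4, ZLIB, ZSTD, SNAPPY }

// Hypothetical cluster-wide defaults as they might be loaded from function_worker.yml.
final class WorkerProducerDefaults {
    final boolean batchingDisabled;
    final String compressionType;
    final long batchingMaxPublishDelayMs;

    WorkerProducerDefaults(boolean batchingDisabled, String compressionType, long batchingMaxPublishDelayMs) {
        this.batchingDisabled = batchingDisabled;
        this.compressionType = compressionType;
        this.batchingMaxPublishDelayMs = batchingMaxPublishDelayMs;
    }
}

// Hypothetical fully resolved settings that would be persisted with the function definition.
final class ResolvedProducerSettings {
    final boolean batchingDisabled;
    final CompressionKind compressionType;
    final long batchingMaxPublishDelayMs;

    ResolvedProducerSettings(boolean batchingDisabled, CompressionKind compressionType, long batchingMaxPublishDelayMs) {
        this.batchingDisabled = batchingDisabled;
        this.compressionType = compressionType;
        this.batchingMaxPublishDelayMs = batchingMaxPublishDelayMs;
    }
}

final class RegistrationResolver {
    private RegistrationResolver() {
    }

    // At registration there is no existing config: start from the cluster defaults,
    // let any non-null request value override them, then validate the result.
    static ResolvedProducerSettings resolve(Boolean requestedBatchingDisabled,
                                            String requestedCompressionType,
                                            Long requestedMaxPublishDelayMs,
                                            WorkerProducerDefaults defaults) {
        boolean batchingDisabled = requestedBatchingDisabled != null
                ? requestedBatchingDisabled : defaults.batchingDisabled;
        String compressionName = requestedCompressionType != null
                ? requestedCompressionType : defaults.compressionType;
        long maxPublishDelayMs = requestedMaxPublishDelayMs != null
                ? requestedMaxPublishDelayMs : defaults.batchingMaxPublishDelayMs;

        CompressionKind compression;
        try {
            compression = CompressionKind.valueOf(compressionName.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Invalid compressionType: " + compressionName, e);
        }
        if (maxPublishDelayMs < 0) {
            throw new IllegalArgumentException("batchingMaxPublishDelay must not be negative: " + maxPublishDelayMs);
        }
        return new ResolvedProducerSettings(batchingDisabled, compression, maxPublishDelayMs);
    }
}
```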

When a function is updated, we also need to account for the existing producerConfig. To do so, we need to check the desired override precedence (as explained above) when computing the final producer settings that will be persisted with the function.

*Modifications*

As WorkerConfig is readily available when functions are registered and updated, we can put the new settings into function_worker.yml and load and validate them via WorkerConfig.load(..). To avoid introducing breaking changes, the parameters will have defaults assigned in WorkerConfig that match the current behavior, like this:

    @Data
    @Accessors(chain = true)
    public class FunctionDefaultsConfig implements Serializable {

        @FieldContext(
                doc = "Disables default message batching between functions"
        )
        protected boolean batchingDisabled = false;

        @FieldContext(
                doc = "Enables default message chunking between functions"
        )
        protected boolean chunkingEnabled = false;

        @FieldContext(
                doc = "Disables default behavior to block when message queue is full"
        )
        protected boolean blockIfQueueFullDisabled = false;

        @FieldContext(
                doc = "Default compression type"
        )
        protected String compressionType = "LZ4";

        @FieldContext(
                doc = "Default hashing scheme"
        )
        protected String hashingScheme = "Murmur3_32Hash";

        @FieldContext(
                doc = "Default message routing mode"
        )
        protected String messageRoutingMode = "CustomPartition";

        @FieldContext(
                doc = "Default max publish delay (in milliseconds) when message batching is enabled"
        )
        protected long batchingMaxPublishDelay = 10L;

        public ClusterFunctionProducerDefaults buildProducerDefaults()
                throws InvalidWorkerConfigDefaultException {
            return new ClusterFunctionProducerDefaults(this.isBatchingDisabled(),
                    this.isChunkingEnabled(), this.isBlockIfQueueFullDisabled(),
                    this.getCompressionType(), this.getHashingScheme(),
                    this.getMessageRoutingMode(), this.getBatchingMaxPublishDelay());
        }

        public FunctionDefaultsConfig() {
        }

    }

The additional section in function_worker.yml will look like this:

    functionDefaults:
      batchingDisabled: true
      chunkingEnabled: false
      blockIfQueueFullDisabled: false
      batchingMaxPublishDelay: 12
      compressionType: ZLIB
      hashingScheme: JavaStringHash
      messageRoutingMode: RoundRobinPartition

During function update, we can use a new boolean option, ignoreExistingFunctionDefaults, to toggle the precedence:

    @Data
    @NoArgsConstructor
    @ApiModel(value = "UpdateOptions", description = "Options while updating function defaults or the sink")
    public class UpdateOptions {

        @ApiModelProperty(
                value = "Whether or not to update the auth data",
                name = "update-auth-data")
        private boolean updateAuthData = false;

        @ApiModelProperty(
                value = "Whether or not to ignore any existing function defaults",
                name = "ignore-existing-function-defaults")
        private boolean ignoreExistingFunctionDefaults = false;

    }

To ensure that function producer settings don't get modified unexpectedly by an update, ignoreExistingFunctionDefaults is false by default.

The updated ProducerConfig will contain the new properties and a method to merge an incoming ProducerConfig with an existing ProducerConfig. (The merged ProducerConfig will use the incoming ProducerConfig properties when they exist and the existing ProducerConfig properties otherwise.) So that operators are not forced to provide those ProducerConfig properties with every create or update, they must be nullable, not primitive (e.g. Boolean instead of boolean).

To support the additional properties, FunctionConfigUtils (and any similar classes) will need to be modified to correctly translate between the ProducerConfig and the protobuf ProducerSpec.
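The two precedence orders and the merge method described above can be sketched as follows. The sketch assumes a simplified config with only two nullable fields. `NullableProducerConfig` and `UpdateMerger` are hypothetical names, not classes from the PR.

```java
// Hypothetical nullable producer config used only for this sketch; null means "not set".
final class NullableProducerConfig {
    final Boolean batchingDisabled;
    final String compressionType;

    NullableProducerConfig(Boolean batchingDisabled, String compressionType) {
        this.batchingDisabled = batchingDisabled;
        this.compressionType = compressionType;
    }
}

final class UpdateMerger {
    private UpdateMerger() {
    }

    // Returns the first non-null candidate, i.e. the highest-precedence value that was set.
    private static <T> T firstNonNull(T first, T second, T third) {
        if (first != null) {
            return first;
        }
        return second != null ? second : third;
    }

    // ignoreExistingFunctionDefaults == false (the proposed default):
    //     incoming > existing > cluster defaults
    // ignoreExistingFunctionDefaults == true:
    //     incoming > cluster defaults > existing
    static NullableProducerConfig merge(NullableProducerConfig incoming,
                                        NullableProducerConfig existing,
                                        NullableProducerConfig clusterDefaults,
                                        boolean ignoreExistingFunctionDefaults) {
        NullableProducerConfig second = ignoreExistingFunctionDefaults ? clusterDefaults : existing;
        NullableProducerConfig third = ignoreExistingFunctionDefaults ? existing : clusterDefaults;
        return new NullableProducerConfig(
                firstNonNull(incoming.batchingDisabled, second.batchingDisabled, third.batchingDisabled),
                firstNonNull(incoming.compressionType, second.compressionType, third.compressionType));
    }
}
```

Even with the default (false), a field that is null in the existing config still falls through to the cluster default. This happens, for example, with a function registered before this feature existed, and it is the upgrade edge case discussed in the thread.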

The updated ProducerSpec in Function.proto will look like this:

    message ProducerSpec {
        int32 maxPendingMessages = 1;
        int32 maxPendingMessagesAcrossPartitions = 2;
        bool useThreadLocalProducers = 3;
        CryptoSpec cryptoSpec = 4;
        string batchBuilder = 5;
        bool batchingDisabled = 6;
        bool chunkingEnabled = 7;
        bool blockIfQueueFullDisabled = 8;
        CompressionType compressionType = 9;
        HashingScheme hashingScheme = 10;
        MessageRoutingMode messageRoutingMode = 11;
        int64 batchingMaxPublishDelay = 12;
    }

To support these new parameters during local run, and for convenience, we should be able to specify them in the Admin CLI and in local run mode.

For local run, as function_worker.yml is not used, these parameters will have defaults and be primitive values:

    @Parameter(names = "--clusterFunctionBatchingDisabled",
            description = "Disable the default message batching behavior for functions", hidden = true)
    public boolean clusterFunctionBatchingDisabled = false;

    @Parameter(names = "--clusterFunctionChunkingEnabled",
            description = "The default message chunking behavior for functions", hidden = true)
    public boolean clusterFunctionChunkingEnabled = false;

    @Parameter(names = "--clusterFunctionBlockIfQueueFullDisabled",
            description = "Disable the default blocking behavior for functions when queue is full", hidden = true)
    public boolean clusterFunctionBlockIfQueueFullDisabled = false;

    @Parameter(names = "--clusterFunctionCompressionTypeDefault",
            description = "The default Compression Type for functions", hidden = true)
    public String clusterFunctionCompressionTypeDefault = "LZ4";

    @Parameter(names = "--clusterFunctionHashingSchemeDefault",
            description = "The default Hashing Scheme for functions", hidden = true)
    public String clusterFunctionHashingSchemeDefault = "Murmur3_32Hash";

    @Parameter(names = "--clusterFunctionMessageRoutingModeDefault",
            description = "The default Message Routing Mode for functions", hidden = true)
    public String clusterFunctionMessageRoutingModeDefault = "CustomPartition";

    @Parameter(names = "--clusterFunctionBatchingMaxPublishDelayDefault",
            description = "The default max publish delay (in milliseconds) for functions when message batching is enabled",
            hidden = true)
    public long clusterFunctionBatchingMaxPublishDelayDefault = 10L;

For the Admin CLI, parameters can be provided in a similar way, but they will not have defaults (i.e. Boolean instead of boolean). They should be null when not provided, so that the missing configs are obtained from WorkerConfig.

To handle the logic of selecting a property from an incoming producerConfig vs. the producerDefaults object (from WorkerConfig), we can use a FunctionDefaultsMediator. That interface looks like this, and it gives us the flexibility to handle mediation for specific properties without needing to convert the entire object:

    public interface FunctionDefaultsMediator {

        boolean isBatchingDisabled();

        boolean isChunkingEnabled();

        boolean isBlockIfQueueFullDisabled();

        CompressionType getCompressionType();

        Function.CompressionType getCompressionTypeProto();

        HashingScheme getHashingScheme();

        Function.HashingScheme getHashingSchemeProto();

        MessageRoutingMode getMessageRoutingMode();

        Function.MessageRoutingMode getMessageRoutingModeProto();

        Long getBatchingMaxPublishDelay();

    }

(Alternatively, we could remove the Proto methods from the mediator and just use FunctionConfigUtils.convert(..) to translate between the protobuf ProducerSpec and the Java ProducerConfig.)

*Testing*

In terms of testing, much of this functionality will be covered by existing tests in the pulsar-functions module.
However, we need to test each of the new classes, as well as these behaviors:

- functionRegistration (with different provided ProducerConfig properties)
- functionUpdate (for ignoreExistingFunctionDefaults=true and ignoreExistingFunctionDefaults=false)
- loading function_worker.yml into WorkerConfig
- throwing exceptions correctly when invalid parameters are provided
- converting between ProducerSpec and ProducerConfig
- correctly handling precedence when computing the final ProducerConfig values
- creating producers
- ensuring functionality is consistent across runtimes

I'd like feedback on this proposal.

Devin G. Bost