Jerry,

Thanks for the information and for the links! Thanks also for the suggestion to split the PR into two parts. That will make it easier to get some of this work completed in time for the 2.8.0 release.
While working on the first part, I discovered something that needs community attention. To create the producer (e.g. https://github.com/apache/pulsar/blob/master/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java#L117), we get the MessageRouter from a singleton that takes isBatchingEnabled in its constructor.

The problem is that FunctionResultRouter is a shared instance, so if one function changes the router's isBatchingEnabled value, it creates a race condition with the other functions that share the same router. It appears that the router is a singleton to ensure that clock behavior is consistent.

Unless someone has a better idea, it may be necessary to refactor isBatchingEnabled (https://github.com/apache/pulsar/blob/master/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java#L46) out of the constructor and into a method parameter on the router. However, moving isBatchingEnabled to a method parameter is a breaking change to the client interface: https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageRouter.java#L56

I don't want to introduce breaking changes, but I'm not sure how else to ensure that the message router behaves correctly across function instances (https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java#L82) when some functions have batching enabled and others do not.

Looking for ideas/feedback.

Devin G. Bost

On Mon, Apr 5, 2021 at 1:11 PM Jerry Peng <jerry.boyang.p...@gmail.com> wrote:

> The interceptor would be loaded at function registration.

The interceptor classes should be loaded at broker/worker startup time.
> In order for an interceptor to be picked up, a jar would need to be compiled with the class that implements the interface, and the jar would need to be deployed into a directory that matches the directory provided somewhere. (Would this directory be specified in function_worker.yml? Or, would we need to reuse one of the existing interceptor directories and filter classes at runtime?)

The JAR with the interceptor classes can simply be placed in the lib/ directory of the Pulsar project. An additional config may need to be added to function_worker.yml so that the class name of the interceptor implementation can be provided.

> In order for changes to the FunctionConfig values to reach where the Function producers are created, we'd still need to add the producer properties to the Function.proto and FunctionConfig.ProducerConfig definitions. We'd also still need to modify CLI parameters to ensure these values could be modified on a per-function basis.

For your use case of disabling batching, you will still need to add an additional field to Function.proto so that it can be configured. I would recommend doing that work separately, since it is orthogonal to interceptors.

> When a function is registered, instead of retrieving cluster-wide function producer defaults from WorkerConfig, it would load the interceptor, which would modify the FunctionConfig's ProducerConfig's property values to provide the cluster-wide defaults for any value that wasn't provided by the REST call.

Yes.

> If an interceptor wasn't provided, during conversion of the ProducerConfig to the protobuf ProducerSpec, the protobuf defaults will take effect (since protobuf doesn't allow null primitives), which will then be picked up when the producer is created.

If an interceptor implementation is not provided, the behavior of the APIs should be the same as the current behavior.
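As a rough illustration of the startup-time loading described above, the sketch below loads an interceptor once by class name (as might be read from a new, hypothetical function_worker.yml setting) and falls back to a no-op implementation when none is configured, so API behavior stays unchanged. `FunctionInterceptor`, `beforeRegisterFunction`, `FunctionConfigStub`, `DisableBatchingByDefault`, and `InterceptorLoader` are hypothetical names chosen for illustration; they are not existing Pulsar APIs.

```java
// Hypothetical stand-in for FunctionConfig; the real class has many more fields.
class FunctionConfigStub {
    Boolean batchingDisabled; // null means "not provided in the REST call"
}

// Hypothetical interceptor contract, in the spirit of the interface sketched in this thread.
interface FunctionInterceptor {
    void beforeRegisterFunction(String tenant, String namespace,
                                String functionName, FunctionConfigStub config);
}

// Used when no interceptor is configured: registration behaves as it does today.
class NoOpFunctionInterceptor implements FunctionInterceptor {
    @Override
    public void beforeRegisterFunction(String tenant, String namespace,
                                       String functionName, FunctionConfigStub config) {
        // intentionally does nothing
    }
}

// Example of what a cluster admin might ship in a JAR under lib/:
// disable batching unless the request set the value explicitly.
class DisableBatchingByDefault implements FunctionInterceptor {
    @Override
    public void beforeRegisterFunction(String tenant, String namespace,
                                       String functionName, FunctionConfigStub config) {
        if (config.batchingDisabled == null) {
            config.batchingDisabled = Boolean.TRUE;
        }
    }
}

final class InterceptorLoader {
    private InterceptorLoader() {
    }

    // Called once at worker startup with the configured class name (hypothetical setting).
    static FunctionInterceptor load(String className) {
        if (className == null || className.trim().isEmpty()) {
            return new NoOpFunctionInterceptor();
        }
        try {
            return Class.forName(className)
                    .asSubclass(FunctionInterceptor.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot load function interceptor: " + className, e);
        }
    }
}
```

Loading the class once at startup, rather than on every registration, means a misconfigured class name fails fast when the worker starts instead of surfacing on the first API call.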
Back at Streamlio, I actually created an interceptor mechanism. Please take a look at this old PR:

https://github.com/apache/pulsar/pull/7159

or, more specifically, at what is done for functions here:

https://github.com/apache/pulsar/pull/7159/files#diff-427711b7c1c3b9f17cd1c02bb9ceb942af454676dd97902a43e910f645febc6d

Perhaps you can help revive this work!

Best,

Jerry

On Mon, Apr 5, 2021 at 11:46 AM Devin Bost <devin.b...@gmail.com> wrote:

Here's my understanding of what I'd need to do to implement the interceptor approach.

The interceptor would be loaded at function registration. In order for an interceptor to be picked up, a jar would need to be compiled with the class that implements the interface, and the jar would need to be deployed into a directory that matches the directory provided somewhere. (Would this directory be specified in function_worker.yml? Or, would we need to reuse one of the existing interceptor directories and filter classes at runtime?)

In order for changes to the FunctionConfig values to reach where the Function producers are created, we'd still need to add the producer properties to the Function.proto and FunctionConfig.ProducerConfig definitions. We'd also still need to modify CLI parameters to ensure these values could be modified on a per-function basis.

When a function is registered, instead of retrieving cluster-wide function producer defaults from WorkerConfig, it would load the interceptor, which would modify the FunctionConfig's ProducerConfig's property values to provide the cluster-wide defaults for any value that wasn't provided by the REST call.
If an interceptor wasn't provided, during conversion of the ProducerConfig to the protobuf ProducerSpec, the protobuf defaults will take effect (since protobuf doesn't allow null primitives), which will then be picked up when the producer is created.

Is that right?

Devin G. Bost

On Mon, Apr 5, 2021 at 11:56 AM Devin Bost <devin.b...@gmail.com> wrote:

If we use the interceptor approach, does that mean a cluster admin would need to write custom code in order to override the function producer settings? I worry about adding additional burden to cluster administration.

--
Devin G. Bost

On Sat, Apr 3, 2021, 8:26 AM Devin Bost <devin.b...@gmail.com> wrote:

Hi Rui,

Thanks for the feedback. My understanding of the interceptor is that it's not something that would be touched by users but would only be modified by cluster admins. So, I think we'd still need to include the parameters on the ProducerConfig. I already have the other approach coded, so changing the design will require some rework. I'll look at that example you mentioned.

--
Devin G. Bost

On Fri, Apr 2, 2021, 10:18 PM Rui Fu <f...@rui.sh> wrote:

Hi Devin,

Great proposal for giving functions such a useful feature, and thanks for your detailed writing. After going through the history of this discussion, I would like to +1 Jerry's suggestion. With the interceptor, admins and users would have maximum flexibility to customize their producer configs. It may also reduce the number of parameters we put into the configs and pulsar-admin.
Also, Pulsar Functions already has some interceptors, like the `RuntimeCustomizer` interface, so it is not a new technique that users would find hard to adopt.

Besides, I would suggest covering these changes in the Python and Go runtimes as well if we add new fields to `Function.proto`.

Best,

Rui

On Apr 2, 2021, 10:13 AM +0800, Devin Bost <devin.b...@gmail.com> wrote:

What would be some of the additional benefits of using the interceptor approach?

--
Devin G. Bost

On Thu, Apr 1, 2021, 6:03 PM Jerry Peng <jerry.boyang.p...@gmail.com> wrote:

Devin,

Interceptors are located on the server side (broker/worker). Cluster admins would create the plugins based on the interfaces I described and install them on the server side. Regular function developers will not implement or interact directly with the interceptor.

> Would it be better to just ignore WorkerConfig defaults during function update?

Yes.

If the cluster admin changes the function config defaults and wants existing functions to use the changed configs, the admin can simply update those functions' configs through the regular function update mechanism we have today.

On Thu, Apr 1, 2021 at 4:51 PM Devin Bost <devin.b...@gmail.com> wrote:

Would it be better to just ignore WorkerConfig defaults during function update?
We could still update the function producer behavior through the producerConfig passed as a REST parameter, but we'd avoid the edge case I mentioned.

Devin G. Bost

On Thu, Apr 1, 2021 at 5:42 PM Devin Bost <devin.b...@gmail.com> wrote:

I just remembered an edge case that motivated me to create that additional flow.

In my PR, if any producerConfig values are null at registration or update, we get those values from WorkerConfig instead. However, when a user runs an update on a function for the first time after upgrading to the version of Pulsar with this feature, if someone has modified the function_worker.yml file (perhaps without their knowledge), those defaults from function_worker.yml will get picked up, because the existing producerConfig will have null set for those values (since the function was registered before this feature existed). So, the user could be trying to update something unrelated, not realizing that their function's producerConfig is going to be updated with the cluster defaults. Most users won't be affected by this, but it could be tricky to diagnose for the few who are. (Is this description any clearer? If not, it might be clearer if I diagram it.)

Devin G. Bost

On Thu, Apr 1, 2021 at 4:39 PM Jerry Peng <jerry.boyang.p...@gmail.com> wrote:

Hi Devin,

I understand the usefulness of giving cluster admins the ability to specify some defaults for functions at the cluster level. I think the right time to apply those defaults is at function registration time. I am not sure I understand the usefulness of what you are proposing we do at update time. If you would like to change a config of an already-running function, why not just update it with the config changes that are needed? I am not sure you need the additional workflow you proposed for the update.

Another idea to satisfy your use case: instead of leveraging a default config mechanism, perhaps we should look at implementing interceptors for the function/source/sink API calls. We already have interceptors implemented for many other Pulsar functionalities. Perhaps we should do it for the Pulsar Functions APIs as well. With interceptors, you will be able to intercept all API operations on functions and customize the requests however you would like.
We can have an interface like:

interface FunctionInterceptors {

    void registerFunctionInterceptor(String tenant, String namespace, String functionName, FunctionConfig functionConfig, ...);

    ...
}

for developers to implement. At the beginning of every Function API call, the corresponding interceptor method gets called. For example, the registerFunctionInterceptor() method I provided above will be called towards the beginning of registerFunction and will allow you to customize the function config however you want.

Best,

Jerry

On Thu, Apr 1, 2021 at 2:08 PM Devin Bost <devin.b...@gmail.com> wrote:

*Cluster-Wide and Function-Specific Producer Defaults*

- Status: Proposal
- Author: Devin Bost (with guidance from Jerry Peng)
- Pull Request: https://github.com/apache/pulsar/pull/9987
- Mailing List discussion:
- Release: 2.8.0

*Motivation*

Pulsar currently provides no way to allow users or operators to change the producer behavior of Pulsar functions.
For organizations that make heavy use of functions, the ability to tune the messaging behavior of functions is greatly desired. Moreover, the current function messaging defaults appear to be causing issues for certain workloads. (For example, some bugs appear to be related to having batching enabled. See #6054.) Enabling operators to modify these settings will help Pulsar cluster admins work around issues so that they can upgrade legacy versions of Pulsar to more recent versions. Greater tuning of these settings can also provide more opportunity for performance optimization in production environments. We need the ability to modify these function producer settings:

- batchingEnabled
- chunkingEnabled
- blockIfQueueFull
- compressionType
- hashingScheme
- messageRoutingMode
- batchingMaxPublishDelay

*Implementation Strategy*

Ideally, we need a way to set default message producer behavior (for functions) at a cluster-wide level. However, operators may want specific settings to be used for certain functions.
(We can't assume that everyone will want all functions in a cluster to have the same producer settings.) When an operator changes the cluster-wide producer defaults, they need a way to roll out the changes to functions without significantly disrupting messaging flows in production environments. They also need a simple way to roll back changes in case a change to function producer settings results in undesired behavior. However, not all users will want function updates to replace existing function producer settings, especially for functions that have been given custom producer settings (at the per-function level). Because of these different use cases, there needs to be a way for an operator to specify the update behavior they want. For a given update, the user should be able to specify whether:

1. Existing function producer settings will be replaced by cluster-wide defaults (unless overridden by producer settings provided in the request), or
2. Existing function producer settings will *not* be replaced by cluster-wide defaults (unless overridden by producer settings provided in the request).

So, the two orders of precedence are:

1. newProducerConfig > cluster-wide defaults > existingProducerConfig
2. newProducerConfig > existingProducerConfig > cluster-wide defaults

We can add a boolean property to UpdateOptions to allow the precedence to be specified. Since we don't want to introduce unexpected side effects during an update, the default precedence should prefer the existingProducerConfig over cluster-wide defaults.

To ensure that the desired producer settings (after calculating the settings from defaults + overrides) are available when creating the producers (on any cluster), we need to add these producer settings to the function protobuf definition. Also, to ensure that settings can be properly validated at the time they are submitted, we need validation in two places:

1. When cluster-wide configs are loaded upon broker startup
2. When a function is registered or updated

As it would be impractical to check every registered function definition during broker startup, most of the validation needs to occur during function registration or update. However, during broker startup, we can at least validate that the configurations exist and throw an exception if they are invalid.
As any configuration exceptions thrown when the producer is created would bubble up to the client, we need to ensure that the final configurations are valid when a function is registered or updated, so that no exceptions are thrown when the producer is created. This requires computing the producer settings (according to the desired precedence) during function registration or update.

When a function is registered, no producer settings exist for it yet, so we simply need to get the cluster-wide defaults and override them with any specific producer settings provided in the FunctionConfig's ProducerConfig (submitted as a REST parameter). After the initial registration, those computed settings will be persisted in BookKeeper with the function definition.

When a function is updated, we also need to account for the existing producerConfig, and for this, we need to check the desired override precedence (as explained above) when computing the final producer settings that will be persisted with the function.
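The registration and update computation described above, using the two precedence orders from the Implementation Strategy, can be sketched per setting as follows. `ProducerSettingResolver` is a hypothetical helper, not part of Pulsar, and `ignoreExisting` stands in for the proposed `ignoreExistingFunctionDefaults` update option; at registration, `existing` is simply null.

```java
import java.util.Objects;

// Hypothetical helper (not part of Pulsar) that resolves one producer setting.
final class ProducerSettingResolver {
    private ProducerSettingResolver() {
    }

    /**
     * @param incoming       value from the ProducerConfig in the REST request, or null
     * @param existing       value persisted with the function, or null (always null at registration)
     * @param clusterDefault value from function_worker.yml; never null
     * @param ignoreExisting stand-in for the proposed ignoreExistingFunctionDefaults option
     */
    static <T> T resolve(T incoming, T existing, T clusterDefault, boolean ignoreExisting) {
        Objects.requireNonNull(clusterDefault, "cluster-wide default must be set");
        if (incoming != null) {
            return incoming; // newProducerConfig wins in both orders
        }
        if (ignoreExisting) {
            return clusterDefault; // order 1: cluster-wide defaults > existingProducerConfig
        }
        // order 2: existingProducerConfig > cluster-wide defaults
        return existing != null ? existing : clusterDefault;
    }
}
```

Because the cluster-wide default is never null, order 1 never reaches the existing value, so setting the flag discards any per-function customization that is not repeated in the request. That is the side effect the proposal avoids by defaulting the flag to false.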
*Modifications*

As WorkerConfig is readily available when functions are registered and updated, we can put the new settings into function_worker.yml and load and validate them via WorkerConfig.load(..). To avoid introducing breaking changes, parameters will have defaults assigned in WorkerConfig that conform to the current expected behavior, like this:

import java.io.Serializable;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.common.configuration.FieldContext;

@Data
@Accessors(chain = true)
public class FunctionDefaultsConfig implements Serializable {

    @FieldContext(
        doc = "Disables default message batching between functions"
    )
    protected boolean batchingDisabled = false;

    @FieldContext(
        doc = "Enables default message chunking between functions"
    )
    protected boolean chunkingEnabled = false;

    @FieldContext(
        doc = "Disables default behavior to block when message queue is full"
    )
    protected boolean blockIfQueueFullDisabled = false;

    @FieldContext(
        doc = "Default compression type"
    )
    protected String compressionType = "LZ4";

    @FieldContext(
        doc = "Default hashing scheme"
    )
    protected String hashingScheme = "Murmur3_32Hash";

    @FieldContext(
        doc = "Default message routing mode"
    )
    protected String messageRoutingMode = "CustomPartition";

    @FieldContext(
        doc = "Default max publish delay (in milliseconds) when message batching is enabled"
    )
    protected long batchingMaxPublishDelay = 10L;

    public ClusterFunctionProducerDefaults buildProducerDefaults()
            throws InvalidWorkerConfigDefaultException {
        return new ClusterFunctionProducerDefaults(
                this.isBatchingDisabled(),
                this.isChunkingEnabled(),
                this.isBlockIfQueueFullDisabled(),
                this.getCompressionType(),
                this.getHashingScheme(),
                this.getMessageRoutingMode(),
                this.getBatchingMaxPublishDelay());
    }

    public FunctionDefaultsConfig() {
    }
}
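The proposal references `ClusterFunctionProducerDefaults` and `InvalidWorkerConfigDefaultException` without showing them. A minimal sketch, assuming they simply parse the string settings from function_worker.yml into enums and fail fast on bad values (the broker-startup validation described in the Implementation Strategy), might look like this. The enums are local stand-ins that mirror the constant names of the Pulsar client's `CompressionType`, `HashingScheme`, and `MessageRoutingMode`, so the example compiles on its own.

```java
import java.util.Arrays;

// Local stand-ins with the same constant names as the Pulsar client enums
// (org.apache.pulsar.client.api), so this sketch compiles on its own.
enum CompressionType { NONE, LZ4, ZLIB, ZSTD, SNAPPY }

enum HashingScheme { JavaStringHash, Murmur3_32Hash }

enum MessageRoutingMode { SinglePartition, RoundRobinPartition, CustomPartition }

// Hypothetical checked exception named in the proposal.
class InvalidWorkerConfigDefaultException extends Exception {
    InvalidWorkerConfigDefaultException(String message) {
        super(message);
    }
}

// Hypothetical immutable holder for validated cluster-wide producer defaults.
final class ClusterFunctionProducerDefaults {
    final boolean batchingDisabled;
    final boolean chunkingEnabled;
    final boolean blockIfQueueFullDisabled;
    final CompressionType compressionType;
    final HashingScheme hashingScheme;
    final MessageRoutingMode messageRoutingMode;
    final long batchingMaxPublishDelayMs;

    ClusterFunctionProducerDefaults(boolean batchingDisabled, boolean chunkingEnabled,
                                    boolean blockIfQueueFullDisabled, String compressionType,
                                    String hashingScheme, String messageRoutingMode,
                                    long batchingMaxPublishDelayMs)
            throws InvalidWorkerConfigDefaultException {
        this.batchingDisabled = batchingDisabled;
        this.chunkingEnabled = chunkingEnabled;
        this.blockIfQueueFullDisabled = blockIfQueueFullDisabled;
        // Parse the YAML strings eagerly so a bad value fails at worker startup
        // instead of when a function's producer is created.
        this.compressionType = parse(CompressionType.class, "compressionType", compressionType);
        this.hashingScheme = parse(HashingScheme.class, "hashingScheme", hashingScheme);
        this.messageRoutingMode =
                parse(MessageRoutingMode.class, "messageRoutingMode", messageRoutingMode);
        this.batchingMaxPublishDelayMs = batchingMaxPublishDelayMs;
    }

    private static <E extends Enum<E>> E parse(Class<E> type, String field, String value)
            throws InvalidWorkerConfigDefaultException {
        if (value == null) {
            throw new InvalidWorkerConfigDefaultException(field + " must be set");
        }
        try {
            return Enum.valueOf(type, value);
        } catch (IllegalArgumentException e) {
            throw new InvalidWorkerConfigDefaultException("Invalid " + field + " '" + value
                    + "'; expected one of " + Arrays.toString(type.getEnumConstants()));
        }
    }
}
```

Parsing with `Enum.valueOf` keeps the accepted values in sync with the enum, so a typo such as `compressionType: GZIP` is rejected when the worker starts rather than when a function's producer is created.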
The additional section in function_worker.yml will look like this:

functionDefaults:
  batchingDisabled: true
  chunkingEnabled: false
  blockIfQueueFullDisabled: false
  batchingMaxPublishDelay: 12
  compressionType: ZLIB
  hashingScheme: JavaStringHash
  messageRoutingMode: RoundRobinPartition

During function update, we can use a new boolean option, ignoreExistingFunctionDefaults, to toggle precedence:

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@ApiModel(value = "UpdateOptions", description = "Options while updating function defaults or the sink")
public class UpdateOptions {

    @ApiModelProperty(
        value = "Whether or not to update the auth data",
        name = "update-auth-data")
    private boolean updateAuthData = false;

    @ApiModelProperty(
        value = "Whether or not to ignore any existing function defaults",
        name = "ignore-existing-function-defaults")
    private boolean ignoreExistingFunctionDefaults = false;
}

To ensure that function producer settings don't get modified unexpectedly by an update, ignoreExistingFunctionDefaults = false by default.

The updated ProducerConfig will contain the new properties and a method to merge an incoming ProducerConfig with an existing ProducerConfig. (The merged ProducerConfig will use the incoming ProducerConfig properties when they exist and will use the existing ProducerConfig properties otherwise.) To avoid forcing operators to provide those ProducerConfig properties with every create or update, they must be nullable, not primitive (e.g. Boolean instead of boolean).

To support the additional properties, FunctionConfigUtils (and any similar classes) will need to be modified to correctly translate between the ProducerConfig and the protobuf ProducerSpec.
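The merge rule above (incoming properties win when they are non-null; otherwise the existing ones are kept) can be sketched with a trimmed-down, hypothetical `ProducerConfigSketch` that holds three of the proposed properties. The boxed field types are what make "not provided" (null) distinguishable from an explicit `false`.

```java
// Hypothetical, trimmed-down ProducerConfig with three of the proposed properties.
// Boxed types let null mean "not provided".
final class ProducerConfigSketch {
    Boolean batchingDisabled;
    String compressionType;
    Long batchingMaxPublishDelay;

    // Returns a new config that takes each property from `incoming` when it is
    // non-null and keeps this (existing) config's value otherwise.
    ProducerConfigSketch mergeWith(ProducerConfigSketch incoming) {
        ProducerConfigSketch source = incoming != null ? incoming : new ProducerConfigSketch();
        ProducerConfigSketch merged = new ProducerConfigSketch();
        merged.batchingDisabled = firstNonNull(source.batchingDisabled, batchingDisabled);
        merged.compressionType = firstNonNull(source.compressionType, compressionType);
        merged.batchingMaxPublishDelay =
                firstNonNull(source.batchingMaxPublishDelay, batchingMaxPublishDelay);
        return merged;
    }

    private static <T> T firstNonNull(T preferred, T fallback) {
        return preferred != null ? preferred : fallback;
    }
}
```

With this shape, an explicit `false` in the request overrides an existing `true`, which a primitive `boolean` field could not express.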
The updated ProducerSpec in Function.proto will look like this:

message ProducerSpec {
    int32 maxPendingMessages = 1;
    int32 maxPendingMessagesAcrossPartitions = 2;
    bool useThreadLocalProducers = 3;
    CryptoSpec cryptoSpec = 4;
    string batchBuilder = 5;
    bool batchingDisabled = 6;
    bool chunkingEnabled = 7;
    bool blockIfQueueFullDisabled = 8;
    CompressionType compressionType = 9;
    HashingScheme hashingScheme = 10;
    MessageRoutingMode messageRoutingMode = 11;
    int64 batchingMaxPublishDelay = 12;
}

To support these new parameters during local run, and for convenience, we should be able to specify them in the Admin CLI and in local run modes.

For local run, as function_worker.yml is not used, these parameters will have defaults and be primitive values:

    @Parameter(names = "--clusterFunctionBatchingDisabled",
            description = "Disable the default message batching behavior for functions",
            hidden = true)
    public boolean clusterFunctionBatchingDisabled = false;

    @Parameter(names = "--clusterFunctionChunkingEnabled",
            description = "The default message chunking behavior for functions",
            hidden = true)
    public boolean clusterFunctionChunkingEnabled = false;

    @Parameter(names = "--clusterFunctionBlockIfQueueFullDisabled",
            description = "Disable the default blocking behavior for functions when queue is full",
            hidden = true)
    public boolean clusterFunctionBlockIfQueueFullDisabled = false;

    @Parameter(names = "--clusterFunctionCompressionTypeDefault",
            description = "The default Compression Type for functions",
            hidden = true)
    public String clusterFunctionCompressionTypeDefault = "LZ4";

    @Parameter(names = "--clusterFunctionHashingSchemeDefault",
            description = "The default Hashing Scheme for functions",
            hidden = true)
    public String clusterFunctionHashingSchemeDefault = "Murmur3_32Hash";

    @Parameter(names = "--clusterFunctionMessageRoutingModeDefault",
            description = "The default Message Routing Mode for functions",
            hidden = true)
    public String clusterFunctionMessageRoutingModeDefault = "CustomPartition";

    @Parameter(names = "--clusterFunctionBatchingMaxPublishDelayDefault",
            description = "The default max publish delay (in milliseconds) for functions when message batching is enabled",
            hidden = true)
    public long clusterFunctionBatchingMaxPublishDelayDefault = 10L;

For the Admin CLI, parameters can be provided in a similar way, but they will not have defaults (i.e., Boolean instead of boolean), because a parameter that was not provided must be null so that its value is obtained from WorkerConfig.

To select each property from either the incoming producerConfig or the producerDefaults object (from WorkerConfig), we can use a FunctionDefaultsMediator.
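
Because the Admin CLI values are boxed, the precedence rule reduces to "use the provided value if it is non-null, otherwise the WorkerConfig default", applied one property at a time. Here is a minimal sketch of that rule. CliProducerConfig, WorkerProducerDefaults, and the local CompressionType enum are simplified, hypothetical stand-ins rather than the real Pulsar classes, and only two properties are shown. An unrecognized compression name is rejected with an IllegalArgumentException instead of silently falling back.

```java
import java.util.Locale;
import java.util.Objects;

class ProducerDefaultsResolution {

    // Hypothetical local stand-in for the client's CompressionType enum.
    enum CompressionType { NONE, LZ4, ZLIB, ZSTD, SNAPPY }

    // Hypothetical subset of the producer defaults held in WorkerConfig (never null).
    static final class WorkerProducerDefaults {
        final boolean batchingDisabled;
        final CompressionType compressionType;

        WorkerProducerDefaults(boolean batchingDisabled, CompressionType compressionType) {
            this.batchingDisabled = batchingDisabled;
            this.compressionType = Objects.requireNonNull(compressionType);
        }
    }

    // Hypothetical Admin CLI values: boxed, so null means "the flag was not passed".
    static final class CliProducerConfig {
        Boolean batchingDisabled;
        String compressionType;
    }

    // Per-property precedence: an explicitly provided value wins, otherwise the worker default.
    static <T> T resolve(T provided, T workerDefault) {
        return provided != null ? provided : workerDefault;
    }

    // Parses a user-supplied name, rejecting unknown values with a clear message.
    static CompressionType parseCompressionType(String name) {
        try {
            return CompressionType.valueOf(name.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Invalid compression type: " + name, e);
        }
    }

    static boolean effectiveBatchingDisabled(CliProducerConfig cli, WorkerProducerDefaults defaults) {
        return resolve(cli.batchingDisabled, defaults.batchingDisabled);
    }

    static CompressionType effectiveCompressionType(CliProducerConfig cli, WorkerProducerDefaults defaults) {
        return cli.compressionType == null
                ? defaults.compressionType
                : parseCompressionType(cli.compressionType);
    }

    public static void main(String[] args) {
        WorkerProducerDefaults defaults = new WorkerProducerDefaults(false, CompressionType.LZ4);

        CliProducerConfig nothingProvided = new CliProducerConfig();
        System.out.println(effectiveBatchingDisabled(nothingProvided, defaults)); // false
        System.out.println(effectiveCompressionType(nothingProvided, defaults));  // LZ4

        CliProducerConfig overrides = new CliProducerConfig();
        overrides.batchingDisabled = true;
        overrides.compressionType = "zstd";
        System.out.println(effectiveBatchingDisabled(overrides, defaults));       // true
        System.out.println(effectiveCompressionType(overrides, defaults));        // ZSTD
    }
}
```

Resolving each property independently, rather than merging whole config objects, is the idea behind the FunctionDefaultsMediator interface proposed here.
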
The FunctionDefaultsMediator interface would look like this. It gives us flexibility to handle mediation for specific properties without needing to convert the entire object.

    import org.apache.pulsar.client.api.CompressionType;
    import org.apache.pulsar.client.api.HashingScheme;
    import org.apache.pulsar.client.api.MessageRoutingMode;
    import org.apache.pulsar.functions.proto.Function;

    public interface FunctionDefaultsMediator {
        boolean isBatchingDisabled();
        boolean isChunkingEnabled();
        boolean isBlockIfQueueFullDisabled();
        CompressionType getCompressionType();
        Function.CompressionType getCompressionTypeProto();
        HashingScheme getHashingScheme();
        Function.HashingScheme getHashingSchemeProto();
        MessageRoutingMode getMessageRoutingMode();
        Function.MessageRoutingMode getMessageRoutingModeProto();
        Long getBatchingMaxPublishDelay();
    }

(Alternatively, we could remove the Proto methods from the mediator and just use FunctionConfigUtils.convert(..) to translate between the protobuf ProducerSpec and the Java ProducerConfig.)

*Testing*

In terms of testing, much of this functionality will be covered by existing tests in the pulsar-functions module.
However, we need to test each of the new classes, as well as these behaviors:

- functionRegistration (with different provided ProducerConfig properties)
- functionUpdate (for ignoreExistingFunctionDefaults=true and ignoreExistingFunctionDefaults=false)
- loading function_worker.yml into WorkerConfig
- throwing exceptions correctly when invalid parameters are provided
- converting between ProducerSpec and ProducerConfig
- correctly handling precedence when computing the final ProducerConfig values
- creating producers
- ensuring functionality is consistent across runtimes

I'd like feedback on this proposal.

Devin G. Bost