I think I was able to resolve that issue. Fortunately, only FunctionResultRouter had the singleton behavior, so resolving that was straightforward.

The current PR for part 1 is here (in my personal CI):
https://github.com/devinbost/pulsar/pull/5/files

Once the tests look good, I'll submit a PR to Apache Pulsar.
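
For readers following the thread, the idea discussed in the quoted message below (one shared clock, with routers that are no longer singletons) might look roughly like the following. This is only a sketch and not the code from the PR: `SharedClock`, `SketchRouter`, and `choosePartition` are hypothetical names, and the partition-selection logic is a simplified stand-in rather than Pulsar's actual router implementation.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

public class RouterSketch {

    /** Hypothetical process-wide clock holder: the only state the routers share. */
    static final class SharedClock {
        private static final Clock INSTANCE = Clock.systemUTC();

        private SharedClock() {
        }

        static Clock get() {
            return INSTANCE;
        }
    }

    /** Hypothetical per-producer router; every producer gets its own instance. */
    static final class SketchRouter {
        private final boolean batchingEnabled;
        private final long switchIntervalMs;
        private final Clock clock;

        SketchRouter(boolean batchingEnabled, long switchIntervalMs, Clock clock) {
            this.batchingEnabled = batchingEnabled;
            this.switchIntervalMs = switchIntervalMs;
            this.clock = clock;
        }

        int choosePartition(int sequence, int numPartitions) {
            if (batchingEnabled) {
                // Stay on one partition for a whole time window so batches can fill.
                long window = clock.millis() / switchIntervalMs;
                return (int) Math.floorMod(window, (long) numPartitions);
            }
            // Without batching, rotate on every message.
            return Math.floorMod(sequence, numPartitions);
        }
    }

    public static void main(String[] args) {
        // A fixed clock keeps the demo deterministic; real code would pass SharedClock.get().
        Clock fixed = Clock.fixed(Instant.ofEpochMilli(25), ZoneOffset.UTC);
        SketchRouter batching = new SketchRouter(true, 10, fixed);
        SketchRouter plain = new SketchRouter(false, 10, fixed);
        System.out.println("batching router picks " + batching.choosePartition(5, 4));
        System.out.println("plain router picks " + plain.choosePartition(5, 4));
    }
}
```

Because the batching flag lives in each router instance, two producers with different batching settings no longer overwrite each other's state, while the clock they consult is still the same object.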
Devin G. Bost

On Mon, Apr 5, 2021 at 8:50 PM Devin Bost <devin.b...@gmail.com> wrote:

> It appears that the reason the router classes are singletons is to ensure the clock behavior is consistent across producers.
> I wonder if we might be able to avoid creating a breaking change by refactoring out the clock into a singleton that could be shared across the routers, which we could then make not singletons.
> That would allow the routers the ability to vary behavior through their constructors (thus avoiding a breaking change) and eliminate the race condition that would occur any time two producers are created with the same router class and different batching behavior.
>
> Devin G. Bost
>
>
> On Mon, Apr 5, 2021 at 6:01 PM Devin Bost <devin.b...@gmail.com> wrote:
>
>> Jerry,
>>
>> Thanks for the information and for the links!
>> Thanks also for the suggestion to split the PR into two parts. That will make it easier to get some of this work completed in time for the 2.8.0 release.
>>
>> While working on the first part, I discovered something that needs community attention. I discovered that in order to create the producer (e.g. https://github.com/apache/pulsar/blob/master/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java#L117), we get the MessageRouter from a singleton that has isBatchingEnabled in its constructor.
>> The problem is that FunctionResultRouter is a shared instance, so if one function changes the router's value of isBatchingEnabled, it will create a race condition with other functions that share the same router. It appears that the reason the router is a singleton is to ensure that clock behavior is consistent.
>>
>> Unless someone has a better idea, it looks like it might be necessary to refactor isBatchingEnabled (https://github.com/apache/pulsar/blob/master/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java#L46) from its constructor into a method parameter on the router.
>>
>> However, moving isBatchingEnabled to a method parameter is a breaking change on the client interface:
>> https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageRouter.java#L56
>>
>> I don't want to introduce breaking changes, but I'm not sure how else to ensure that the message router behaves correctly across function instances (https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java#L82) when some functions may have batching enabled and some may not.
>>
>> Looking for ideas/feedback.
>>
>> Devin G. Bost
>>
>>
>> On Mon, Apr 5, 2021 at 1:11 PM Jerry Peng <jerry.boyang.p...@gmail.com> wrote:
>>
>>> > The interceptor would be loaded at function registration.
>>>
>>> The interceptor classes should be loaded at broker/worker startup time.
>>>
>>> > In order for an interceptor to be picked up, a jar would need to be compiled with the class that implements the interface, and the jar would need to be deployed into a directory that matches the directory provided somewhere. (Would this directory be specified in function_worker.yml? Or, would we need to reuse one of the existing interceptor directories and filter classes at runtime?)
>>>
>>> The JAR with classes for the interceptor can just be placed in the lib/ directory of the pulsar project. An additional config may need to be added to function_worker.yml so that the class name of the interceptor implementation can be provided.
>>>
>>> > In order for changes to the FunctionConfig values to reach where the Function producers are created, we'd still need to add the producer properties to the Function.proto and FunctionConfig.ProducerConfig definitions. We'd also still need to modify CLI parameters to ensure these values could be modified on a per-function basis.
>>>
>>> For your use case of disabling batching, you will still need to add an additional field in the Function.proto so that it can be configurable. I would recommend doing that work separately, since it is orthogonal to interceptors.
>>>
>>> > When a function is registered, instead of retrieving cluster-wide function producer defaults from WorkerConfig, it would load the interceptor, which would modify the FunctionConfig's ProducerConfig's property values to provide the cluster-wide defaults for any value that wasn't provided by the REST call.
>>>
>>> Yes.
>>>
>>> > If an interceptor wasn't provided, during conversion of the ProducerConfig to the protobuf ProducerSpec, the protobuf defaults will take effect (since protobuf doesn't allow null primitives), which will then be picked up when the producer is created.
>>>
>>> If an interceptor impl is not provided, the behavior of the APIs should be the same as the current behavior.
>>>
>>> Back at Streamlio, I actually created an interceptor mechanism.
>>>
>>> Please take a look at this old PR:
>>>
>>> https://github.com/apache/pulsar/pull/7159
>>>
>>> or, more specifically, what is done for functions here:
>>>
>>> https://github.com/apache/pulsar/pull/7159/files#diff-427711b7c1c3b9f17cd1c02bb9ceb942af454676dd97902a43e910f645febc6d
>>>
>>> Perhaps you can help revive this work!
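
To make the exchange above more concrete, here is a minimal sketch of how a server-side interceptor could fill in cluster-wide defaults at registration, and how registration behaves when no interceptor is installed. Every name here (`ProducerSettings`, `RegisterInterceptor`, `ClusterDefaultsInterceptor`, `register`) is hypothetical and chosen for illustration; this is not the API from the PR linked above, and the default values are made up.

```java
import java.util.Optional;

public class InterceptorSketch {

    /** Hypothetical stand-in for the producer part of a function config. */
    static final class ProducerSettings {
        Boolean batchingEnabled;   // null means "not provided in the request"
        String compressionType;    // null means "not provided in the request"
    }

    /** Hypothetical hook, called at the start of function registration. */
    interface RegisterInterceptor {
        void beforeRegister(String tenant, String namespace, String functionName,
                            ProducerSettings settings);
    }

    /** Example plugin an admin might install: fills in made-up cluster-wide defaults. */
    static final class ClusterDefaultsInterceptor implements RegisterInterceptor {
        @Override
        public void beforeRegister(String tenant, String namespace, String functionName,
                                   ProducerSettings settings) {
            if (settings.batchingEnabled == null) {
                settings.batchingEnabled = Boolean.FALSE;
            }
            if (settings.compressionType == null) {
                settings.compressionType = "ZLIB";
            }
        }
    }

    /** With no interceptor configured, the request passes through unchanged. */
    static ProducerSettings register(Optional<RegisterInterceptor> interceptor,
                                     String tenant, String namespace, String functionName,
                                     ProducerSettings requested) {
        interceptor.ifPresent(i -> i.beforeRegister(tenant, namespace, functionName, requested));
        return requested;
    }

    public static void main(String[] args) {
        ProducerSettings request = new ProducerSettings();
        request.compressionType = "LZ4"; // explicitly provided, so it must survive
        ProducerSettings result = register(Optional.of(new ClusterDefaultsInterceptor()),
                "public", "default", "my-function", request);
        System.out.println(result.batchingEnabled + " " + result.compressionType);
    }
}
```

The interceptor only touches values the request left unset, which matches the "defaults for any value that wasn't provided by the REST call" behavior described above.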
>>>
>>> Best,
>>>
>>> Jerry
>>>
>>> On Mon, Apr 5, 2021 at 11:46 AM Devin Bost <devin.b...@gmail.com> wrote:
>>>
>>> > Here's my understanding of what I'd need to do to implement the interceptor approach.
>>> > The interceptor would be loaded at function registration. In order for an interceptor to be picked up, a jar would need to be compiled with the class that implements the interface, and the jar would need to be deployed into a directory that matches the directory provided somewhere. (Would this directory be specified in function_worker.yml? Or, would we need to reuse one of the existing interceptor directories and filter classes at runtime?)
>>> > In order for changes to the FunctionConfig values to reach where the Function producers are created, we'd still need to add the producer properties to the Function.proto and FunctionConfig.ProducerConfig definitions. We'd also still need to modify CLI parameters to ensure these values could be modified on a per-function basis.
>>> > When a function is registered, instead of retrieving cluster-wide function producer defaults from WorkerConfig, it would load the interceptor, which would modify the FunctionConfig's ProducerConfig's property values to provide the cluster-wide defaults for any value that wasn't provided by the REST call.
>>> > If an interceptor wasn't provided, during conversion of the ProducerConfig to the protobuf ProducerSpec, the protobuf defaults will take effect (since protobuf doesn't allow null primitives), which will then be picked up when the producer is created.
>>> >
>>> > Is that right?
>>> >
>>> > Devin G. Bost
>>> >
>>> >
>>> > On Mon, Apr 5, 2021 at 11:56 AM Devin Bost <devin.b...@gmail.com> wrote:
>>> >
>>> > > If we use the interceptor approach, does that mean a cluster admin would need to write custom code in order to override the function producer settings? I worry about adding additional burden to cluster administration.
>>> > >
>>> > > --
>>> > > Devin G. Bost
>>> > >
>>> > > On Sat, Apr 3, 2021, 8:26 AM Devin Bost <devin.b...@gmail.com> wrote:
>>> > >
>>> > >> Hi Rui,
>>> > >>
>>> > >> Thanks for the feedback. My understanding about the interceptor is that it's not something that would be touched by users but would only be modified by cluster admins. So, I think we'd still need to include the parameters on the ProducerConfig.
>>> > >> I already have the other approach coded, so changing the design will require some rework.
>>> > >> I'll look at that example you mentioned.
>>> > >>
>>> > >> --
>>> > >> Devin G. Bost
>>> > >>
>>> > >> On Fri, Apr 2, 2021, 10:18 PM Rui Fu <f...@rui.sh> wrote:
>>> > >>
>>> > >>> Hi Devin,
>>> > >>>
>>> > >>> Great proposal for giving functions such a useful feature, and thanks for your detailed write-up. After going through the history of this discussion, I would like to +1 Jerry's suggestion. With the interceptor, admins and users would have maximum flexibility to customize their producer configs. It may also reduce the number of parameters we put into the configs and pulsar-admin. Also, we already have some interceptors in Pulsar Functions, like the `RuntimeCustomizer` interface, so it is not a new technique that users would find hard to adopt.
>>> > >>>
>>> > >>> Besides, I would like to suggest covering these changes in the Python & Go runtimes as well if we add new fields to `Function.proto`.
>>> > >>>
>>> > >>> Best,
>>> > >>>
>>> > >>> Rui
>>> > >>> On April 2, 2021 at 10:13 AM (+0800), Devin Bost <devin.b...@gmail.com> wrote:
>>> > >>> > What would be some of the additional benefits of using the interceptor approach?
>>> > >>> >
>>> > >>> > --
>>> > >>> > Devin G. Bost
>>> > >>> >
>>> > >>> > On Thu, Apr 1, 2021, 6:03 PM Jerry Peng <jerry.boyang.p...@gmail.com> wrote:
>>> > >>> >
>>> > >>> > > Devin,
>>> > >>> > >
>>> > >>> > > Interceptors are located on the server side (broker/worker). Cluster admins would create the plugins based on the interfaces I described and install them on the server side. Regular function developers will not implement or interact directly with the interceptor.
>>> > >>> > >
>>> > >>> > > > Would it be better to just ignore WorkerConfig defaults during function update?
>>> > >>> > >
>>> > >>> > > Yes.
>>> > >>> > >
>>> > >>> > > If the cluster admin changes the function config defaults and wants existing functions to utilize those configs that have changed, the admin can just update those configs of the functions through the regular update mechanism for functions we have today.
>>> > >>> > >
>>> > >>> > >
>>> > >>> > > On Thu, Apr 1, 2021 at 4:51 PM Devin Bost <devin.b...@gmail.com> wrote:
>>> > >>> > >
>>> > >>> > > > Would it be better to just ignore WorkerConfig defaults during function update? We could still update the function producer behavior through the producerConfig passed as a REST parameter, but we'd avoid the edge case I mentioned.
>>> > >>> > > >
>>> > >>> > > > Devin G. Bost
>>> > >>> > > >
>>> > >>> > > >
>>> > >>> > > > On Thu, Apr 1, 2021 at 5:42 PM Devin Bost <devin.b...@gmail.com> wrote:
>>> > >>> > > >
>>> > >>> > > > > I just remembered an edge case that motivated me to create that additional flow.
>>> > >>> > > > > In my PR, if any producerConfig values are null at registration or update, we get those values from WorkerConfig instead.
>>> > >>> > > > > However, when a user runs an update (on a function) for the first time after upgrading to the version of Pulsar with this feature, if someone has modified the function_worker.yml file (perhaps without their knowledge), those defaults from function_worker.yml will get picked up because the existing producerConfig will have null set for those values (because the function was registered prior to the existence of this feature.) So, the user could be trying to update something unrelated, not realizing that their function is going to have its producerConfig updated by the cluster defaults. Most users won't be affected by this, but it could be tricky to diagnose for the few who would get impacted by it. (Is this description any clearer? If not, it might be clearer if I diagram it.)
>>> > >>> > > > >
>>> > >>> > > > > Devin G. Bost
>>> > >>> > > > >
>>> > >>> > > > >
>>> > >>> > > > > On Thu, Apr 1, 2021 at 4:39 PM Jerry Peng <jerry.boyang.p...@gmail.com> wrote:
>>> > >>> > > > >
>>> > >>> > > > > > Hi Devin,
>>> > >>> > > > > >
>>> > >>> > > > > > I understand the usefulness of giving cluster admins the ability to specify some defaults for functions at a cluster level. I think the right time to apply those defaults is during function registration time. I am not sure I understand the usefulness of what you are proposing we do during update time. If you would like to change a config of an already running function, why not just update it with the config changes that are needed? I am not sure you need the additional workflow you proposed for the update.
>>> > >>> > > > > >
>>> > >>> > > > > > Another idea to satisfy your use case is that, instead of leveraging a default config mechanism, perhaps we should take a look at implementing interceptors for the function/sources/sink API calls. We already have interceptors implemented for many other Pulsar functionalities. Perhaps we should do it for Pulsar Functions APIs as well. With interceptors you will be able to intercept all API operations to functions and customize the requests however you would like.
>>> > >>> > > > > >
>>> > >>> > > > > > We can have an interface like:
>>> > >>> > > > > >
>>> > >>> > > > > > interface FunctionInterceptors {
>>> > >>> > > > > >
>>> > >>> > > > > >     void registerFunctionInterceptor(String tenant, String namespace, String functionName, FunctionConfig functionConfig...);
>>> > >>> > > > > >
>>> > >>> > > > > >     ...
>>> > >>> > > > > >
>>> > >>> > > > > > }
>>> > >>> > > > > >
>>> > >>> > > > > > for developers to implement. At the beginning of every Function API call, the corresponding interceptor method gets called. For example, the "registerFunctionInterceptor()" method I provided above will be called towards the beginning of registerFunction and allow you to customize the function config however you want.
>>> > >>> > > > > >
>>> > >>> > > > > > Best,
>>> > >>> > > > > >
>>> > >>> > > > > > Jerry
>>> > >>> > > > > >
>>> > >>> > > > > >
>>> > >>> > > > > > On Thu, Apr 1, 2021 at 2:08 PM Devin Bost <devin.b...@gmail.com> wrote:
>>> > >>> > > > > >
>>> > >>> > > > > > > *Cluster-Wide and Function-Specific Producer Defaults*
>>> > >>> > > > > > >
>>> > >>> > > > > > > - Status: Proposal
>>> > >>> > > > > > > - Author: Devin Bost (with guidance from Jerry Peng)
>>> > >>> > > > > > > - Pull Request: https://github.com/apache/pulsar/pull/9987
>>> > >>> > > > > > > - Mailing List discussion:
>>> > >>> > > > > > > - Release: 2.8.0
>>> > >>> > > > > > >
>>> > >>> > > > > > > *Motivation*
>>> > >>> > > > > > >
>>> > >>> > > > > > > Pulsar currently provides no way to allow users or operators to change the producer behavior of Pulsar functions. For organizations that are making heavy use of functions, the ability to tune messaging behavior of functions is greatly desired. Moreover, current function messaging defaults appear to be causing issues for certain workloads. (For example, some bugs appear to be related to having batching enabled. See #6054.) Enabling operators to modify these settings will help Pulsar cluster admins work around issues so that they can upgrade legacy versions of Pulsar to more recent versions. Moreover, enabling greater tuning of these settings can provide more opportunity for performance optimization in production environments.
>>> > >>> > > > > > > We need the ability to modify these function producer settings:
>>> > >>> > > > > > >
>>> > >>> > > > > > > - batchingEnabled
>>> > >>> > > > > > > - chunkingEnabled
>>> > >>> > > > > > > - blockIfQueueFull
>>> > >>> > > > > > > - compressionType
>>> > >>> > > > > > > - hashingScheme
>>> > >>> > > > > > > - messageRoutingMode
>>> > >>> > > > > > > - batchingMaxPublishDelay
>>> > >>> > > > > > >
>>> > >>> > > > > > > *Implementation Strategy*
>>> > >>> > > > > > >
>>> > >>> > > > > > > Ideally, we need a way to set default message producer behavior (for functions) on a cluster-wide level. However, operators may want specific settings to be used for certain functions. (We can't assume that everyone will want all functions in a cluster to have the same producer settings.) When an operator changes the cluster-wide producer defaults, they need a way to be able to roll out the changes to functions without significantly disrupting messaging flows in production environments. They also need a simple way to rollback changes in case a change to function producer settings results in undesired behavior.
>>> > >>> > > > > > > However, not all users will want function updates to replace existing function producer settings, especially for functions that have been given custom producer settings (at the per-function level.) Due to this difference in use cases, there needs to be a way to allow an operator to specify what update behavior they want. For a given update, the user should be able to specify if:
>>> > >>> > > > > > >
>>> > >>> > > > > > > 1. Existing function producer settings will be replaced by cluster-wide defaults (unless overridden by producer settings provided in the request), or
>>> > >>> > > > > > > 2. Existing function producer settings will *not* be replaced by cluster-wide defaults (unless overridden by producer settings provided in the request).
>>> > >>> > > > > > >
>>> > >>> > > > > > > So, the two orders of precedence are:
>>> > >>> > > > > > >
>>> > >>> > > > > > > 1. newProducerConfig > cluster-wide defaults > existingProducerConfig
>>> > >>> > > > > > > 2. newProducerConfig > existingProducerConfig > cluster-wide defaults
>>> > >>> > > > > > >
>>> > >>> > > > > > > We can add a boolean property to UpdateOptions to allow the precedence to be specified.
>>> > >>> > > > > > > Since we don't want to introduce unexpected side effects during an update, the default precedence should prefer the existingProducerConfig over cluster-wide defaults.
>>> > >>> > > > > > >
>>> > >>> > > > > > > To ensure that the desired producer settings (after calculating the settings from defaults + overrides) are available when creating the producers (on any cluster), we need to add these producer settings to the function protobuf definition. Also, to ensure that settings can be properly validated at the time they are submitted, we need validation in two places:
>>> > >>> > > > > > >
>>> > >>> > > > > > > 1. When cluster-wide configs are loaded upon broker startup
>>> > >>> > > > > > > 2. When a function is registered or updated.
>>> > >>> > > > > > >
>>> > >>> > > > > > > As it would be impractical to check every registered function definition during broker startup, most of the validation needs to occur during function registration or update. However, during broker startup, we can at least validate that the configurations exist and throw an exception if they are invalid.
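
The startup-time check described in the last paragraph could be as simple as parsing each string default into an enum and failing fast on anything unrecognized. The sketch below is an illustration only: the enums are local stand-ins declared so the example compiles on its own (they are not imported from Pulsar), and `InvalidDefaultException`, `parse`, and `validate` are hypothetical names.

```java
import java.util.Arrays;

public class DefaultsValidationSketch {

    // Local stand-ins for the client enums, declared here only to keep the sketch self-contained.
    enum CompressionType { NONE, LZ4, ZLIB, ZSTD, SNAPPY }
    enum HashingScheme { JavaStringHash, Murmur3_32Hash }
    enum MessageRoutingMode { SinglePartition, RoundRobinPartition, CustomPartition }

    /** Hypothetical exception type for an unusable cluster-wide default. */
    static final class InvalidDefaultException extends IllegalArgumentException {
        InvalidDefaultException(String message) {
            super(message);
        }
    }

    /** Parses one string setting into its enum, or explains what was wrong with it. */
    static <E extends Enum<E>> E parse(Class<E> type, String field, String value) {
        if (value == null) {
            throw new InvalidDefaultException(field + " is missing");
        }
        try {
            return Enum.valueOf(type, value);
        } catch (IllegalArgumentException e) {
            throw new InvalidDefaultException(field + " must be one of "
                    + Arrays.toString(type.getEnumConstants()) + " but was '" + value + "'");
        }
    }

    /** Validates all string defaults together, as a startup check might. */
    static void validate(String compressionType, String hashingScheme, String messageRoutingMode) {
        parse(CompressionType.class, "compressionType", compressionType);
        parse(HashingScheme.class, "hashingScheme", hashingScheme);
        parse(MessageRoutingMode.class, "messageRoutingMode", messageRoutingMode);
    }

    public static void main(String[] args) {
        validate("LZ4", "Murmur3_32Hash", "CustomPartition"); // passes silently
        try {
            validate("GZIP", "Murmur3_32Hash", "CustomPartition");
        } catch (InvalidDefaultException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The same `parse` helper could be reused at registration or update time, so both validation points reject the same inputs.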
>>> > >>> > > > > > >
>>> > >>> > > > > > > As any configuration exceptions thrown when the producer is created would bubble up to the client, we need to ensure that the final configurations are valid when a function is registered or updated, so that there are no exceptions when the producer is created. This requires computing the producer settings (according to the desired precedence) during function registration or update.
>>> > >>> > > > > > >
>>> > >>> > > > > > > When a function is registered, existing producer settings do not exist for it, so we simply need to get cluster-wide defaults and override them with any specific producer settings provided in the FunctionConfig's ProducerConfig (submitted as a REST parameter.) After the initial registration, those computed settings will then be persisted in BookKeeper with the function definition.
>>> > >>> > > > > > >
>>> > >>> > > > > > > When a function is updated, we also need to address the existing producerConfig, and for this, we need to check the desired override precedence (as explained above) when computing the final producer settings that will be persisted with the function.
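
The two orders of precedence, and the registration case where no existing settings exist, can be sketched per setting as a "first non-null wins" lookup. This is a minimal illustration under stated assumptions rather than the proposal's implementation: `firstNonNull`, `resolve`, and the `preferClusterDefaults` flag are hypothetical names, and each setting is modeled as a single nullable value.

```java
public class PrecedenceSketch {

    /** Returns the first non-null candidate, or null if every candidate is null. */
    @SafeVarargs
    static <T> T firstNonNull(T... candidates) {
        for (T candidate : candidates) {
            if (candidate != null) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Resolves one producer setting.
     * preferClusterDefaults = true  : incoming > cluster default > existing
     * preferClusterDefaults = false : incoming > existing > cluster default
     */
    static <T> T resolve(T incoming, T existing, T clusterDefault, boolean preferClusterDefaults) {
        return preferClusterDefaults
                ? firstNonNull(incoming, clusterDefault, existing)
                : firstNonNull(incoming, existing, clusterDefault);
    }

    public static void main(String[] args) {
        // Registration: nothing exists yet, so the cluster default fills the gap.
        System.out.println(resolve(null, null, "ZLIB", false));
        // Update with the safe default: the existing value is kept.
        System.out.println(resolve(null, "LZ4", "ZLIB", false));
        // Update that opts in to cluster defaults: the existing value is replaced.
        System.out.println(resolve(null, "LZ4", "ZLIB", true));
    }
}
```

In both modes a value supplied in the request wins, so the flag only matters for settings the request leaves unset.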
>>> > >>> > > > > > >
>>> > >>> > > > > > > *Modifications*
>>> > >>> > > > > > >
>>> > >>> > > > > > > As WorkerConfig is readily available when functions are registered and updated, we can put new settings into function_worker.yml and can load and validate them via WorkerConfig.load(..). To prevent introducing breaking changes, parameters will have defaults assigned in WorkerConfig that conform to current expected behavior, like this:
>>> > >>> > > > > > >
>>> > >>> > > > > > > @Data
>>> > >>> > > > > > > @Accessors(chain = true)
>>> > >>> > > > > > > public class FunctionDefaultsConfig implements Serializable {
>>> > >>> > > > > > >
>>> > >>> > > > > > >     @FieldContext(
>>> > >>> > > > > > >         doc = "Disables default message batching between functions"
>>> > >>> > > > > > >     )
>>> > >>> > > > > > >     protected boolean batchingDisabled = false;
>>> > >>> > > > > > >
>>> > >>> > > > > > >     @FieldContext(
>>> > >>> > > > > > >         doc = "Enables default message chunking between functions"
>>> > >>> > > > > > >     )
>>> > >>> > > > > > >     protected boolean chunkingEnabled = false;
>>> > >>> > > > > > >
>>> > >>> > > > > > >     @FieldContext(
>>> > >>> > > > > > >         doc = "Disables default behavior to block when message queue is full"
>>> > >>> > > > > > >     )
>>> > >>> > > > > > >     protected boolean blockIfQueueFullDisabled = false;
>>> > >>> > > > > > >
>>> > >>> > > > > > >     @FieldContext(
>>> > >>> > > > > > >         doc = "Default compression type"
>>> > >>> > > > > > >     )
>>> > >>> > > > > > >     protected String compressionType = "LZ4";
>>> > >>> > > > > > >
>>> > >>> > > > > > >     @FieldContext(
>>> > >>> > > > > > >         doc = "Default hashing scheme"
>>> > >>> > > > > > >     )
>>> > >>> > > > > > >     protected String hashingScheme = "Murmur3_32Hash";
>>> > >>> > > > > > >
>>> > >>> > > > > > >     @FieldContext(
>>> > >>> > > > > > >         doc = "Default message routing mode"
>>> > >>> > > > > > >     )
>>> > >>> > > > > > >     protected String messageRoutingMode = "CustomPartition";
>>> > >>> > > > > > >
>>> > >>> > > > > > >     @FieldContext(
>>> > >>> > > > > > >         doc = "Default max publish delay (in milliseconds) when message batching is enabled"
>>> > >>> > > > > > >     )
>>> > >>> > > > > > >     protected long batchingMaxPublishDelay = 10L;
>>> > >>> > > > > > >
>>> > >>> > > > > > >     public ClusterFunctionProducerDefaults buildProducerDefaults()
>>> > >>> > > > > > >             throws InvalidWorkerConfigDefaultException {
>>> > >>> > > > > > >         return new ClusterFunctionProducerDefaults(this.isBatchingDisabled(),
>>> > >>> > > > > > >                 this.isChunkingEnabled(), this.isBlockIfQueueFullDisabled(),
>>> > >>> > > > > > >                 this.getCompressionType(), this.getHashingScheme(),
>>> > >>> > > > > > >                 this.getMessageRoutingMode(), this.getBatchingMaxPublishDelay());
>>> > >>> > > > > > >     }
>>> > >>> > > > > > >
>>> > >>> > > > > > >     public FunctionDefaultsConfig() {
>>> > >>> > > > > > >     }
>>> > >>> > > > > > > }
>>> > >>> > > > > > >
>>> > >>> > > > > > > The additional section in function_worker.yml will look like this:
>>> > >>> > > > > > >
>>> > >>> > > > > > > functionDefaults:
>>> > >>> > > > > > >   batchingDisabled: true
>>> > >>> > > > > > >   chunkingEnabled: false
>>> > >>> > > > > > >   blockIfQueueFullDisabled: false
>>> > >>> > > > > > >   batchingMaxPublishDelay: 12
>>> > >>> > > > > > >   compressionType: ZLIB
>>> > >>> > > > > > >   hashingScheme: JavaStringHash
>>> > >>> > > > > > >   messageRoutingMode: RoundRobinPartition
>>> > >>> > > > > > >
>>> > >>> > > > > > > During function update, we can use a new boolean option, ignoreExistingFunctionDefaults, to toggle precedence:
>>> > >>> > > > > > >
>>> > >>> > > > > > > @Data
>>> > >>> > > > > > > @NoArgsConstructor
>>> > >>> > > > > > > @ApiModel(value = "UpdateOptions", description = "Options while updating function defaults or the sink")
>>> > >>> > > > > > > public class UpdateOptions {
>>> > >>> > > > > > >
>>> > >>> > > > > > >     @ApiModelProperty(
>>> > >>> > > > > > >         value = "Whether or not to update the auth data",
>>> > >>> > > > > > >         name = "update-auth-data")
>>> > >>> > > > > > >     private boolean updateAuthData = false;
>>> > >>> > > > > > >
>>> > >>> > > > > > >     @ApiModelProperty(
>>> > >>> > > > > > >         value = "Whether or not to ignore any existing function defaults",
>>> > >>> > > > > > >         name = "ignore-existing-function-defaults")
>>> > >>> > > > > > >     private boolean ignoreExistingFunctionDefaults = false;
>>> > >>> > > > > > > }
>>> > >>> > > > > > >
>>> > >>> > > > > > > To ensure that function producer settings don't get modified unexpectedly by an update, ignoreExistingFunctionDefaults = false by default.
>>> > >>> > > > > > >
>>> > >>> > > > > > > The updated ProducerConfig will contain the new properties and a method to merge an incoming ProducerConfig with an existing ProducerConfig. (The merged ProducerConfig will use the incoming ProducerConfig properties when they exist and will use existing ProducerConfig properties otherwise.) To ensure we don't need to force operators to provide those ProducerConfig properties with every create or update, they must be nullable, not primitive (e.g. Boolean instead of boolean.)
>>> > >>> > > > > > >
>>> > >>> > > > > > > To support the additional properties, FunctionConfigUtils (and any similar classes) will need to be modified to correctly translate between the ProducerConfig and the protobuf ProducerSpec.
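
The translation between a nullable config and primitive protobuf fields is where the "Disabled" naming matters: an unset protobuf bool reads as false, so a field named batchingDisabled leaves batching on when nothing was specified. The sketch below illustrates that round trip with plain Java classes. `SketchProducerConfig`, `SketchProducerSpec`, `toSpec`, and `fromSpec` are hypothetical, and the assumption that the REST-facing config carries a nullable `batchingEnabled` (rather than `batchingDisabled`) is mine, not something the proposal states.

```java
public class SpecConversionSketch {

    /** Hypothetical REST-facing config: nullable so "not provided" is representable. */
    static final class SketchProducerConfig {
        final Boolean batchingEnabled;
        final Boolean chunkingEnabled;

        SketchProducerConfig(Boolean batchingEnabled, Boolean chunkingEnabled) {
            this.batchingEnabled = batchingEnabled;
            this.chunkingEnabled = chunkingEnabled;
        }
    }

    /** Hypothetical mirror of the protobuf message: primitives only, false when unset. */
    static final class SketchProducerSpec {
        boolean batchingDisabled;
        boolean chunkingEnabled;
    }

    static SketchProducerSpec toSpec(SketchProducerConfig config) {
        SketchProducerSpec spec = new SketchProducerSpec();
        // Only an explicit "false" disables batching; null keeps the unset default.
        spec.batchingDisabled = Boolean.FALSE.equals(config.batchingEnabled);
        // Only an explicit "true" enables chunking; null keeps the unset default.
        spec.chunkingEnabled = Boolean.TRUE.equals(config.chunkingEnabled);
        return spec;
    }

    static SketchProducerConfig fromSpec(SketchProducerSpec spec) {
        // Going back, every field has a concrete value, so nothing is null any more.
        return new SketchProducerConfig(!spec.batchingDisabled, spec.chunkingEnabled);
    }

    public static void main(String[] args) {
        SketchProducerSpec unset = toSpec(new SketchProducerConfig(null, null));
        System.out.println("batchingDisabled=" + unset.batchingDisabled
                + " chunkingEnabled=" + unset.chunkingEnabled);
    }
}
```

Note that the round trip loses the distinction between "not provided" and "provided with the default value", which is why defaults have to be resolved before the config is converted and persisted.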
The updated ProducerSpec in Function.proto will look like this:

message ProducerSpec {
    int32 maxPendingMessages = 1;
    int32 maxPendingMessagesAcrossPartitions = 2;
    bool useThreadLocalProducers = 3;
    CryptoSpec cryptoSpec = 4;
    string batchBuilder = 5;
    bool batchingDisabled = 6;
    bool chunkingEnabled = 7;
    bool blockIfQueueFullDisabled = 8;
    CompressionType compressionType = 9;
    HashingScheme hashingScheme = 10;
    MessageRoutingMode messageRoutingMode = 11;
    int64 batchingMaxPublishDelay = 12;
}

To support these new parameters during local run and for convenience, we should be able to specify them in the Admin CLI and in local run modes.
For local run, as function_worker.yml is not used, these parameters will have defaults and be primitive values:

@Parameter(names = "--clusterFunctionBatchingDisabled", description = "Disable the default message batching behavior for functions", hidden = true)
public boolean clusterFunctionBatchingDisabled = false;

@Parameter(names = "--clusterFunctionChunkingEnabled", description = "The default message chunking behavior for functions", hidden = true)
public boolean clusterFunctionChunkingEnabled = false;

@Parameter(names = "--clusterFunctionBlockIfQueueFullDisabled", description = "Disable the default blocking behavior for functions when queue is full", hidden = true)
public boolean clusterFunctionBlockIfQueueFullDisabled = false;

@Parameter(names = "--clusterFunctionCompressionTypeDefault", description = "The default Compression Type for functions", hidden = true)
public String clusterFunctionCompressionTypeDefault = "LZ4";

@Parameter(names = "--clusterFunctionHashingSchemeDefault", description = "The default Hashing Scheme for functions", hidden = true)
public String clusterFunctionHashingSchemeDefault = "Murmur3_32Hash";

@Parameter(names = "--clusterFunctionMessageRoutingModeDefault", description = "The default Message Routing Mode for functions", hidden = true)
public String clusterFunctionMessageRoutingModeDefault = "CustomPartition";

@Parameter(names = "--clusterFunctionBatchingMaxPublishDelayDefault", description = "The default max publish delay (in milliseconds) for functions when message batching is enabled", hidden = true)
public long clusterFunctionBatchingMaxPublishDelayDefault = 10L;

For Admin CLI, parameters can be provided in a similar way, but they will not have defaults (i.e. Boolean instead of boolean) since parameters should be null to ensure non-provided configs are obtained from WorkerConfig.
To select each property from either an incoming producerConfig or the producerDefaults object (from WorkerConfig), we can use a FunctionDefaultsMediator. The interface looks like this and gives us the flexibility to handle mediation for specific properties without needing to convert the entire object.

public interface FunctionDefaultsMediator {
    boolean isBatchingDisabled();
    boolean isChunkingEnabled();
    boolean isBlockIfQueueFullDisabled();
    CompressionType getCompressionType();
    Function.CompressionType getCompressionTypeProto();
    HashingScheme getHashingScheme();
    Function.HashingScheme getHashingSchemeProto();
    MessageRoutingMode getMessageRoutingMode();
    Function.MessageRoutingMode getMessageRoutingModeProto();
    Long getBatchingMaxPublishDelay();
}

(Alternatively, we could remove the Proto methods from the mediator and just use FunctionConfigUtils.convert(..) to translate between the protobuf ProducerSpec and the Java ProducerConfig.)

*Testing*

In terms of testing, much of this functionality will be covered by existing tests in the pulsar-functions module. However, we need to test each of the new classes, as well as these behaviors:

- functionRegistration (with different provided ProducerConfig properties)
- functionUpdate (for ignoreExistingFunctionDefaults=true and ignoreExistingFunctionDefaults=false)
- loading function_worker.yml into WorkerConfig
- throwing exceptions correctly when invalid parameters are provided
- converting between ProducerSpec and ProducerConfig
- correctly handling precedence when computing the final ProducerConfig values
- creating producers
- ensuring functionality is consistent across runtimes

I'd like feedback on this proposal.

Devin G. Bost
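
For reference, the fallback behavior that the FunctionDefaultsMediator is meant to provide could be sketched roughly as follows. Everything here is an assumption made for illustration: DefaultsMediatorSketch is a trimmed version of the proposed interface (three methods, with the compression enum replaced by a String), and FunctionSettingsSketch, ClusterDefaultsSketch, and FallbackMediatorSketch are hypothetical names, not existing Pulsar classes.

```java
/** Trimmed, hypothetical version of the proposed mediator interface. */
interface DefaultsMediatorSketch {
    boolean isBatchingDisabled();
    String getCompressionType();
    Long getBatchingMaxPublishDelay();
}

/** Hypothetical per-function settings; null means "not provided by the operator". */
final class FunctionSettingsSketch {
    final Boolean batchingDisabled;
    final String compressionType;
    final Long batchingMaxPublishDelay;

    FunctionSettingsSketch(Boolean batchingDisabled, String compressionType, Long batchingMaxPublishDelay) {
        this.batchingDisabled = batchingDisabled;
        this.compressionType = compressionType;
        this.batchingMaxPublishDelay = batchingMaxPublishDelay;
    }
}

/** Hypothetical cluster-wide defaults, standing in for functionDefaults in WorkerConfig. */
final class ClusterDefaultsSketch {
    final boolean batchingDisabled;
    final String compressionType;
    final long batchingMaxPublishDelay;

    ClusterDefaultsSketch(boolean batchingDisabled, String compressionType, long batchingMaxPublishDelay) {
        this.batchingDisabled = batchingDisabled;
        this.compressionType = compressionType;
        this.batchingMaxPublishDelay = batchingMaxPublishDelay;
    }
}

/** Resolves each property on demand: function-level value if set, else the cluster default. */
final class FallbackMediatorSketch implements DefaultsMediatorSketch {
    private final FunctionSettingsSketch function;
    private final ClusterDefaultsSketch defaults;

    FallbackMediatorSketch(FunctionSettingsSketch function, ClusterDefaultsSketch defaults) {
        this.function = function;
        this.defaults = defaults;
    }

    @Override
    public boolean isBatchingDisabled() {
        return firstNonNull(function.batchingDisabled, defaults.batchingDisabled);
    }

    @Override
    public String getCompressionType() {
        return firstNonNull(function.compressionType, defaults.compressionType);
    }

    @Override
    public Long getBatchingMaxPublishDelay() {
        return firstNonNull(function.batchingMaxPublishDelay, defaults.batchingMaxPublishDelay);
    }

    // Hypothetical helper: the provided value wins unless it is null.
    private static <T> T firstNonNull(T provided, T fallback) {
        return provided != null ? provided : fallback;
    }
}
```

Resolving per property keeps the precedence rule in one place, so callers never need to build a fully merged object just to read one setting.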