All the tests in my personal GitHub CI passed except for the ones in the Flaky group, so I created an Apache Pulsar PR: https://github.com/apache/pulsar/pull/10154 (Again, this is for Part 1 of the changes.) I think the code is ready for review.
Devin G. Bost On Tue, Apr 6, 2021 at 12:37 AM Devin Bost <devin.b...@gmail.com> wrote: > I think I was able to resolve that issue. Fortunately, only > FunctionResultRouter had the singleton behavior, so resolving that was > straightforward. The current PR for part 1 is here (in my personal CI): > https://github.com/devinbost/pulsar/pull/5/files > Once the tests look good, I'll submit a PR to Apache Pulsar. > > Devin G. Bost > > > On Mon, Apr 5, 2021 at 8:50 PM Devin Bost <devin.b...@gmail.com> wrote: > >> It appears that the reason the router classes are singletons is to ensure >> the clock behavior is consistent across producers. >> I wonder if we might be able to avoid creating a breaking change by >> refactoring out the clock into a singleton that could be shared across the >> routers, which we could then make not singletons. >> That would allow the routers the ability to vary behavior through their >> constructors (thus avoiding a breaking change) and eliminate the race >> condition that would occur any time two producers are created with the same >> router class and different batching behavior. >> >> Devin G. Bost >> >> >> On Mon, Apr 5, 2021 at 6:01 PM Devin Bost <devin.b...@gmail.com> wrote: >> >>> Jerry, >>> >>> Thanks for the information and for the links! >>> Thanks also for the suggestion to split the PR into two parts. That will >>> make it easier to get some of this work completed in time for the 2.8.0 >>> release. >>> >>> While working on the first part, I discovered something that needs >>> community attention. I discovered that in order to create the producer >>> (e.g. >>> https://github.com/apache/pulsar/blob/master/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java#L117), >>> we get the MessageRouter from a singleton that has isBatchingEnabled in its >>> constructor. 
>>> [image: image.png] >>> The problem is that FunctionResultRouter is a shared instance, so if one >>> function changes the router's value of isBatchingEnabled, it will create a >>> race condition with other functions that share the same router. It appears >>> that the reason the router is a singleton is to ensure that clock behavior >>> is consistent. >>> >>> Unless someone has a better idea, it looks like it might be necessary to >>> refactor isBatchingEnabled ( >>> https://github.com/apache/pulsar/blob/master/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java#L46) >>> from its constructor into a method parameter on the router. >>> >>> However, moving isBatchingEnabled to a method parameter is a breaking >>> change on the client interface: >>> https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageRouter.java#L56 >>> >>> I don't want to introduce breaking changes, but I'm not sure how else to >>> ensure that the message router behaves correctly across function instances ( >>> https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java#L82) >>> when some functions may have batching enabled and some may not. >>> >>> Looking for ideas/feedback. >>> >>> Devin G. Bost >>> >>> >>> On Mon, Apr 5, 2021 at 1:11 PM Jerry Peng <jerry.boyang.p...@gmail.com> >>> wrote: >>> >>>> > >>>> > The interceptor would be loaded at function registration. >>>> >>>> >>>> The interceptor classes should be loaded at broker/worker startup time. >>>> >>>> In order for an >>>> > interceptor to be picked up, a jar would need to be compiled with the >>>> class >>>> > that implements the interface, and the jar would need to be deployed >>>> into a >>>> > directory that matches the directory provided somewhere. (Would this >>>> > directory be specified in function_worker.yml? 
Or, would we need to >>>> reuse >>>> > one of the existing interceptor directories and filter classes at >>>> > runtime?) >>>> > >>>> >>>> The JAR with classes for the interceptor can just be placed in the lib/ >>>> directory of the pulsar project. An additional config may be needed to >>>> be >>>> added to function_worker.yml so the classname of the interceptor impl >>>> to be >>>> provided. >>>> >>>> In order for changes to the FunctionConfig values to reach where the >>>> > Function producers are created, we'd still need to add the producer >>>> > properties to the Function.proto and FunctionConfig.ProducerConfig >>>> > definitions. We'd also still need to modify CLI parameters to ensure >>>> these >>>> > values could be modified on a per-function basis. >>>> > >>>> >>>> For you use case of disabling batching, you will still need to add an >>>> additional field in the Function.proto so that it can be configurable. >>>> I >>>> would recommend you doing that work separately since that work is >>>> orthogonal to interceptors. >>>> >>>> When a function is registered, instead of retrieving cluster-wide >>>> function >>>> > producer defaults from WorkerConfig, it would load the interceptor, >>>> which >>>> > would modify the FunctionConfig's ProducerConfig's property values to >>>> > provide the cluster-wide defaults for any value that wasn't provided >>>> by the >>>> > REST call. >>>> > >>>> >>>> Yes. >>>> >>>> If an interceptor wasn't provided, during conversion of the >>>> ProducerConfig >>>> > to the protobuf ProducerSpec, the protobuf defaults will take effect >>>> (since >>>> > protobuf doesn't allow null primitives), which will then be picked up >>>> when >>>> > the producer is created. >>>> > >>>> >>>> >>>> If an interceptor impl is not provided, the behavior of the APIs should >>>> be >>>> the same as the current behavior. >>>> >>>> >>>> Back in the Streamlio, I actually created an interceptor mechanism. 
>>>> >>>> Please take at this old PR: >>>> >>>> https://github.com/apache/pulsar/pull/7159 >>>> >>>> or more specifically what is done for functions here. >>>> >>>> >>>> https://github.com/apache/pulsar/pull/7159/files#diff-427711b7c1c3b9f17cd1c02bb9ceb942af454676dd97902a43e910f645febc6d >>>> >>>> >>>> Perhaps you can help revive this work! >>>> >>>> Best, >>>> >>>> Jerry >>>> >>>> On Mon, Apr 5, 2021 at 11:46 AM Devin Bost <devin.b...@gmail.com> >>>> wrote: >>>> >>>> > Here's my understanding of what I'd need to do to implement the >>>> interceptor >>>> > approach. >>>> > The interceptor would be loaded at function registration. In order >>>> for an >>>> > interceptor to be picked up, a jar would need to be compiled with the >>>> class >>>> > that implements the interface, and the jar would need to be deployed >>>> into a >>>> > directory that matches the directory provided somewhere. (Would this >>>> > directory be specified in function_worker.yml? Or, would we need to >>>> reuse >>>> > one of the existing interceptor directories and filter classes at >>>> > runtime?) >>>> > In order for changes to the FunctionConfig values to reach where the >>>> > Function producers are created, we'd still need to add the producer >>>> > properties to the Function.proto and FunctionConfig.ProducerConfig >>>> > definitions. We'd also still need to modify CLI parameters to ensure >>>> these >>>> > values could be modified on a per-function basis. >>>> > When a function is registered, instead of retrieving cluster-wide >>>> function >>>> > producer defaults from WorkerConfig, it would load the interceptor, >>>> which >>>> > would modify the FunctionConfig's ProducerConfig's property values to >>>> > provide the cluster-wide defaults for any value that wasn't provided >>>> by the >>>> > REST call. 
>>>> > If an interceptor wasn't provided, during conversion of the >>>> ProducerConfig >>>> > to the protobuf ProducerSpec, the protobuf defaults will take effect >>>> (since >>>> > protobuf doesn't allow null primitives), which will then be picked up >>>> when >>>> > the producer is created. >>>> > >>>> > Is that right? >>>> > >>>> > Devin G. Bost >>>> > >>>> > >>>> > On Mon, Apr 5, 2021 at 11:56 AM Devin Bost <devin.b...@gmail.com> >>>> wrote: >>>> > >>>> > > If we use the interceptor approach, does that mean a cluster-admin >>>> would >>>> > > need to write custom code in order to override the function producer >>>> > > settings? I worry about adding additional burden to cluster >>>> > administration. >>>> > > >>>> > > -- >>>> > > Devin G. Bost >>>> > > >>>> > > On Sat, Apr 3, 2021, 8:26 AM Devin Bost <devin.b...@gmail.com> >>>> wrote: >>>> > > >>>> > >> Hi Rui, >>>> > >> >>>> > >> Thanks for the feedback. My understanding about the interceptor is >>>> that >>>> > >> it's not something that would be touched by users but would only be >>>> > >> modified by cluster admins. So, I think we'd still need to include >>>> the >>>> > >> parameters the parameters on the ProducerConfig. >>>> > >> I already have the other approach coded, so changing the design >>>> will >>>> > >> require some rework. >>>> > >> I'll look at that example you mentioned. >>>> > >> >>>> > >> -- >>>> > >> Devin G. Bost >>>> > >> >>>> > >> On Fri, Apr 2, 2021, 10:18 PM Rui Fu <f...@rui.sh> wrote: >>>> > >> >>>> > >>> Hi Devin, >>>> > >>> >>>> > >>> Great proposal for giving function such useful feature, and >>>> thanks for >>>> > >>> your detailed writing. After going through the history of this >>>> > discussion, >>>> > >>> I would like to have +1 with Jerry’s suggestion. With the >>>> interceptor, >>>> > the >>>> > >>> admins and users may have the maximum flexibility to customize >>>> their >>>> > >>> producer configs. 
>>>> > >>> It may also reduce the parameters we put into the configs and
>>>> > >>> pulsar-admin. Also, we already have some interceptors in Pulsar
>>>> > >>> Functions, like the `RuntimeCustomizer` interface, so it is not a
>>>> > >>> new technology that would be hard for users to adopt.
>>>> > >>>
>>>> > >>> Besides, I would like to suggest covering these changes in the
>>>> > >>> Python & Go runtimes as well if we add new fields to
>>>> > >>> `Function.proto`.
>>>> > >>>
>>>> > >>> Best,
>>>> > >>>
>>>> > >>> Rui
>>>> > >>> On April 2, 2021 at 10:13 AM (+0800), Devin Bost <devin.b...@gmail.com> wrote:
>>>> > >>> > What would be some of the additional benefits of using the
>>>> > >>> > interceptor approach?
>>>> > >>> >
>>>> > >>> > --
>>>> > >>> > Devin G. Bost
>>>> > >>> >
>>>> > >>> > On Thu, Apr 1, 2021, 6:03 PM Jerry Peng <jerry.boyang.p...@gmail.com> wrote:
>>>> > >>> >
>>>> > >>> > > Devin,
>>>> > >>> > >
>>>> > >>> > > Interceptors are located on the server side (broker/worker).
>>>> > >>> > > Cluster admins would create the plugins based on the interfaces
>>>> > >>> > > I described and install them on the server side. Regular
>>>> > >>> > > function developers will not implement or interact directly
>>>> > >>> > > with the interceptor.
>>>> > >>> > >
>>>> > >>> > > > Would it be better to just ignore WorkerConfig defaults during
>>>> > >>> > > > function update?
>>>> > >>> > >
>>>> > >>> > > Yes.
>>>> > >>> > >
>>>> > >>> > > If the cluster admin changes the function config defaults and
>>>> > >>> > > wants existing functions to utilize those configs that have
>>>> > >>> > > changed, the admin can just update those configs of the
>>>> > >>> > > functions through the regular update mechanism for functions we
>>>> > >>> > > have today.
>>>> > >>> > > >>>> > >>> > > >>>> > >>> > > >>>> > >>> > > On Thu, Apr 1, 2021 at 4:51 PM Devin Bost < >>>> devin.b...@gmail.com> >>>> > >>> wrote: >>>> > >>> > > >>>> > >>> > > > Would it be better to just ignore WorkerConfig defaults >>>> during >>>> > >>> function >>>> > >>> > > > update? We could still update the function producer behavior >>>> > >>> through the >>>> > >>> > > > producerConfig passed as a REST parameter, but we'd avoid >>>> the >>>> > edge >>>> > >>> case I >>>> > >>> > > > mentioned. >>>> > >>> > > > >>>> > >>> > > > Devin G. Bost >>>> > >>> > > > >>>> > >>> > > > >>>> > >>> > > > On Thu, Apr 1, 2021 at 5:42 PM Devin Bost < >>>> devin.b...@gmail.com> >>>> > >>> wrote: >>>> > >>> > > > >>>> > >>> > > > > I just remembered an edge case that motivated me to >>>> create that >>>> > >>> > > > additional >>>> > >>> > > > > flow. >>>> > >>> > > > > In my PR, if any producerConfig values are null at >>>> registration >>>> > >>> or >>>> > >>> > > > update, >>>> > >>> > > > > we get those values from WorkerConfig instead. >>>> > >>> > > > > However, when a user runs an update (on a function) for >>>> the >>>> > >>> first time >>>> > >>> > > > > after upgrading to the version of Pulsar with this >>>> feature, if >>>> > >>> someone >>>> > >>> > > > has >>>> > >>> > > > > modified the function_worker.yml file (perhaps without >>>> their >>>> > >>> > > knowledge), >>>> > >>> > > > > those defaults from function_worker.yml will get picked up >>>> > >>> because the >>>> > >>> > > > > existing producerConfig will have null set for those >>>> values >>>> > >>> (because >>>> > >>> > > the >>>> > >>> > > > > function was registered prior to the existence of this >>>> > feature.) 
>>>> > >>> So, >>>> > >>> > > > > the user could be trying to update something unrelated, >>>> not >>>> > >>> realizing >>>> > >>> > > > that >>>> > >>> > > > > their function is going to have its producerConfig >>>> updated by >>>> > the >>>> > >>> > > cluster >>>> > >>> > > > > defaults. Most users won't be affected by this, but it >>>> could be >>>> > >>> tricky >>>> > >>> > > to >>>> > >>> > > > > diagnose for the few who would get impacted by it. (Is >>>> this >>>> > >>> description >>>> > >>> > > > any >>>> > >>> > > > > clearer? If not, it might be clearer if I diagram it.) >>>> > >>> > > > > >>>> > >>> > > > > Devin G. Bost >>>> > >>> > > > > >>>> > >>> > > > > >>>> > >>> > > > > On Thu, Apr 1, 2021 at 4:39 PM Jerry Peng < >>>> > >>> jerry.boyang.p...@gmail.com >>>> > >>> > > > >>>> > >>> > > > > wrote: >>>> > >>> > > > > >>>> > >>> > > > > > Hi Devin, >>>> > >>> > > > > > >>>> > >>> > > > > > I understand the usefulness of giving cluster admins the >>>> > >>> ability to >>>> > >>> > > > > > specify >>>> > >>> > > > > > some defaults for functions at a cluster level. I think >>>> the >>>> > >>> right >>>> > >>> > > time >>>> > >>> > > > to >>>> > >>> > > > > > apply those defaults is during function registration >>>> time. I >>>> > >>> am not >>>> > >>> > > > sure >>>> > >>> > > > > > I >>>> > >>> > > > > > understand the usefulness of what you are proposing we >>>> do >>>> > >>> during >>>> > >>> > > update >>>> > >>> > > > > > time. If you would like to change a config of an already >>>> > >>> running >>>> > >>> > > > function >>>> > >>> > > > > > why not just update it with config changes that are >>>> needed. I >>>> > >>> am not >>>> > >>> > > > sure >>>> > >>> > > > > > you need the additional workflow you proposed for the >>>> update. 
>>>> > >>> > > > > >
>>>> > >>> > > > > > Another idea to satisfy your use case: instead of leveraging a
>>>> > >>> > > > > > default config mechanism, perhaps we should take a look at
>>>> > >>> > > > > > implementing interceptors for the function/source/sink API
>>>> > >>> > > > > > calls. We already have interceptors implemented for many other
>>>> > >>> > > > > > Pulsar functionalities. Perhaps we should do it for the Pulsar
>>>> > >>> > > > > > Functions APIs as well. With interceptors, you will be able to
>>>> > >>> > > > > > intercept all API operations on functions and customize the
>>>> > >>> > > > > > requests however you would like.
>>>> > >>> > > > > >
>>>> > >>> > > > > > We can have an interface like:
>>>> > >>> > > > > >
>>>> > >>> > > > > > interface FunctionInterceptors {
>>>> > >>> > > > > >
>>>> > >>> > > > > >     void registerFunctionInterceptor(String tenant, String namespace,
>>>> > >>> > > > > >         String functionName, FunctionConfig functionConfig...);
>>>> > >>> > > > > >
>>>> > >>> > > > > >     ...
>>>> > >>> > > > > >
>>>> > >>> > > > > > }
>>>> > >>> > > > > >
>>>> > >>> > > > > > for developers to implement. At the beginning of every Function
>>>> > >>> > > > > > API call, the corresponding interceptor method gets called. For
>>>> > >>> > > > > > example, the "registerFunctionInterceptor()" method I provided
>>>> > >>> > > > > > above will be called towards the beginning of registerFunction
>>>> > >>> > > > > > and allow you to customize the function config however you want.
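To make the interceptor idea above concrete, here is a minimal Java sketch of what such a plugin might look like. Everything in it is an assumption for illustration: `SketchFunctionConfig` and `SketchProducerConfig` are stand-ins for the real FunctionConfig and ProducerConfig, the interface only mirrors the shape proposed in the thread, and the default values are made up.

```java
// Hypothetical stand-ins for Pulsar's ProducerConfig and FunctionConfig.
// Nullable wrapper types let "not provided" be told apart from "false".
class SketchProducerConfig {
    Boolean batchingEnabled;
    String compressionType;
}

class SketchFunctionConfig {
    SketchProducerConfig producerConfig;
}

// Mirrors the shape of the interface proposed in the thread; the name and
// signature are assumptions, not an existing Pulsar API.
interface SketchFunctionInterceptor {
    void registerFunctionInterceptor(String tenant, String namespace,
                                     String functionName, SketchFunctionConfig functionConfig);
}

// A plugin a cluster admin might install: it fills in cluster-wide producer
// defaults for any setting that the request left unset.
class ProducerDefaultsInterceptor implements SketchFunctionInterceptor {
    private final boolean defaultBatchingEnabled;
    private final String defaultCompressionType;

    ProducerDefaultsInterceptor(boolean defaultBatchingEnabled, String defaultCompressionType) {
        this.defaultBatchingEnabled = defaultBatchingEnabled;
        this.defaultCompressionType = defaultCompressionType;
    }

    @Override
    public void registerFunctionInterceptor(String tenant, String namespace,
                                            String functionName, SketchFunctionConfig functionConfig) {
        if (functionConfig.producerConfig == null) {
            functionConfig.producerConfig = new SketchProducerConfig();
        }
        SketchProducerConfig producer = functionConfig.producerConfig;
        // Only touch values the caller did not provide.
        if (producer.batchingEnabled == null) {
            producer.batchingEnabled = defaultBatchingEnabled;
        }
        if (producer.compressionType == null) {
            producer.compressionType = defaultCompressionType;
        }
    }
}

class InterceptorSketch {
    public static void main(String[] args) {
        SketchFunctionConfig config = new SketchFunctionConfig();
        config.producerConfig = new SketchProducerConfig();
        config.producerConfig.compressionType = "ZSTD"; // set explicitly by the caller

        new ProducerDefaultsInterceptor(false, "LZ4")
                .registerFunctionInterceptor("public", "default", "my-function", config);

        System.out.println("batchingEnabled=" + config.producerConfig.batchingEnabled);
        System.out.println("compressionType=" + config.producerConfig.compressionType);
    }
}
```

The explicit value supplied by the caller is left alone and only the missing one is filled in, which is the "defaults for anything not provided by the REST call" behavior discussed in this thread.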
>>>> > >>> > > > > >
>>>> > >>> > > > > > Best,
>>>> > >>> > > > > >
>>>> > >>> > > > > > Jerry
>>>> > >>> > > > > >
>>>> > >>> > > > > > On Thu, Apr 1, 2021 at 2:08 PM Devin Bost <devin.b...@gmail.com> wrote:
>>>> > >>> > > > > >
>>>> > >>> > > > > > > *Cluster-Wide and Function-Specific Producer Defaults*
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > - Status: Proposal
>>>> > >>> > > > > > > - Author: Devin Bost (with guidance from Jerry Peng)
>>>> > >>> > > > > > > - Pull Request: https://github.com/apache/pulsar/pull/9987
>>>> > >>> > > > > > > - Mailing List discussion:
>>>> > >>> > > > > > > - Release: 2.8.0
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > *Motivation*
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > Pulsar currently provides no way to allow users or operators to
>>>> > >>> > > > > > > change the producer behavior of Pulsar functions. For
>>>> > >>> > > > > > > organizations that are making heavy use of functions, the
>>>> > >>> > > > > > > ability to tune messaging behavior of functions is greatly
>>>> > >>> > > > > > > desired. Moreover, current function messaging defaults appear
>>>> > >>> > > > > > > to be causing issues for certain workloads. (For example, some
>>>> > >>> > > > > > > bugs appear to be related to having batching enabled. See
>>>> > >>> > > > > > > #6054.) Enabling operators to modify these settings will help
>>>> > >>> > > > > > > Pulsar cluster admins work around issues so that they can
>>>> > >>> > > > > > > upgrade legacy versions of Pulsar to more recent versions.
>>>> > >>> > > > > > > Moreover, enabling greater tuning of these settings can provide
>>>> > >>> > > > > > > more opportunity for performance optimization in production
>>>> > >>> > > > > > > environments. We need the ability to modify these function
>>>> > >>> > > > > > > producer settings:
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > - batchingEnabled
>>>> > >>> > > > > > > - chunkingEnabled
>>>> > >>> > > > > > > - blockIfQueueFull
>>>> > >>> > > > > > > - compressionType
>>>> > >>> > > > > > > - hashingScheme
>>>> > >>> > > > > > > - messageRoutingMode
>>>> > >>> > > > > > > - batchingMaxPublishDelay
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > *Implementation Strategy*
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > Ideally, we need a way to set default message producer behavior
>>>> > >>> > > > > > > (for functions) on a cluster-wide level. However, operators may
>>>> > >>> > > > > > > want specific settings to be used for certain functions. (We
>>>> > >>> > > > > > > can't assume that everyone will want all functions in a cluster
>>>> > >>> > > > > > > to have the same producer settings.)
>>>> > >>> > > > > > > When an operator changes the cluster-wide producer defaults,
>>>> > >>> > > > > > > they need a way to roll out the changes to functions without
>>>> > >>> > > > > > > significantly disrupting messaging flows in production
>>>> > >>> > > > > > > environments. They also need a simple way to roll back changes
>>>> > >>> > > > > > > in case a change to function producer settings results in
>>>> > >>> > > > > > > undesired behavior. However, not all users will want function
>>>> > >>> > > > > > > updates to replace existing function producer settings,
>>>> > >>> > > > > > > especially for functions that have been given custom producer
>>>> > >>> > > > > > > settings (at the per-function level). Due to this difference in
>>>> > >>> > > > > > > use cases, there needs to be a way to allow an operator to
>>>> > >>> > > > > > > specify what update behavior they want. For a given update, the
>>>> > >>> > > > > > > user should be able to specify if:
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > 1. Existing function producer settings will be replaced by
>>>> > >>> > > > > > >    cluster-wide defaults (unless overridden by producer
>>>> > >>> > > > > > >    settings provided in the request), or
>>>> > >>> > > > > > > 2. Existing function producer settings will *not* be replaced
>>>> > >>> > > > > > >    by cluster-wide defaults (unless overridden by producer
>>>> > >>> > > > > > >    settings provided in the request).
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > So, the two orders of precedence are:
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > 1. newProducerConfig > cluster-wide defaults > existingProducerConfig
>>>> > >>> > > > > > > 2. newProducerConfig > existingProducerConfig > cluster-wide defaults
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > We can add a boolean property to UpdateOptions to allow the
>>>> > >>> > > > > > > precedence to be specified. Since we don't want to introduce
>>>> > >>> > > > > > > unexpected side effects during an update, the default
>>>> > >>> > > > > > > precedence should prefer the existingProducerConfig over
>>>> > >>> > > > > > > cluster-wide defaults.
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > To ensure that the desired producer settings (after calculating
>>>> > >>> > > > > > > the settings from defaults + overrides) are available when
>>>> > >>> > > > > > > creating the producers (on any cluster), we need to add these
>>>> > >>> > > > > > > producer settings to the function protobuf definition. Also, to
>>>> > >>> > > > > > > ensure that settings can be properly validated at the time they
>>>> > >>> > > > > > > are submitted, we need validation in two places:
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > 1. When cluster-wide configs are loaded upon broker startup.
>>>> > >>> > > > > > > 2. When a function is registered or updated.
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > As it would be impractical to check every registered function
>>>> > >>> > > > > > > definition during broker startup, most of the validation needs
>>>> > >>> > > > > > > to occur during function registration or update.
>>>> > >>> > > > > > > However, during broker startup, we can at least validate that
>>>> > >>> > > > > > > the configurations exist and throw an exception if they are
>>>> > >>> > > > > > > invalid.
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > As any configuration exceptions thrown when the producer is
>>>> > >>> > > > > > > created would bubble up to the client, we need to ensure that
>>>> > >>> > > > > > > the final configurations are valid when a function is
>>>> > >>> > > > > > > registered or updated, so that there are no exceptions when the
>>>> > >>> > > > > > > producer is created. This requires computing the producer
>>>> > >>> > > > > > > settings (according to the desired precedence) during function
>>>> > >>> > > > > > > registration or update.
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > When a function is registered, existing producer settings do
>>>> > >>> > > > > > > not exist for it, so we simply need to get cluster-wide
>>>> > >>> > > > > > > defaults and override them with any specific producer settings
>>>> > >>> > > > > > > provided in the FunctionConfig's ProducerConfig (submitted as a
>>>> > >>> > > > > > > REST parameter). After the initial registration, those computed
>>>> > >>> > > > > > > settings will then be persisted in BookKeeper with the function
>>>> > >>> > > > > > > definition.
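As a rough illustration of the startup-time check described above, the string-valued defaults could be validated by parsing them into enums and failing fast on anything unknown. This is only a sketch under stated assumptions: the `Sketch*` enums are local stand-ins intended to mirror the Pulsar client enums, and `SketchInvalidDefaultException` and `SketchDefaultsValidator` are hypothetical names, not classes from the PR.

```java
// Local stand-ins intended to mirror the Pulsar client API enums.
enum SketchCompressionType { NONE, LZ4, ZLIB, ZSTD, SNAPPY }
enum SketchHashingScheme { JavaStringHash, Murmur3_32Hash }
enum SketchMessageRoutingMode { SinglePartition, RoundRobinPartition, CustomPartition }

// Hypothetical checked exception for an invalid cluster-wide default.
class SketchInvalidDefaultException extends Exception {
    SketchInvalidDefaultException(String message) {
        super(message);
    }
}

final class SketchDefaultsValidator {
    private SketchDefaultsValidator() {
    }

    // Parses a configured string into the given enum type, or fails with a
    // message that names the offending setting.
    static <E extends Enum<E>> E parse(Class<E> type, String setting, String value)
            throws SketchInvalidDefaultException {
        try {
            return Enum.valueOf(type, value);
        } catch (IllegalArgumentException | NullPointerException e) {
            throw new SketchInvalidDefaultException(
                    "Invalid value '" + value + "' for function default '" + setting + "'");
        }
    }

    // Validates the string and numeric defaults in one pass.
    static void validate(String compressionType, String hashingScheme,
                         String messageRoutingMode, long batchingMaxPublishDelayMs)
            throws SketchInvalidDefaultException {
        parse(SketchCompressionType.class, "compressionType", compressionType);
        parse(SketchHashingScheme.class, "hashingScheme", hashingScheme);
        parse(SketchMessageRoutingMode.class, "messageRoutingMode", messageRoutingMode);
        if (batchingMaxPublishDelayMs < 0) {
            throw new SketchInvalidDefaultException(
                    "batchingMaxPublishDelay must not be negative: " + batchingMaxPublishDelayMs);
        }
    }
}

class ValidationSketch {
    public static void main(String[] args) {
        try {
            SketchDefaultsValidator.validate("ZLIB", "JavaStringHash", "RoundRobinPartition", 12L);
            System.out.println("defaults are valid");
            SketchDefaultsValidator.validate("GZIP", "JavaStringHash", "RoundRobinPartition", 12L);
        } catch (SketchInvalidDefaultException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The same `parse` helper could be reused at registration or update time, once the final settings have been computed.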
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > When a function is updated, we also need to address the
>>>> > >>> > > > > > > existing producerConfig, and for this, we need to check the
>>>> > >>> > > > > > > desired override precedence (as explained above) when computing
>>>> > >>> > > > > > > the final producer settings that will be persisted with the
>>>> > >>> > > > > > > function.
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > *Modifications*
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > As WorkerConfig is readily available when functions are
>>>> > >>> > > > > > > registered and updated, we can put new settings into
>>>> > >>> > > > > > > function_worker.yml and can load and validate them via
>>>> > >>> > > > > > > WorkerConfig.load(..). To prevent introducing breaking changes,
>>>> > >>> > > > > > > parameters will have defaults assigned in WorkerConfig that
>>>> > >>> > > > > > > conform to current expected behavior, like this:
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > @Data
>>>> > >>> > > > > > > @Accessors(chain = true)
>>>> > >>> > > > > > > public class FunctionDefaultsConfig implements Serializable {
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     @FieldContext(
>>>> > >>> > > > > > >         doc = "Disables default message batching between functions"
>>>> > >>> > > > > > >     )
>>>> > >>> > > > > > >     protected boolean batchingDisabled = false;
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     @FieldContext(
>>>> > >>> > > > > > >         doc = "Enables default message chunking between functions"
>>>> > >>> > > > > > >     )
>>>> > >>> > > > > > >     protected boolean chunkingEnabled = false;
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     @FieldContext(
>>>> > >>> > > > > > >         doc = "Disables default behavior to block when message queue is full"
>>>> > >>> > > > > > >     )
>>>> > >>> > > > > > >     protected boolean blockIfQueueFullDisabled = false;
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     @FieldContext(
>>>> > >>> > > > > > >         doc = "Default compression type"
>>>> > >>> > > > > > >     )
>>>> > >>> > > > > > >     protected String compressionType = "LZ4";
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     @FieldContext(
>>>> > >>> > > > > > >         doc = "Default hashing scheme"
>>>> > >>> > > > > > >     )
>>>> > >>> > > > > > >     protected String hashingScheme = "Murmur3_32Hash";
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     @FieldContext(
>>>> > >>> > > > > > >         doc = "Default message routing mode"
>>>> > >>> > > > > > >     )
>>>> > >>> > > > > > >     protected String messageRoutingMode = "CustomPartition";
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     @FieldContext(
>>>> > >>> > > > > > >         doc = "Default max publish delay (in milliseconds) when message batching is enabled"
>>>> > >>> > > > > > >     )
>>>> > >>> > > > > > >     protected long batchingMaxPublishDelay = 10L;
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     // Bundles the configured values into the cluster-wide producer defaults.
>>>> > >>> > > > > > >     public ClusterFunctionProducerDefaults buildProducerDefaults()
>>>> > >>> > > > > > >             throws InvalidWorkerConfigDefaultException {
>>>> > >>> > > > > > >         return new ClusterFunctionProducerDefaults(
>>>> > >>> > > > > > >             this.isBatchingDisabled(),
>>>> > >>> > > > > > >             this.isChunkingEnabled(),
>>>> > >>> > > > > > >             this.isBlockIfQueueFullDisabled(),
>>>> > >>> > > > > > >             this.getCompressionType(),
>>>> > >>> > > > > > >             this.getHashingScheme(),
>>>> > >>> > > > > > >             this.getMessageRoutingMode(),
>>>> > >>> > > > > > >             this.getBatchingMaxPublishDelay());
>>>> > >>> > > > > > >     }
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     public FunctionDefaultsConfig() {
>>>> > >>> > > > > > >     }
>>>> > >>> > > > > > > }
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > The additional section in function_worker.yml will look like
>>>> > >>> > > > > > > this:
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > functionDefaults:
>>>> > >>> > > > > > >   batchingDisabled: true
>>>> > >>> > > > > > >   chunkingEnabled: false
>>>> > >>> > > > > > >   blockIfQueueFullDisabled: false
>>>> > >>> > > > > > >   batchingMaxPublishDelay: 12
>>>> > >>> > > > > > >   compressionType: ZLIB
>>>> > >>> > > > > > >   hashingScheme: JavaStringHash
>>>> > >>> > > > > > >   messageRoutingMode: RoundRobinPartition
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > During function update, we can use a new boolean option,
>>>> > >>> > > > > > > ignoreExistingFunctionDefaults, to toggle precedence:
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > @Data
>>>> > >>> > > > > > > @NoArgsConstructor
>>> > > > > > > >>>> > >>> > > > > > > @ApiModel(value = "UpdateOptions", description = >>>> "Options >>>> > >>> while >>>> > >>> > > > updating >>>> > >>> > > > > > > function defaults or the sink") >>>> > >>> > > > > > > >>>> > >>> > > > > > > public class UpdateOptions { >>>> > >>> > > > > > > >>>> > >>> > > > > > > @ApiModelProperty( >>>> > >>> > > > > > > >>>> > >>> > > > > > > value = "Whether or not to update the auth data", >>>> > >>> > > > > > > >>>> > >>> > > > > > > name = "update-auth-data") >>>> > >>> > > > > > > >>>> > >>> > > > > > > private boolean updateAuthData = false; >>>> > >>> > > > > > > >>>> > >>> > > > > > > >>>> > >>> > > > > > > >>>> > >>> > > > > > > @ApiModelProperty( >>>> > >>> > > > > > > >>>> > >>> > > > > > > value="Whether or not to ignore any existing function >>>> > >>> > > > > > > defaults", >>>> > >>> > > > > > > >>>> > >>> > > > > > > name ="ignore-existing-function-defaults") >>>> > >>> > > > > > > >>>> > >>> > > > > > > private boolean ignoreExistingFunctionDefaults = >>>> false; >>>> > >>> > > > > > > >>>> > >>> > > > > > > } >>>> > >>> > > > > > > >>>> > >>> > > > > > > >>>> > >>> > > > > > > >>>> > >>> > > > > > > To ensure that function producer settings don't get >>>> > modified >>>> > >>> > > > > > unexpectedly >>>> > >>> > > > > > > by an update, ignoreExistingFunctionDefaults = false >>>> by >>>> > >>> default. >>>> > >>> > > > > > > >>>> > >>> > > > > > > >>>> > >>> > > > > > > >>>> > >>> > > > > > > The updated ProducerConfig will contain the new >>>> properties >>>> > >>> and a >>>> > >>> > > > method >>>> > >>> > > > > > to >>>> > >>> > > > > > > merge an incoming ProducerConfig with an existing >>>> > >>> ProducerConfig. >>>> > >>> > > (The >>>> > >>> > > > > > > merged ProducerConfig will use the incoming >>>> ProducerConfig >>>> > >>> > > properties >>>> > >>> > > > > > when >>>> > >>> > > > > > > they exist and will use existing ProducerConfig >>>> properties >>>> > >>> > > otherwise.) 
To ensure we don't need to force operators to provide those ProducerConfig
properties with every create or update, they must be nullable, not
primitive (e.g. Boolean instead of boolean).

To support the additional properties, FunctionConfigUtils (and any similar
classes) will need to be modified to correctly translate between the
ProducerConfig and the protobuf ProducerSpec.

The updated ProducerSpec in Function.proto will look like this:

    message ProducerSpec {
        int32 maxPendingMessages = 1;
        int32 maxPendingMessagesAcrossPartitions = 2;
        bool useThreadLocalProducers = 3;
        CryptoSpec cryptoSpec = 4;
        string batchBuilder = 5;
        bool batchingDisabled = 6;
        bool chunkingEnabled = 7;
        bool blockIfQueueFullDisabled = 8;
        CompressionType compressionType = 9;
        HashingScheme hashingScheme = 10;
        MessageRoutingMode messageRoutingMode = 11;
        int64 batchingMaxPublishDelay = 12;
    }
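
As a rough illustration of the merge rule described earlier (incoming
ProducerConfig values win when present, existing values are kept
otherwise), here is a minimal sketch. ProducerSettings, merge and pick are
hypothetical names used only for this example; they are not the actual
ProducerConfig class, and only two of the proposed properties are shown.

```java
import java.util.Objects;

/** Hypothetical stand-in for ProducerConfig; only two of the proposed properties are shown. */
final class ProducerSettings {
    // Nullable wrapper types: null means "not provided by the operator".
    final Boolean batchingDisabled;
    final Long batchingMaxPublishDelay;

    ProducerSettings(Boolean batchingDisabled, Long batchingMaxPublishDelay) {
        this.batchingDisabled = batchingDisabled;
        this.batchingMaxPublishDelay = batchingMaxPublishDelay;
    }

    /** Returns a new object that prefers incoming values and falls back to existing ones. */
    static ProducerSettings merge(ProducerSettings existing, ProducerSettings incoming) {
        Objects.requireNonNull(existing, "existing");
        Objects.requireNonNull(incoming, "incoming");
        return new ProducerSettings(
                pick(incoming.batchingDisabled, existing.batchingDisabled),
                pick(incoming.batchingMaxPublishDelay, existing.batchingMaxPublishDelay));
    }

    // Incoming wins when present; otherwise keep the existing value (which may itself be null).
    private static <T> T pick(T incoming, T existing) {
        return incoming != null ? incoming : existing;
    }
}

class MergeDemo {
    public static void main(String[] args) {
        ProducerSettings existing = new ProducerSettings(true, 12L);
        ProducerSettings incoming = new ProducerSettings(null, 50L);
        ProducerSettings merged = ProducerSettings.merge(existing, incoming);
        System.out.println(merged.batchingDisabled);        // true (kept from existing)
        System.out.println(merged.batchingMaxPublishDelay); // 50 (taken from incoming)
    }
}
```

Because both fields are wrapper types, an operator who omits a setting
leaves it null, and the existing value survives the update. With primitive
fields there would be no way to tell "not provided" apart from "false" or 0.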

To support these new parameters during local run and for convenience, we
should be able to specify them in the Admin CLI and in local run modes.

For local run, as function_worker.yml is not used, these parameters will
have defaults and be primitive values:

    @Parameter(names = "--clusterFunctionBatchingDisabled",
            description = "Disable the default message batching behavior for functions",
            hidden = true)
    public boolean clusterFunctionBatchingDisabled = false;

    @Parameter(names = "--clusterFunctionChunkingEnabled",
            description = "The default message chunking behavior for functions",
            hidden = true)
    public boolean clusterFunctionChunkingEnabled = false;

    @Parameter(names = "--clusterFunctionBlockIfQueueFullDisabled",
            description = "Disable the default blocking behavior for functions when queue is full",
            hidden = true)
    public boolean clusterFunctionBlockIfQueueFullDisabled = false;

    @Parameter(names = "--clusterFunctionCompressionTypeDefault",
            description = "The default Compression Type for functions",
            hidden = true)
    public String clusterFunctionCompressionTypeDefault = "LZ4";

    @Parameter(names = "--clusterFunctionHashingSchemeDefault",
            description = "The default Hashing Scheme for functions",
            hidden = true)
    public String clusterFunctionHashingSchemeDefault = "Murmur3_32Hash";

    @Parameter(names = "--clusterFunctionMessageRoutingModeDefault",
            description = "The default Message Routing Mode for functions",
            hidden = true)
    public String clusterFunctionMessageRoutingModeDefault = "CustomPartition";

    @Parameter(names = "--clusterFunctionBatchingMaxPublishDelayDefault",
            description = "The default max publish delay (in milliseconds) for functions when message batching is enabled",
            hidden = true)
    public long clusterFunctionBatchingMaxPublishDelayDefault = 10L;

For Admin CLI, parameters can be provided in a similar way, but they will
not have defaults and will use nullable types (i.e. Boolean instead of
boolean), since a parameter must be null for a non-provided config to be
obtained from WorkerConfig.

To select each property from either an incoming producerConfig or the
producerDefaults object (from WorkerConfig), we can use a
FunctionDefaultsMediator. The interface looks like this and gives us
flexibility to handle mediation for specific properties without needing to
convert the entire object:

    public interface FunctionDefaultsMediator {
        boolean isBatchingDisabled();
        boolean isChunkingEnabled();
        boolean isBlockIfQueueFullDisabled();
        CompressionType getCompressionType();
        Function.CompressionType getCompressionTypeProto();
        HashingScheme getHashingScheme();
        Function.HashingScheme getHashingSchemeProto();
        MessageRoutingMode getMessageRoutingMode();
        Function.MessageRoutingMode getMessageRoutingModeProto();
        Long getBatchingMaxPublishDelay();
    }

(Alternatively, we could remove the Proto methods from the mediator and
just use FunctionConfigUtils.convert(..) to translate between the protobuf
ProducerSpec and the Java ProducerConfig.)

*Testing*

In terms of testing, much of this functionality will be covered by existing
tests in the pulsar-functions module. However, we need to test each of the
new classes, as well as these behaviors:

- functionRegistration (with different provided ProducerConfig properties)
- functionUpdate (for ignoreExistingFunctionDefaults=true and
  ignoreExistingFunctionDefaults=false)
- loading function_worker.yml into WorkerConfig
- throwing exceptions correctly when invalid parameters are provided
- converting between ProducerSpec and ProducerConfig
- correctly handling precedence when computing the final ProducerConfig
  values
- creating producers
- ensuring functionality is consistent across runtimes

I'd like feedback on this proposal.

Devin G. Bost