All the tests in my personal GitHub CI passed except for the ones in the Flaky
group, so I created an Apache Pulsar PR:
https://github.com/apache/pulsar/pull/10154
(Again, this is for Part 1 of the changes.)
I think that code is ready for review.

Devin G. Bost


On Tue, Apr 6, 2021 at 12:37 AM Devin Bost <devin.b...@gmail.com> wrote:

> I think I was able to resolve that issue. Fortunately, only
> FunctionResultRouter had the singleton behavior, so resolving that was
> straightforward. The current PR for part 1 is here (in my personal CI):
> https://github.com/devinbost/pulsar/pull/5/files
> Once the tests look good, I'll submit a PR to Apache Pulsar.
>
> Devin G. Bost
>
>
> On Mon, Apr 5, 2021 at 8:50 PM Devin Bost <devin.b...@gmail.com> wrote:
>
>> It appears that the reason the router classes are singletons is to ensure
>> the clock behavior is consistent across producers.
>> I wonder if we might be able to avoid a breaking change by refactoring
>> the clock out into a singleton that could be shared across the routers,
>> which we could then make non-singletons.
>> That would give the routers the ability to vary behavior through their
>> constructors (thus avoiding a breaking change) and would eliminate the race
>> condition that occurs any time two producers are created with the same
>> router class but different batching behavior.
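Below is a minimal Java sketch of that idea. It assumes a hypothetical `SharedRouterClock` holder and a hypothetical `PerProducerRouter`. Neither is existing Pulsar code, and the partition logic is only loosely modeled on round-robin routing. The clock is process-wide, while `batchingEnabled` is fixed per router instance through its constructor.

```java
import java.time.Clock;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical process-wide time source shared by every router instance,
// so time-window routing stays consistent across producers.
final class SharedRouterClock {
    private static volatile Clock clock = Clock.systemUTC();

    private SharedRouterClock() {}

    static Clock get() {
        return clock;
    }

    // Lets tests pin the time source, e.g. with Clock.fixed(...).
    static void set(Clock newClock) {
        clock = newClock;
    }
}

// Hypothetical per-producer router: batching behavior is fixed at construction,
// so two producers with different settings never share mutable state.
final class PerProducerRouter {
    private final boolean batchingEnabled;
    private final long partitionSwitchMs;
    private final AtomicInteger nextIndex = new AtomicInteger();

    PerProducerRouter(boolean batchingEnabled, long partitionSwitchMs) {
        this.batchingEnabled = batchingEnabled;
        this.partitionSwitchMs = partitionSwitchMs;
    }

    int choosePartition(String key, int numPartitions) {
        if (key != null) {
            // Keyed messages always hash to the same partition.
            return Math.floorMod(key.hashCode(), numPartitions);
        }
        if (batchingEnabled) {
            // Stay on one partition per time window so batches can fill up.
            long nowMs = SharedRouterClock.get().millis();
            return (int) Math.floorMod(nowMs / partitionSwitchMs, (long) numPartitions);
        }
        // Without batching, rotate partitions message by message.
        return Math.floorMod(nextIndex.getAndIncrement(), numPartitions);
    }
}
```

Because `batchingEnabled` is a final per-instance field, two producers with different batching settings cannot overwrite each other's value. They share only the time source.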
>>
>> Devin G. Bost
>>
>>
>> On Mon, Apr 5, 2021 at 6:01 PM Devin Bost <devin.b...@gmail.com> wrote:
>>
>>> Jerry,
>>>
>>> Thanks for the information and for the links!
>>> Thanks also for the suggestion to split the PR into two parts. That will
>>> make it easier to get some of this work completed in time for the 2.8.0
>>> release.
>>>
>>> While working on the first part, I discovered something that needs
>>> community attention: in order to create the producer
>>> (e.g.
>>> https://github.com/apache/pulsar/blob/master/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java#L117),
>>> we get the MessageRouter from a singleton that takes isBatchingEnabled in
>>> its constructor.
>>> The problem is that FunctionResultRouter is a shared instance, so if one
>>> function changes the router's value of isBatchingEnabled, it will create a
>>> race condition with other functions that share the same router. It appears
>>> that the reason the router is a singleton is to ensure that clock behavior
>>> is consistent.
>>>
>>> Unless someone has a better idea, it looks like it might be necessary to
>>> refactor isBatchingEnabled (
>>> https://github.com/apache/pulsar/blob/master/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java#L46)
>>> from its constructor into a method parameter on the router.
>>>
>>> However, moving isBatchingEnabled to a method parameter is a breaking
>>> change on the client interface:
>>> https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageRouter.java#L56
>>>
>>> I don't want to introduce breaking changes, but I'm not sure how else to
>>> ensure that the message router behaves correctly across function instances (
>>> https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java#L82)
>>> when some functions may have batching enabled and some may not.
>>>
>>> Looking for ideas/feedback.
>>>
>>> Devin G. Bost
>>>
>>>
>>> On Mon, Apr 5, 2021 at 1:11 PM Jerry Peng <jerry.boyang.p...@gmail.com>
>>> wrote:
>>>
>>>> >
>>>> > The interceptor would be loaded at function registration.
>>>>
>>>>
>>>> The interceptor classes should be loaded at broker/worker startup time.
>>>>
>>>> > In order for an interceptor to be picked up, a jar would need to be
>>>> > compiled with the class that implements the interface, and the jar would
>>>> > need to be deployed into a directory that matches the directory provided
>>>> > somewhere. (Would this directory be specified in function_worker.yml? Or,
>>>> > would we need to reuse one of the existing interceptor directories and
>>>> > filter classes at runtime?)
>>>> >
>>>>
>>>> The JAR with the interceptor classes can just be placed in the lib/
>>>> directory of the Pulsar project. An additional config may need to be
>>>> added to function_worker.yml so that the class name of the interceptor
>>>> impl can be provided.
>>>>
>>>> > In order for changes to the FunctionConfig values to reach where the
>>>> > Function producers are created, we'd still need to add the producer
>>>> > properties to the Function.proto and FunctionConfig.ProducerConfig
>>>> > definitions. We'd also still need to modify CLI parameters to ensure these
>>>> > values could be modified on a per-function basis.
>>>> >
>>>>
>>>> For your use case of disabling batching, you will still need to add an
>>>> additional field in the Function.proto so that it can be configurable. I
>>>> would recommend doing that work separately, since it is orthogonal to
>>>> interceptors.
>>>>
>>>> > When a function is registered, instead of retrieving cluster-wide function
>>>> > producer defaults from WorkerConfig, it would load the interceptor, which
>>>> > would modify the FunctionConfig's ProducerConfig's property values to
>>>> > provide the cluster-wide defaults for any value that wasn't provided by the
>>>> > REST call.
>>>> >
>>>>
>>>> Yes.
>>>>
>>>> > If an interceptor wasn't provided, during conversion of the ProducerConfig
>>>> > to the protobuf ProducerSpec, the protobuf defaults will take effect (since
>>>> > protobuf doesn't allow null primitives), which will then be picked up when
>>>> > the producer is created.
>>>> >
>>>>
>>>>
>>>> If an interceptor impl is not provided, the behavior of the APIs should be
>>>> the same as the current behavior.
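Here is a rough sketch of that startup flow. It assumes a hypothetical `FunctionRegistrationInterceptor` interface and a class name read from a hypothetical function_worker.yml entry. The worker loads the configured class once by reflection. When nothing is configured, it falls back to a no-op implementation, so API behavior stays as it is today.

```java
import java.util.Map;

// Hypothetical hook invoked before a function is registered; not an existing
// Pulsar interface.
interface FunctionRegistrationInterceptor {
    void beforeRegister(String tenant, String namespace, String functionName,
                        Map<String, Object> producerConfig);
}

// Used when no interceptor class is configured: requests pass through
// untouched, so API behavior matches the current behavior.
final class NoOpInterceptor implements FunctionRegistrationInterceptor {
    @Override
    public void beforeRegister(String tenant, String namespace, String functionName,
                               Map<String, Object> producerConfig) {
        // Intentionally empty.
    }
}

final class InterceptorLoader {
    private InterceptorLoader() {}

    // Called once at worker startup. className would come from a hypothetical
    // function_worker.yml entry; the implementing JAR is assumed to be on the
    // worker classpath (for example, dropped into lib/).
    static FunctionRegistrationInterceptor load(String className) {
        if (className == null || className.isBlank()) {
            return new NoOpInterceptor();
        }
        try {
            return Class.forName(className)
                    .asSubclass(FunctionRegistrationInterceptor.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("Cannot load interceptor " + className, e);
        }
    }
}
```

Failing fast at startup means a misconfigured class name surfaces when the worker boots, not on the first registration request.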
>>>>
>>>>
>>>> Back at Streamlio, I actually created an interceptor mechanism.
>>>>
>>>> Please take a look at this old PR:
>>>>
>>>> https://github.com/apache/pulsar/pull/7159
>>>>
>>>> or, more specifically, at what is done for functions here:
>>>>
>>>>
>>>> https://github.com/apache/pulsar/pull/7159/files#diff-427711b7c1c3b9f17cd1c02bb9ceb942af454676dd97902a43e910f645febc6d
>>>>
>>>>
>>>> Perhaps you can help revive this work!
>>>>
>>>> Best,
>>>>
>>>> Jerry
>>>>
>>>> On Mon, Apr 5, 2021 at 11:46 AM Devin Bost <devin.b...@gmail.com>
>>>> wrote:
>>>>
>>>> > Here's my understanding of what I'd need to do to implement the
>>>> > interceptor approach.
>>>> > The interceptor would be loaded at function registration. In order for an
>>>> > interceptor to be picked up, a jar would need to be compiled with the class
>>>> > that implements the interface, and the jar would need to be deployed into a
>>>> > directory that matches the directory provided somewhere. (Would this
>>>> > directory be specified in function_worker.yml? Or, would we need to reuse
>>>> > one of the existing interceptor directories and filter classes at
>>>> > runtime?)
>>>> > In order for changes to the FunctionConfig values to reach where the
>>>> > Function producers are created, we'd still need to add the producer
>>>> > properties to the Function.proto and FunctionConfig.ProducerConfig
>>>> > definitions. We'd also still need to modify CLI parameters to ensure these
>>>> > values could be modified on a per-function basis.
>>>> > When a function is registered, instead of retrieving cluster-wide function
>>>> > producer defaults from WorkerConfig, it would load the interceptor, which
>>>> > would modify the FunctionConfig's ProducerConfig's property values to
>>>> > provide the cluster-wide defaults for any value that wasn't provided by the
>>>> > REST call.
>>>> > If an interceptor wasn't provided, during conversion of the ProducerConfig
>>>> > to the protobuf ProducerSpec, the protobuf defaults will take effect (since
>>>> > protobuf doesn't allow null primitives), which will then be picked up when
>>>> > the producer is created.
>>>> >
>>>> > Is that right?
>>>> >
>>>> > Devin G. Bost
>>>> >
>>>> >
>>>> > On Mon, Apr 5, 2021 at 11:56 AM Devin Bost <devin.b...@gmail.com>
>>>> > wrote:
>>>> >
>>>> > > If we use the interceptor approach, does that mean a cluster admin would
>>>> > > need to write custom code in order to override the function producer
>>>> > > settings? I worry about adding additional burden to cluster
>>>> > > administration.
>>>> > >
>>>> > > --
>>>> > > Devin G. Bost
>>>> > >
>>>> > > On Sat, Apr 3, 2021, 8:26 AM Devin Bost <devin.b...@gmail.com>
>>>> > > wrote:
>>>> > >
>>>> > >> Hi Rui,
>>>> > >>
>>>> > >> Thanks for the feedback. My understanding of the interceptor is that
>>>> > >> it's not something that would be touched by users but would only be
>>>> > >> modified by cluster admins. So, I think we'd still need to include the
>>>> > >> parameters on the ProducerConfig.
>>>> > >> I already have the other approach coded, so changing the design will
>>>> > >> require some rework.
>>>> > >> I'll look at that example you mentioned.
>>>> > >>
>>>> > >> --
>>>> > >> Devin G. Bost
>>>> > >>
>>>> > >> On Fri, Apr 2, 2021, 10:18 PM Rui Fu <f...@rui.sh> wrote:
>>>> > >>
>>>> > >>> Hi Devin,
>>>> > >>>
>>>> > >>> Great proposal for giving functions such a useful feature, and thanks
>>>> > >>> for your detailed writing. After going through the history of this
>>>> > >>> discussion, I would like to +1 Jerry’s suggestion. With the interceptor,
>>>> > >>> admins and users would have maximum flexibility to customize their
>>>> > >>> producer configs. It may also reduce the number of parameters we put
>>>> > >>> into the configs and pulsar-admin. Also, we already have some
>>>> > >>> interceptors in Pulsar Functions, like the `RuntimeCustomizer`
>>>> > >>> interface, so it is not a new technique that users would find hard to
>>>> > >>> adopt.
>>>> > >>>
>>>> > >>> Besides, I would like to suggest covering these changes in the Python
>>>> > >>> & Go runtimes as well if we add new fields to `Function.proto`.
>>>> > >>>
>>>> > >>> Best,
>>>> > >>>
>>>> > >>> Rui
>>>> > >>> On Apr 2, 2021, at 10:13 AM +0800, Devin Bost <devin.b...@gmail.com> wrote:
>>>> > >>> > What would be some of the additional benefits of using the
>>>> > >>> > interceptor approach?
>>>> > >>> >
>>>> > >>> > --
>>>> > >>> > Devin G. Bost
>>>> > >>> >
>>>> > >>> > On Thu, Apr 1, 2021, 6:03 PM Jerry Peng <jerry.boyang.p...@gmail.com>
>>>> > >>> > wrote:
>>>> > >>> >
>>>> > >>> > > Devin,
>>>> > >>> > >
>>>> > >>> > > Interceptors are located on the server side (broker/worker). Cluster
>>>> > >>> > > admins would create the plugins based on the interfaces I described
>>>> > >>> > > and install them on the server side. Regular function developers will
>>>> > >>> > > not implement or interact directly with the interceptor.
>>>> > >>> > >
>>>> > >>> > > > Would it be better to just ignore WorkerConfig defaults during
>>>> > >>> > > > function update?
>>>> > >>> > >
>>>> > >>> > > Yes.
>>>> > >>> > >
>>>> > >>> > > If the cluster admin changes the function config defaults and wants
>>>> > >>> > > existing functions to use the changed configs, the admin can just
>>>> > >>> > > update those functions' configs through the regular function update
>>>> > >>> > > mechanism we have today.
>>>> > >>> > >
>>>> > >>> > >
>>>> > >>> > >
>>>> > >>> > > On Thu, Apr 1, 2021 at 4:51 PM Devin Bost <devin.b...@gmail.com>
>>>> > >>> > > wrote:
>>>> > >>> > >
>>>> > >>> > > > Would it be better to just ignore WorkerConfig defaults during
>>>> > >>> > > > function update? We could still update the function producer behavior
>>>> > >>> > > > through the producerConfig passed as a REST parameter, but we'd avoid
>>>> > >>> > > > the edge case I mentioned.
>>>> > >>> > > >
>>>> > >>> > > > Devin G. Bost
>>>> > >>> > > >
>>>> > >>> > > >
>>>> > >>> > > > On Thu, Apr 1, 2021 at 5:42 PM Devin Bost <devin.b...@gmail.com>
>>>> > >>> > > > wrote:
>>>> > >>> > > >
>>>> > >>> > > > > I just remembered an edge case that motivated me to create that
>>>> > >>> > > > > additional flow.
>>>> > >>> > > > > In my PR, if any producerConfig values are null at registration or
>>>> > >>> > > > > update, we get those values from WorkerConfig instead.
>>>> > >>> > > > > However, when a user runs an update (on a function) for the first time
>>>> > >>> > > > > after upgrading to the version of Pulsar with this feature, if someone
>>>> > >>> > > > > has modified the function_worker.yml file (perhaps without the user's
>>>> > >>> > > > > knowledge), those defaults from function_worker.yml will get picked up,
>>>> > >>> > > > > because the existing producerConfig will have null set for those values
>>>> > >>> > > > > (since the function was registered before this feature existed). So,
>>>> > >>> > > > > the user could be trying to update something unrelated, not realizing
>>>> > >>> > > > > that their function is going to have its producerConfig updated with
>>>> > >>> > > > > the cluster defaults. Most users won't be affected by this, but it
>>>> > >>> > > > > could be tricky to diagnose for the few who are. (Is this description
>>>> > >>> > > > > any clearer? If not, it might be clearer if I diagram it.)
>>>> > >>> > > > >
>>>> > >>> > > > > Devin G. Bost
>>>> > >>> > > > >
>>>> > >>> > > > >
>>>> > >>> > > > > On Thu, Apr 1, 2021 at 4:39 PM Jerry Peng <jerry.boyang.p...@gmail.com>
>>>> > >>> > > > > wrote:
>>>> > >>> > > > >
>>>> > >>> > > > > > Hi Devin,
>>>> > >>> > > > > >
>>>> > >>> > > > > > I understand the usefulness of giving cluster admins the ability to
>>>> > >>> > > > > > specify some defaults for functions at a cluster level. I think the
>>>> > >>> > > > > > right time to apply those defaults is during function registration. I
>>>> > >>> > > > > > am not sure I understand the usefulness of what you are proposing we
>>>> > >>> > > > > > do during update time. If you would like to change a config of an
>>>> > >>> > > > > > already running function, why not just update it with the config
>>>> > >>> > > > > > changes that are needed? I am not sure you need the additional
>>>> > >>> > > > > > workflow you proposed for the update.
>>>> > >>> > > > > >
>>>> > >>> > > > > > Another idea to satisfy your use case: instead of leveraging a
>>>> > >>> > > > > > default config mechanism, perhaps we should look at implementing
>>>> > >>> > > > > > interceptors for the function/source/sink API calls. We already have
>>>> > >>> > > > > > interceptors implemented for many other Pulsar features. Perhaps we
>>>> > >>> > > > > > should do it for the Pulsar Functions APIs as well. With interceptors
>>>> > >>> > > > > > you will be able to intercept all API operations on functions and
>>>> > >>> > > > > > customize the requests however you would like.
>>>> > >>> > > > > >
>>>> > >>> > > > > > We can have an interface like:
>>>> > >>> > > > > >
>>>> > >>> > > > > > interface FunctionInterceptors {
>>>> > >>> > > > > >
>>>> > >>> > > > > >     void registerFunctionInterceptor(String tenant, String namespace,
>>>> > >>> > > > > >             String functionName, FunctionConfig functionConfig, ...);
>>>> > >>> > > > > >
>>>> > >>> > > > > >     ...
>>>> > >>> > > > > >
>>>> > >>> > > > > > }
>>>> > >>> > > > > >
>>>> > >>> > > > > > for developers to implement. At the beginning of every Function API
>>>> > >>> > > > > > call, the corresponding interceptor method gets called. For example,
>>>> > >>> > > > > > the "registerFunctionInterceptor()" method above would be called
>>>> > >>> > > > > > near the beginning of registerFunction, allowing you to customize
>>>> > >>> > > > > > the function config however you want.
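Here is a hedged sketch of an admin-provided implementation, to make the proposed flow concrete. `ProducerSettings`, `ClusterDefaultsInterceptor` and `WorkerSketch` are hypothetical names. The interceptor signature is simplified, because the real parameter list is elided above, and the default values are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the producer part of a FunctionConfig.
// Null means "not provided in the REST request".
final class ProducerSettings {
    Boolean batchingEnabled;
    String compressionType;
}

// Simplified shape of the proposed interface (parameter list reduced
// for the sketch).
interface FunctionInterceptors {
    void registerFunctionInterceptor(String tenant, String namespace,
                                     String functionName, ProducerSettings producer);
}

// Example admin-provided plugin: fills cluster-wide defaults only for
// settings the request left unset; explicit values are never overridden.
final class ClusterDefaultsInterceptor implements FunctionInterceptors {
    @Override
    public void registerFunctionInterceptor(String tenant, String namespace,
                                            String functionName, ProducerSettings producer) {
        if (producer.batchingEnabled == null) {
            producer.batchingEnabled = Boolean.FALSE; // illustrative default
        }
        if (producer.compressionType == null) {
            producer.compressionType = "LZ4"; // illustrative default
        }
    }
}

// Worker side: the interceptor runs at the start of registerFunction,
// before validation and persistence.
final class WorkerSketch {
    private final FunctionInterceptors interceptor;
    final List<String> registered = new ArrayList<>();

    WorkerSketch(FunctionInterceptors interceptor) {
        this.interceptor = interceptor;
    }

    void registerFunction(String tenant, String namespace, String name,
                          ProducerSettings producer) {
        interceptor.registerFunctionInterceptor(tenant, namespace, name, producer);
        // Validation and persistence of the (possibly modified) config would follow.
        registered.add(tenant + "/" + namespace + "/" + name);
    }
}
```

Since the interceptor only fills nulls, a function developer's explicit settings still win. The cluster admin's code decides everything else.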
>>>> > >>> > > > > >
>>>> > >>> > > > > > Best,
>>>> > >>> > > > > >
>>>> > >>> > > > > > Jerry
>>>> > >>> > > > > >
>>>> > >>> > > > > >
>>>> > >>> > > > > >
>>>> > >>> > > > > > On Thu, Apr 1, 2021 at 2:08 PM Devin Bost <devin.b...@gmail.com>
>>>> > >>> > > > > > wrote:
>>>> > >>> > > > > >
>>>> > >>> > > > > > > *Cluster-Wide and Function-Specific Producer Defaults*
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > - Status: Proposal
>>>> > >>> > > > > > > - Author: Devin Bost (with guidance from Jerry Peng)
>>>> > >>> > > > > > > - Pull Request: https://github.com/apache/pulsar/pull/9987
>>>> > >>> > > > > > > - Mailing List discussion:
>>>> > >>> > > > > > > - Release: 2.8.0
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > *Motivation*
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > Pulsar currently provides no way for users or operators to change the
>>>> > >>> > > > > > > producer behavior of Pulsar functions. For organizations that make
>>>> > >>> > > > > > > heavy use of functions, the ability to tune the messaging behavior of
>>>> > >>> > > > > > > functions is greatly desired. Moreover, the current function messaging
>>>> > >>> > > > > > > defaults appear to be causing issues for certain workloads. (For
>>>> > >>> > > > > > > example, some bugs appear to be related to having batching enabled.
>>>> > >>> > > > > > > See #6054.) Enabling operators to modify these settings will help
>>>> > >>> > > > > > > Pulsar cluster admins work around issues so that they can upgrade
>>>> > >>> > > > > > > legacy versions of Pulsar to more recent versions. In addition,
>>>> > >>> > > > > > > enabling finer tuning of these settings can provide more opportunity
>>>> > >>> > > > > > > for performance optimization in production environments. We need the
>>>> > >>> > > > > > > ability to modify these function producer settings:
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > - batchingEnabled
>>>> > >>> > > > > > > - chunkingEnabled
>>>> > >>> > > > > > > - blockIfQueueFull
>>>> > >>> > > > > > > - compressionType
>>>> > >>> > > > > > > - hashingScheme
>>>> > >>> > > > > > > - messageRoutingMode
>>>> > >>> > > > > > > - batchingMaxPublishDelay
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > *Implementation Strategy*
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > Ideally, we need a way to set default message producer behavior (for
>>>> > >>> > > > > > > functions) at a cluster-wide level. However, operators may want
>>>> > >>> > > > > > > specific settings to be used for certain functions. (We can't assume
>>>> > >>> > > > > > > that everyone will want all functions in a cluster to have the same
>>>> > >>> > > > > > > producer settings.) When an operator changes the cluster-wide producer
>>>> > >>> > > > > > > defaults, they need a way to roll out the changes to functions without
>>>> > >>> > > > > > > significantly disrupting messaging flows in production environments.
>>>> > >>> > > > > > > They also need a simple way to roll back changes in case a change to
>>>> > >>> > > > > > > function producer settings results in undesired behavior. However, not
>>>> > >>> > > > > > > all users will want function updates to replace existing function
>>>> > >>> > > > > > > producer settings, especially for functions that have been given
>>>> > >>> > > > > > > custom producer settings (at the per-function level). Due to this
>>>> > >>> > > > > > > difference in use cases, there needs to be a way for an operator to
>>>> > >>> > > > > > > specify what update behavior they want. For a given update, the user
>>>> > >>> > > > > > > should be able to specify whether:
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > 1. Existing function producer settings will be replaced by
>>>> > >>> > > > > > >    cluster-wide defaults (unless overridden by producer settings
>>>> > >>> > > > > > >    provided in the request), or
>>>> > >>> > > > > > > 2. Existing function producer settings will *not* be replaced by
>>>> > >>> > > > > > >    cluster-wide defaults (unless overridden by producer settings
>>>> > >>> > > > > > >    provided in the request).
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > So, the two orders of precedence are:
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > 1. newProducerConfig > cluster-wide defaults > existingProducerConfig
>>>> > >>> > > > > > > 2. newProducerConfig > existingProducerConfig > cluster-wide defaults
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > We can add a boolean property to UpdateOptions to allow the precedence
>>>> > >>> > > > > > > to be specified. Since we don't want to introduce unexpected side
>>>> > >>> > > > > > > effects during an update, the default precedence should prefer the
>>>> > >>> > > > > > > existingProducerConfig over cluster-wide defaults.
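Here is a minimal sketch of resolving the two precedence orders field by field. It assumes hypothetical `ProducerConfigSketch` and `PrecedenceSketch` classes that carry only two settings. Fields are nullable wrappers, where `null` means "not specified". The boolean mirrors the proposed `ignoreExistingFunctionDefaults` option.

```java
// Hypothetical nullable producer settings (null = "not specified").
final class ProducerConfigSketch {
    final Boolean batchingEnabled;
    final String compressionType;

    ProducerConfigSketch(Boolean batchingEnabled, String compressionType) {
        this.batchingEnabled = batchingEnabled;
        this.compressionType = compressionType;
    }

    // Field-wise merge: keep this config's value when present, else use the fallback's.
    ProducerConfigSketch orElse(ProducerConfigSketch fallback) {
        if (fallback == null) {
            return this;
        }
        return new ProducerConfigSketch(
                batchingEnabled != null ? batchingEnabled : fallback.batchingEnabled,
                compressionType != null ? compressionType : fallback.compressionType);
    }
}

final class PrecedenceSketch {
    private PrecedenceSketch() {}

    // ignoreExistingFunctionDefaults == true  -> new > cluster defaults > existing
    // ignoreExistingFunctionDefaults == false -> new > existing > cluster defaults
    static ProducerConfigSketch resolve(ProducerConfigSketch incoming,
                                        ProducerConfigSketch existing,
                                        ProducerConfigSketch clusterDefaults,
                                        boolean ignoreExistingFunctionDefaults) {
        ProducerConfigSketch base =
                incoming == null ? new ProducerConfigSketch(null, null) : incoming;
        return ignoreExistingFunctionDefaults
                ? base.orElse(clusterDefaults).orElse(existing)
                : base.orElse(existing).orElse(clusterDefaults);
    }
}
```

At registration there is no existing config. Passing `null` for it reduces both orders to "request over cluster defaults". If the existing config has all fields `null` (for example, a function registered before these fields existed), both orders fall through to the cluster defaults for those fields.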
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > To ensure that the desired producer settings (after calculating the
>>>> > >>> > > > > > > settings from defaults + overrides) are available when creating the
>>>> > >>> > > > > > > producers (on any cluster), we need to add these producer settings to
>>>> > >>> > > > > > > the function protobuf definition. Also, to ensure that settings can be
>>>> > >>> > > > > > > properly validated at the time they are submitted, we need validation
>>>> > >>> > > > > > > in two places:
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > 1. When cluster-wide configs are loaded upon broker startup
>>>> > >>> > > > > > > 2. When a function is registered or updated.
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > As it would be impractical to check every registered function
>>>> > >>> > > > > > > definition during broker startup, most of the validation needs to
>>>> > >>> > > > > > > occur during function registration or update. However, during broker
>>>> > >>> > > > > > > startup, we can at least validate that the configurations exist and
>>>> > >>> > > > > > > throw an exception if they are invalid.
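The startup check could look roughly like this, assuming the YAML values arrive as strings. The enums are local copies of the constant names in Pulsar's `CompressionType`, `HashingScheme` and `MessageRoutingMode`, so the sketch compiles without the client dependency. `InvalidWorkerConfigDefaultException` borrows its name from the `buildProducerDefaults()` signature in this proposal, but its shape here is assumed.

```java
// Local mirrors of the client API's enum constant names (assumed to match).
enum CompressionTypeMirror { NONE, LZ4, ZLIB, ZSTD, SNAPPY }
enum HashingSchemeMirror { JavaStringHash, Murmur3_32Hash }
enum MessageRoutingModeMirror { SinglePartition, RoundRobinPartition, CustomPartition }

// Name from the proposal; this minimal shape is an assumption.
class InvalidWorkerConfigDefaultException extends Exception {
    InvalidWorkerConfigDefaultException(String message) {
        super(message);
    }
}

final class FunctionDefaultsValidator {
    private FunctionDefaultsValidator() {}

    // Rejects missing values and strings that are not a constant of the enum.
    static <E extends Enum<E>> E parse(Class<E> type, String field, String value)
            throws InvalidWorkerConfigDefaultException {
        if (value == null) {
            throw new InvalidWorkerConfigDefaultException(field + " must be set");
        }
        try {
            return Enum.valueOf(type, value);
        } catch (IllegalArgumentException e) {
            throw new InvalidWorkerConfigDefaultException(
                    "Invalid " + field + ": '" + value + "'");
        }
    }

    // Intended to run once when the worker loads function_worker.yml.
    static void validate(String compressionType, String hashingScheme,
                         String messageRoutingMode, long batchingMaxPublishDelayMs)
            throws InvalidWorkerConfigDefaultException {
        parse(CompressionTypeMirror.class, "compressionType", compressionType);
        parse(HashingSchemeMirror.class, "hashingScheme", hashingScheme);
        parse(MessageRoutingModeMirror.class, "messageRoutingMode", messageRoutingMode);
        if (batchingMaxPublishDelayMs < 0) {
            throw new InvalidWorkerConfigDefaultException(
                    "batchingMaxPublishDelay must be >= 0");
        }
    }
}
```

This check only covers the cluster-wide values. Per-function overrides still need the same check at registration or update time.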
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > As any configuration exceptions thrown when the producer is created
>>>> > >>> > > > > > > would bubble up to the client, we need to ensure that the final
>>>> > >>> > > > > > > configurations are valid when a function is registered or updated, so
>>>> > >>> > > > > > > that no exceptions are thrown when the producer is created. This
>>>> > >>> > > > > > > requires computing the producer settings (according to the desired
>>>> > >>> > > > > > > precedence) during function registration or update.
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > When a function is registered, no producer settings exist for it yet,
>>>> > >>> > > > > > > so we simply need to get the cluster-wide defaults and override them
>>>> > >>> > > > > > > with any specific producer settings provided in the FunctionConfig's
>>>> > >>> > > > > > > ProducerConfig (submitted as a REST parameter). After the initial
>>>> > >>> > > > > > > registration, those computed settings will then be persisted in
>>>> > >>> > > > > > > BookKeeper with the function definition.
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > When a function is updated, we also need to address the existing
>>>> > >>> > > > > > > producerConfig, and for this, we need to check the desired override
>>>> > >>> > > > > > > precedence (as explained above) when computing the final producer
>>>> > >>> > > > > > > settings that will be persisted with the function.
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > *Modifications*
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > As WorkerConfig is readily available when functions are registered and
>>>> > >>> > > > > > > updated, we can put new settings into function_worker.yml and load and
>>>> > >>> > > > > > > validate them via WorkerConfig.load(..). To avoid introducing breaking
>>>> > >>> > > > > > > changes, parameters will have defaults assigned in WorkerConfig that
>>>> > >>> > > > > > > conform to the current expected behavior, like this:
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > @Data
>>>> > >>> > > > > > > @Accessors(chain = true)
>>>> > >>> > > > > > > public class FunctionDefaultsConfig implements Serializable {
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     @FieldContext(
>>>> > >>> > > > > > >         doc = "Disables default message batching between functions"
>>>> > >>> > > > > > >     )
>>>> > >>> > > > > > >     protected boolean batchingDisabled = false;
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     @FieldContext(
>>>> > >>> > > > > > >         doc = "Enables default message chunking between functions"
>>>> > >>> > > > > > >     )
>>>> > >>> > > > > > >     protected boolean chunkingEnabled = false;
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     @FieldContext(
>>>> > >>> > > > > > >         doc = "Disables default behavior to block when message queue is full"
>>>> > >>> > > > > > >     )
>>>> > >>> > > > > > >     protected boolean blockIfQueueFullDisabled = false;
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     @FieldContext(
>>>> > >>> > > > > > >         doc = "Default compression type"
>>>> > >>> > > > > > >     )
>>>> > >>> > > > > > >     protected String compressionType = "LZ4";
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     @FieldContext(
>>>> > >>> > > > > > >         doc = "Default hashing scheme"
>>>> > >>> > > > > > >     )
>>>> > >>> > > > > > >     protected String hashingScheme = "Murmur3_32Hash";
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     @FieldContext(
>>>> > >>> > > > > > >         doc = "Default message routing mode"
>>>> > >>> > > > > > >     )
>>>> > >>> > > > > > >     protected String messageRoutingMode = "CustomPartition";
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     @FieldContext(
>>>> > >>> > > > > > >         doc = "Default max publish delay (in milliseconds) when message batching is enabled"
>>>> > >>> > > > > > >     )
>>>> > >>> > > > > > >     protected long batchingMaxPublishDelay = 10L;
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     public ClusterFunctionProducerDefaults buildProducerDefaults()
>>>> > >>> > > > > > >             throws InvalidWorkerConfigDefaultException {
>>>> > >>> > > > > > >         return new ClusterFunctionProducerDefaults(
>>>> > >>> > > > > > >                 this.isBatchingDisabled(),
>>>> > >>> > > > > > >                 this.isChunkingEnabled(),
>>>> > >>> > > > > > >                 this.isBlockIfQueueFullDisabled(),
>>>> > >>> > > > > > >                 this.getCompressionType(),
>>>> > >>> > > > > > >                 this.getHashingScheme(),
>>>> > >>> > > > > > >                 this.getMessageRoutingMode(),
>>>> > >>> > > > > > >                 this.getBatchingMaxPublishDelay());
>>>> > >>> > > > > > >     }
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     public FunctionDefaultsConfig() {
>>>> > >>> > > > > > >     }
>>>> > >>> > > > > > > }
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > The additional section in function_worker.yml will look like this:
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > functionDefaults:
>>>> > >>> > > > > > >   batchingDisabled: true
>>>> > >>> > > > > > >   chunkingEnabled: false
>>>> > >>> > > > > > >   blockIfQueueFullDisabled: false
>>>> > >>> > > > > > >   batchingMaxPublishDelay: 12
>>>> > >>> > > > > > >   compressionType: ZLIB
>>>> > >>> > > > > > >   hashingScheme: JavaStringHash
>>>> > >>> > > > > > >   messageRoutingMode: RoundRobinPartition
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > During function update, we can use a new boolean option,
>>>> > >>> > > > > > > ignoreExistingFunctionDefaults, to toggle precedence:
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > @Data
>>>> > >>> > > > > > > @NoArgsConstructor
>>>> > >>> > > > > > > @ApiModel(value = "UpdateOptions", description = "Options while updating function defaults or the sink")
>>>> > >>> > > > > > > public class UpdateOptions {
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     @ApiModelProperty(
>>>> > >>> > > > > > >             value = "Whether or not to update the auth data",
>>>> > >>> > > > > > >             name = "update-auth-data")
>>>> > >>> > > > > > >     private boolean updateAuthData = false;
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >     @ApiModelProperty(
>>>> > >>> > > > > > >             value = "Whether or not to ignore any existing function defaults",
>>>> > >>> > > > > > >             name = "ignore-existing-function-defaults")
>>>> > >>> > > > > > >     private boolean ignoreExistingFunctionDefaults = false;
>>>> > >>> > > > > > > }
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > To ensure that function producer settings don't get modified
>>>> > >>> > > > > > > unexpectedly by an update, ignoreExistingFunctionDefaults defaults to false.
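A minimal sketch of how the flag could decide precedence for a single property during an update. It assumes the order is: the value supplied in the update, then (unless ignoreExistingFunctionDefaults is true) the function's existing value, then the worker's functionDefaults value. The class UpdatePrecedence and its resolve method are hypothetical names, not existing Pulsar code.

```java
// Hypothetical helper, not part of Pulsar. Assumed precedence for one property:
// 1. the value supplied in the update request (null = not supplied)
// 2. the function's existing value, unless ignoreExistingFunctionDefaults is true
// 3. the worker-level functionDefaults value
final class UpdatePrecedence {
    private UpdatePrecedence() {
    }

    static <T> T resolve(T incoming, T existing, T workerDefault,
                         boolean ignoreExistingFunctionDefaults) {
        if (incoming != null) {
            return incoming;
        }
        if (!ignoreExistingFunctionDefaults && existing != null) {
            return existing;
        }
        return workerDefault;
    }

    public static void main(String[] args) {
        // The function was created with batching enabled (batchingDisabled = false);
        // the cluster default has since changed to batchingDisabled = true.
        Boolean existing = false;
        Boolean workerDefault = true;
        System.out.println(resolve(null, existing, workerDefault, false));          // false: existing value kept
        System.out.println(resolve(null, existing, workerDefault, true));           // true: worker default applied
        System.out.println(resolve(Boolean.FALSE, existing, workerDefault, true));  // false: update value wins
    }
}
```

With the flag left at false, an update that omits a property keeps the function's current setting, which matches the intent stated above.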
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > The updated ProducerConfig will contain the new properties and a method to
>>>> > >>> > > > > > > merge an incoming ProducerConfig with an existing ProducerConfig. (The
>>>> > >>> > > > > > > merged ProducerConfig will use the incoming ProducerConfig properties when
>>>> > >>> > > > > > > they exist and will use existing ProducerConfig properties otherwise.) So
>>>> > >>> > > > > > > that operators aren't forced to provide those ProducerConfig properties with
>>>> > >>> > > > > > > every create or update, the properties must be nullable rather than
>>>> > >>> > > > > > > primitive (e.g. Boolean instead of boolean).
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > To support the additional properties, FunctionConfigUtils (and any similar
>>>> > >>> > > > > > > classes) will need to be modified to correctly translate between the
>>>> > >>> > > > > > > ProducerConfig and the protobuf ProducerSpec.
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > The updated ProducerSpec in Function.proto will look like this:
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > message ProducerSpec {
>>>> > >>> > > > > > >     int32 maxPendingMessages = 1;
>>>> > >>> > > > > > >     int32 maxPendingMessagesAcrossPartitions = 2;
>>>> > >>> > > > > > >     bool useThreadLocalProducers = 3;
>>>> > >>> > > > > > >     CryptoSpec cryptoSpec = 4;
>>>> > >>> > > > > > >     string batchBuilder = 5;
>>>> > >>> > > > > > >     bool batchingDisabled = 6;
>>>> > >>> > > > > > >     bool chunkingEnabled = 7;
>>>> > >>> > > > > > >     bool blockIfQueueFullDisabled = 8;
>>>> > >>> > > > > > >     CompressionType compressionType = 9;
>>>> > >>> > > > > > >     HashingScheme hashingScheme = 10;
>>>> > >>> > > > > > >     MessageRoutingMode messageRoutingMode = 11;
>>>> > >>> > > > > > >     int64 batchingMaxPublishDelay = 12;
>>>> > >>> > > > > > > }
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > To support these new parameters during local run, and for convenience, we
>>>> > >>> > > > > > > should be able to specify them both in the Admin CLI and in local run mode.
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > For local run, since function_worker.yml is not used, these parameters will
>>>> > >>> > > > > > > have defaults and use primitive types:
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > @Parameter(names = "--clusterFunctionBatchingDisabled",
>>>> > >>> > > > > > >         description = "Disable the default message batching behavior for functions",
>>>> > >>> > > > > > >         hidden = true)
>>>> > >>> > > > > > > public boolean clusterFunctionBatchingDisabled = false;
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > @Parameter(names = "--clusterFunctionChunkingEnabled",
>>>> > >>> > > > > > >         description = "The default message chunking behavior for functions",
>>>> > >>> > > > > > >         hidden = true)
>>>> > >>> > > > > > > public boolean clusterFunctionChunkingEnabled = false;
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > @Parameter(names = "--clusterFunctionBlockIfQueueFullDisabled",
>>>> > >>> > > > > > >         description = "Disable the default blocking behavior for functions when the queue is full",
>>>> > >>> > > > > > >         hidden = true)
>>>> > >>> > > > > > > public boolean clusterFunctionBlockIfQueueFullDisabled = false;
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > @Parameter(names = "--clusterFunctionCompressionTypeDefault",
>>>> > >>> > > > > > >         description = "The default Compression Type for functions",
>>>> > >>> > > > > > >         hidden = true)
>>>> > >>> > > > > > > public String clusterFunctionCompressionTypeDefault = "LZ4";
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > @Parameter(names = "--clusterFunctionHashingSchemeDefault",
>>>> > >>> > > > > > >         description = "The default Hashing Scheme for functions",
>>>> > >>> > > > > > >         hidden = true)
>>>> > >>> > > > > > > public String clusterFunctionHashingSchemeDefault = "Murmur3_32Hash";
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > @Parameter(names = "--clusterFunctionMessageRoutingModeDefault",
>>>> > >>> > > > > > >         description = "The default Message Routing Mode for functions",
>>>> > >>> > > > > > >         hidden = true)
>>>> > >>> > > > > > > public String clusterFunctionMessageRoutingModeDefault = "CustomPartition";
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > @Parameter(names = "--clusterFunctionBatchingMaxPublishDelayDefault",
>>>> > >>> > > > > > >         description = "The default max publish delay (in milliseconds) for functions when message batching is enabled",
>>>> > >>> > > > > > >         hidden = true)
>>>> > >>> > > > > > > public long clusterFunctionBatchingMaxPublishDelayDefault = 10L;
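Because the compression type, hashing scheme, and routing mode defaults are passed as strings, they have to be converted into enum values at some point, and that is also where an invalid value should raise an error (one of the behaviors listed under Testing). A minimal sketch, assuming local enums that mirror the constant names of Pulsar's client enums CompressionType, HashingScheme, and MessageRoutingMode so it compiles without the Pulsar client; the class LocalRunDefaults and its parse method are hypothetical.

```java
import java.util.Arrays;

// Local stand-ins mirroring the constant names of Pulsar's client enums, so the
// sketch compiles without the Pulsar client on the classpath.
enum CompressionType { NONE, LZ4, ZLIB, ZSTD, SNAPPY }

enum HashingScheme { JavaStringHash, Murmur3_32Hash }

enum MessageRoutingMode { SinglePartition, RoundRobinPartition, CustomPartition }

final class LocalRunDefaults {
    private LocalRunDefaults() {
    }

    // Converts a CLI string such as "LZ4" into an enum constant (case-sensitive).
    // Unknown or missing values are rethrown with the parameter name and the
    // accepted values so the operator can see what went wrong.
    static <E extends Enum<E>> E parse(Class<E> type, String paramName, String value) {
        try {
            return Enum.valueOf(type, value);
        } catch (IllegalArgumentException | NullPointerException e) {
            throw new IllegalArgumentException("Invalid value '" + value + "' for "
                    + paramName + "; expected one of "
                    + Arrays.toString(type.getEnumConstants()), e);
        }
    }

    public static void main(String[] args) {
        CompressionType compression = parse(CompressionType.class,
                "--clusterFunctionCompressionTypeDefault", "LZ4");
        HashingScheme hashing = parse(HashingScheme.class,
                "--clusterFunctionHashingSchemeDefault", "Murmur3_32Hash");
        MessageRoutingMode routing = parse(MessageRoutingMode.class,
                "--clusterFunctionMessageRoutingModeDefault", "CustomPartition");
        System.out.println(compression + " " + hashing + " " + routing);
        // prints: LZ4 Murmur3_32Hash CustomPartition
    }
}
```

Keeping the CLI fields as strings and parsing them in one place means the same validation path could be reused for the values read from function_worker.yml.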
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > For the Admin CLI, parameters can be provided in a similar way, but they will
>>>> > >>> > > > > > > not have defaults (i.e. Boolean instead of boolean), since a parameter that
>>>> > >>> > > > > > > is not provided should be null so that its value is obtained from
>>>> > >>> > > > > > > WorkerConfig.
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > To handle the logic of selecting each property from either an incoming
>>>> > >>> > > > > > > producerConfig or the producerDefaults object (from WorkerConfig), we can
>>>> > >>> > > > > > > use a FunctionDefaultsMediator. Its interface looks like this, and it gives
>>>> > >>> > > > > > > us the flexibility to mediate specific properties without needing to
>>>> > >>> > > > > > > convert the entire object.
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > public interface FunctionDefaultsMediator {
>>>> > >>> > > > > > >     boolean isBatchingDisabled();
>>>> > >>> > > > > > >     boolean isChunkingEnabled();
>>>> > >>> > > > > > >     boolean isBlockIfQueueFullDisabled();
>>>> > >>> > > > > > >     CompressionType getCompressionType();
>>>> > >>> > > > > > >     Function.CompressionType getCompressionTypeProto();
>>>> > >>> > > > > > >     HashingScheme getHashingScheme();
>>>> > >>> > > > > > >     Function.HashingScheme getHashingSchemeProto();
>>>> > >>> > > > > > >     MessageRoutingMode getMessageRoutingMode();
>>>> > >>> > > > > > >     Function.MessageRoutingMode getMessageRoutingModeProto();
>>>> > >>> > > > > > >     Long getBatchingMaxPublishDelay();
>>>> > >>> > > > > > > }
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > (Alternatively, we could remove the Proto methods from the mediator and
>>>> > >>> > > > > > > just use FunctionConfigUtils.convert(..) to translate between the protobuf
>>>> > >>> > > > > > > ProducerSpec and the Java ProducerConfig.)
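A minimal sketch of one way a mediator could resolve values, assuming it wraps a (possibly null) per-function ProducerConfig with nullable fields and a worker-level FunctionDefaultsConfig whose values are always set. Only two of the interface's methods are shown; the class FunctionDefaultsMediatorImpl and the simplified field layouts are hypothetical.

```java
// Trimmed to two of the proposed methods for brevity.
interface FunctionDefaultsMediator {
    boolean isBatchingDisabled();

    Long getBatchingMaxPublishDelay();
}

// Hypothetical per-function settings: null means "not provided".
class ProducerConfig {
    Boolean batchingDisabled;
    Long batchingMaxPublishDelay;
}

// Hypothetical worker-level defaults (e.g. from functionDefaults in
// function_worker.yml): always set.
class FunctionDefaultsConfig {
    boolean batchingDisabled;
    long batchingMaxPublishDelay;
}

class FunctionDefaultsMediatorImpl implements FunctionDefaultsMediator {
    private final ProducerConfig producerConfig; // may be null
    private final FunctionDefaultsConfig defaults;

    FunctionDefaultsMediatorImpl(ProducerConfig producerConfig, FunctionDefaultsConfig defaults) {
        this.producerConfig = producerConfig;
        this.defaults = defaults;
    }

    // Each getter prefers the function's own value and falls back to the worker default.
    @Override
    public boolean isBatchingDisabled() {
        if (producerConfig != null && producerConfig.batchingDisabled != null) {
            return producerConfig.batchingDisabled;
        }
        return defaults.batchingDisabled;
    }

    @Override
    public Long getBatchingMaxPublishDelay() {
        if (producerConfig != null && producerConfig.batchingMaxPublishDelay != null) {
            return producerConfig.batchingMaxPublishDelay;
        }
        return defaults.batchingMaxPublishDelay;
    }

    public static void main(String[] args) {
        FunctionDefaultsConfig defaults = new FunctionDefaultsConfig();
        defaults.batchingDisabled = true;
        defaults.batchingMaxPublishDelay = 12L;

        ProducerConfig fromRequest = new ProducerConfig();
        fromRequest.batchingMaxPublishDelay = 5L; // batchingDisabled left null

        FunctionDefaultsMediator mediator = new FunctionDefaultsMediatorImpl(fromRequest, defaults);
        System.out.println(mediator.isBatchingDisabled() + " " + mediator.getBatchingMaxPublishDelay());
        // prints: true 5
    }
}
```

Resolving per getter keeps the mediation property-by-property, so a caller that needs only one value never has to build a fully merged ProducerConfig.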
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > *Testing*
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > In terms of testing, much of this functionality will be covered by existing
>>>> > >>> > > > > > > tests in the pulsar-functions module. However, we need to test each of the
>>>> > >>> > > > > > > new classes, as well as these behaviors:
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > - functionRegistration (with different provided ProducerConfig properties)
>>>> > >>> > > > > > > - functionUpdate (for ignoreExistingFunctionDefaults=true and
>>>> > >>> > > > > > >   ignoreExistingFunctionDefaults=false)
>>>> > >>> > > > > > > - loading function_worker.yml into WorkerConfig
>>>> > >>> > > > > > > - throwing exceptions correctly when invalid parameters are provided
>>>> > >>> > > > > > > - converting between ProducerSpec and ProducerConfig
>>>> > >>> > > > > > > - correctly handling precedence when computing the final ProducerConfig
>>>> > >>> > > > > > >   values
>>>> > >>> > > > > > > - creating producers
>>>> > >>> > > > > > > - ensuring functionality is consistent across runtimes
>>>> > >>> > > > > > >
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > I'd like feedback on this proposal.
>>>> > >>> > > > > > >
>>>> > >>> > > > > > > Devin G. Bost
>>>> > >>> > > > > > >
>>>> > >>> > > > > >
>>>> > >>> > > > >
>>>> > >>> > > >
>>>> > >>> > >
>>>> > >>>
>>>> > >>
>>>> >
>>>>
>>>
