If we use the interceptor approach, would a cluster admin need to write custom code in order to override the function producer settings? I worry about adding to the burden of cluster administration.
--
Devin G. Bost

On Sat, Apr 3, 2021, 8:26 AM Devin Bost <devin.b...@gmail.com> wrote:

> Hi Rui,
>
> Thanks for the feedback. My understanding about the interceptor is that
> it's not something that would be touched by users but would only be
> modified by cluster admins. So, I think we'd still need to include the
> parameters on the ProducerConfig.
> I already have the other approach coded, so changing the design will
> require some rework.
> I'll look at that example you mentioned.
>
> --
> Devin G. Bost
>
> On Fri, Apr 2, 2021, 10:18 PM Rui Fu <f...@rui.sh> wrote:
>
>> Hi Devin,
>>
>> Great proposal for giving functions such a useful feature, and thanks for
>> your detailed write-up. After going through the history of this discussion,
>> I would like to +1 Jerry's suggestion. With the interceptor, admins and
>> users get maximum flexibility to customize their producer configs. It may
>> also reduce the parameters we put into the configs and pulsar-admin. Also,
>> we already have some interceptors in Pulsar Functions, like the
>> `RuntimeCustomizer` interface, so it is not a new technique that would be
>> hard for users to adopt.
>>
>> Besides, I would like to suggest covering these changes in the Python
>> & Go runtimes as well if we add new fields to `Function.proto`.
>>
>> Best,
>>
>> Rui
>> On April 2, 2021 at 10:13 AM (+0800), Devin Bost <devin.b...@gmail.com> wrote:
>> > What would be some of the additional benefits of using the interceptor
>> > approach?
>> >
>> > --
>> > Devin G. Bost
>> >
>> > On Thu, Apr 1, 2021, 6:03 PM Jerry Peng <jerry.boyang.p...@gmail.com> wrote:
>> >
>> > > Devin,
>> > >
>> > > Interceptors are located on the server side (broker/worker). Cluster
>> > > admins would create the plugins based on the interfaces I described and
>> > > install them on the server side. Regular function developers will not
>> > > implement or interact directly with the interceptor.
>> > >
>> > > > Would it be better to just ignore WorkerConfig defaults during
>> > > > function update?
>> > >
>> > > Yes.
>> > >
>> > > If the cluster admin changes the function config defaults and wants
>> > > existing functions to utilize those configs that have changed, the admin
>> > > can just update those configs of the functions through the regular update
>> > > mechanism for functions we have today.
>> > >
>> > > On Thu, Apr 1, 2021 at 4:51 PM Devin Bost <devin.b...@gmail.com> wrote:
>> > >
>> > > > Would it be better to just ignore WorkerConfig defaults during
>> > > > function update? We could still update the function producer behavior
>> > > > through the producerConfig passed as a REST parameter, but we'd avoid
>> > > > the edge case I mentioned.
>> > > >
>> > > > Devin G. Bost
>> > > >
>> > > > On Thu, Apr 1, 2021 at 5:42 PM Devin Bost <devin.b...@gmail.com> wrote:
>> > > >
>> > > > > I just remembered an edge case that motivated me to create that
>> > > > > additional flow.
>> > > > > In my PR, if any producerConfig values are null at registration or
>> > > > > update, we get those values from WorkerConfig instead.
>> > > > > However, when a user runs an update (on a function) for the first
>> > > > > time after upgrading to the version of Pulsar with this feature, if
>> > > > > someone has modified the function_worker.yml file (perhaps without
>> > > > > their knowledge), those defaults from function_worker.yml will get
>> > > > > picked up because the existing producerConfig will have null set for
>> > > > > those values (because the function was registered prior to the
>> > > > > existence of this feature). So, the user could be trying to update
>> > > > > something unrelated, not realizing that their function is going to
>> > > > > have its producerConfig updated by the cluster defaults.
>> > > > > Most users won't be affected by this, but it could be tricky to
>> > > > > diagnose for the few who would get impacted by it. (Is this
>> > > > > description any clearer? If not, it might be clearer if I diagram it.)
>> > > > >
>> > > > > Devin G. Bost
>> > > > >
>> > > > > On Thu, Apr 1, 2021 at 4:39 PM Jerry Peng <jerry.boyang.p...@gmail.com> wrote:
>> > > > >
>> > > > > > Hi Devin,
>> > > > > >
>> > > > > > I understand the usefulness of giving cluster admins the ability to
>> > > > > > specify some defaults for functions at a cluster level. I think the
>> > > > > > right time to apply those defaults is during function registration.
>> > > > > > I am not sure I understand the usefulness of what you are proposing
>> > > > > > we do during update time. If you would like to change a config of an
>> > > > > > already running function, why not just update it with the config
>> > > > > > changes that are needed? I am not sure you need the additional
>> > > > > > workflow you proposed for the update.
>> > > > > >
>> > > > > > Another idea to satisfy your use case is that instead of leveraging
>> > > > > > a default config mechanism perhaps we should take a look at
>> > > > > > implementing interceptors for the function/sources/sink API calls.
>> > > > > > We already have interceptors implemented for many other Pulsar
>> > > > > > functionalities. Perhaps we should do it for Pulsar Functions APIs
>> > > > > > as well. With interceptors you will be able to intercept all API
>> > > > > > operations to functions and customize the requests however you
>> > > > > > would like.
>> > > > > >
>> > > > > > We can have an interface like:
>> > > > > >
>> > > > > > interface FunctionInterceptors {
>> > > > > >
>> > > > > >     void registerFunctionInterceptor(String tenant, String namespace,
>> > > > > >             String functionName, FunctionConfig functionConfig...);
>> > > > > >
>> > > > > >     ...
>> > > > > >
>> > > > > > }
>> > > > > >
>> > > > > > for developers to implement. At the beginning of every Function API
>> > > > > > call the corresponding interceptor method gets called. For example,
>> > > > > > the "registerFunctionInterceptor()" method I provided above will be
>> > > > > > called towards the beginning of registerFunction and allow you to
>> > > > > > customize the function config however you want.
>> > > > > >
>> > > > > > Best,
>> > > > > >
>> > > > > > Jerry
>> > > > > >
>> > > > > > On Thu, Apr 1, 2021 at 2:08 PM Devin Bost <devin.b...@gmail.com> wrote:
>> > > > > >
>> > > > > > > *Cluster-Wide and Function-Specific Producer Defaults*
>> > > > > > >
>> > > > > > > - Status: Proposal
>> > > > > > > - Author: Devin Bost (with guidance from Jerry Peng)
>> > > > > > > - Pull Request: https://github.com/apache/pulsar/pull/9987
>> > > > > > > - Mailing List discussion:
>> > > > > > > - Release: 2.8.0
>> > > > > > >
>> > > > > > > *Motivation*
>> > > > > > >
>> > > > > > > Pulsar currently provides no way to allow users or operators to
>> > > > > > > change the producer behavior of Pulsar functions. For organizations
>> > > > > > > that are making heavy use of functions, the ability to tune
>> > > > > > > messaging behavior of functions is greatly desired. Moreover,
>> > > > > > > current function messaging defaults appear to be causing issues for
>> > > > > > > certain workloads.
>> > > > > > > (For example, some bugs appear to be related to having batching
>> > > > > > > enabled. See #6054.) Enabling operators to modify these settings
>> > > > > > > will help Pulsar cluster admins work around issues so that they can
>> > > > > > > upgrade legacy versions of Pulsar to more recent versions.
>> > > > > > > Moreover, enabling greater tuning of these settings can provide
>> > > > > > > more opportunity for performance optimization in production
>> > > > > > > environments. We need the ability to modify these function
>> > > > > > > producer settings:
>> > > > > > >
>> > > > > > > - batchingEnabled
>> > > > > > > - chunkingEnabled
>> > > > > > > - blockIfQueueFull
>> > > > > > > - compressionType
>> > > > > > > - hashingScheme
>> > > > > > > - messageRoutingMode
>> > > > > > > - batchingMaxPublishDelay
>> > > > > > >
>> > > > > > > *Implementation Strategy*
>> > > > > > >
>> > > > > > > Ideally, we need a way to set default message producer behavior
>> > > > > > > (for functions) on a cluster-wide level. However, operators may
>> > > > > > > want specific settings to be used for certain functions. (We can't
>> > > > > > > assume that everyone will want all functions in a cluster to have
>> > > > > > > the same producer settings.) When an operator changes the
>> > > > > > > cluster-wide producer defaults, they need a way to be able to roll
>> > > > > > > out the changes to functions without significantly disrupting
>> > > > > > > messaging flows in production environments. They also need a simple
>> > > > > > > way to roll back changes in case a change to function producer
>> > > > > > > settings results in undesired behavior.
>> > > > > > > However, not all users will want function updates to replace
>> > > > > > > existing function producer settings, especially for functions that
>> > > > > > > have been given custom producer settings (at the per-function
>> > > > > > > level). Due to this difference in use cases, there needs to be a
>> > > > > > > way to allow an operator to specify what update behavior they
>> > > > > > > want. For a given update, the user should be able to specify if:
>> > > > > > >
>> > > > > > > 1. Existing function producer settings will be replaced by
>> > > > > > >    cluster-wide defaults (unless overridden by producer settings
>> > > > > > >    provided in the request), or
>> > > > > > > 2. Existing function producer settings will *not* be replaced by
>> > > > > > >    cluster-wide defaults (unless overridden by producer settings
>> > > > > > >    provided in the request).
>> > > > > > >
>> > > > > > > So, the two orders of precedence are:
>> > > > > > >
>> > > > > > > 1. newProducerConfig > cluster-wide defaults > existingProducerConfig
>> > > > > > > 2. newProducerConfig > existingProducerConfig > cluster-wide defaults
>> > > > > > >
>> > > > > > > We can add a boolean property to UpdateOptions to allow the
>> > > > > > > precedence to be specified. Since we don't want to introduce
>> > > > > > > unexpected side effects during an update, the default precedence
>> > > > > > > should prefer the existingProducerConfig over cluster-wide defaults.
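The two orders of precedence can be sketched for a single setting as follows. This is a minimal illustration under stated assumptions, not code from the PR: the class `ProducerSettingPrecedence` and its `resolve` method are hypothetical names, and the sketch assumes that passing `true` for the UpdateOptions boolean selects order 1 (cluster-wide defaults outrank the existing value).

```java
/** Hypothetical helper, not part of Pulsar: picks one producer setting from three sources. */
final class ProducerSettingPrecedence {

    /**
     * @param incoming       value from the update request, or null if not provided
     * @param existing       value stored with the function, or null if never set
     * @param clusterDefault cluster-wide default; assumed to always be present
     * @param ignoreExisting true = order 1 (defaults beat existing), false = order 2
     */
    static <T> T resolve(T incoming, T existing, T clusterDefault, boolean ignoreExisting) {
        if (clusterDefault == null) {
            throw new IllegalArgumentException("cluster-wide default must not be null");
        }
        if (incoming != null) {
            return incoming; // the request wins in both orders
        }
        if (ignoreExisting || existing == null) {
            return clusterDefault;
        }
        return existing;
    }

    public static void main(String[] args) {
        // Existing function uses ZLIB, the cluster default is LZ4, the request sets nothing.
        System.out.println(resolve(null, "ZLIB", "LZ4", false)); // prints ZLIB
        System.out.println(resolve(null, "ZLIB", "LZ4", true));  // prints LZ4
        // A value in the request overrides both.
        System.out.println(resolve("SNAPPY", "ZLIB", "LZ4", true)); // prints SNAPPY
    }
}
```

Note that a function registered before the feature existed has `existing == null` for every new setting, so it picks up the cluster default under either order; that is the edge case discussed earlier in the thread.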
>> > > > > > >
>> > > > > > > To ensure that the desired producer settings (after calculating the
>> > > > > > > settings from defaults + overrides) are available when creating the
>> > > > > > > producers (on any cluster), we need to add these producer settings
>> > > > > > > to the function protobuf definition. Also, to ensure that settings
>> > > > > > > can be properly validated at the time they are submitted, we need
>> > > > > > > validation in two places:
>> > > > > > >
>> > > > > > > 1. When cluster-wide configs are loaded upon broker startup
>> > > > > > > 2. When a function is registered or updated.
>> > > > > > >
>> > > > > > > As it would be impractical to check every registered function
>> > > > > > > definition during broker startup, most of the validation needs to
>> > > > > > > occur during function registration or update. However, during
>> > > > > > > broker startup, we can at least validate that the configurations
>> > > > > > > exist and throw an exception if they are invalid.
>> > > > > > >
>> > > > > > > As any configuration exceptions thrown when the producer is created
>> > > > > > > would bubble up to the client, we need to ensure that the final
>> > > > > > > configurations are valid when a function is registered or updated,
>> > > > > > > so that there are no exceptions when the producer is created. This
>> > > > > > > requires computing the producer settings (according to the desired
>> > > > > > > precedence) during function registration or update.
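As an illustration of the startup-time check, string-valued defaults could be validated by parsing them into enums and failing fast. This is only a sketch: `ProducerDefaultsValidation`, `parseEnum`, and `InvalidDefaultException` are hypothetical names, and the two enums are local stand-ins for the Pulsar client enums rather than the real classes.

```java
import java.util.Arrays;

/** Hypothetical startup check, not part of Pulsar; the enums are local stand-ins. */
final class ProducerDefaultsValidation {

    enum CompressionType { NONE, LZ4, ZLIB, ZSTD, SNAPPY }
    enum HashingScheme { JavaStringHash, Murmur3_32Hash }

    /** Illustrative unchecked exception for an invalid cluster-wide default. */
    static final class InvalidDefaultException extends RuntimeException {
        InvalidDefaultException(String message) {
            super(message);
        }
    }

    /** Parses a configured string into the given enum, or throws with the allowed values. */
    static <E extends Enum<E>> E parseEnum(Class<E> type, String field, String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            throw new InvalidDefaultException(field + " is missing");
        }
        try {
            return Enum.valueOf(type, raw.trim());
        } catch (IllegalArgumentException e) {
            throw new InvalidDefaultException(field + "=" + raw + " is not one of "
                    + Arrays.toString(type.getEnumConstants()));
        }
    }

    /** Rejects a negative publish delay (in milliseconds). */
    static long requireNonNegativeDelay(long millis) {
        if (millis < 0) {
            throw new InvalidDefaultException("batchingMaxPublishDelay must be >= 0, got " + millis);
        }
        return millis;
    }

    public static void main(String[] args) {
        System.out.println(parseEnum(CompressionType.class, "compressionType", "ZLIB")); // prints ZLIB
        try {
            parseEnum(HashingScheme.class, "hashingScheme", "MD5");
        } catch (InvalidDefaultException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The same parsing step could be reused at registration or update time on the computed settings, so that an invalid value is reported to the caller before any producer is created.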
>> > > > > > >
>> > > > > > > When a function is registered, existing producer settings do not
>> > > > > > > exist for it, so we simply need to get cluster-wide defaults and
>> > > > > > > override them with any specific producer settings provided in the
>> > > > > > > FunctionConfig's ProducerConfig (submitted as a REST parameter).
>> > > > > > > After the initial registration, those computed settings will then
>> > > > > > > be persisted in BookKeeper with the function definition.
>> > > > > > >
>> > > > > > > When a function is updated, we also need to address the existing
>> > > > > > > producerConfig, and for this, we need to check the desired override
>> > > > > > > precedence (as explained above) when computing the final producer
>> > > > > > > settings that will be persisted with the function.
>> > > > > > >
>> > > > > > > *Modifications*
>> > > > > > >
>> > > > > > > As WorkerConfig is readily available when functions are registered
>> > > > > > > and updated, we can put new settings into function_worker.yml and
>> > > > > > > can load and validate them via WorkerConfig.load(..).
>> > > > > > > To prevent introducing breaking changes, parameters will have
>> > > > > > > defaults assigned in WorkerConfig that conform to current expected
>> > > > > > > behavior, like this:
>> > > > > > >
>> > > > > > > @Data
>> > > > > > > @Accessors(chain = true)
>> > > > > > > public class FunctionDefaultsConfig implements Serializable {
>> > > > > > >
>> > > > > > >     @FieldContext(
>> > > > > > >         doc = "Disables default message batching between functions"
>> > > > > > >     )
>> > > > > > >     protected boolean batchingDisabled = false;
>> > > > > > >
>> > > > > > >     @FieldContext(
>> > > > > > >         doc = "Enables default message chunking between functions"
>> > > > > > >     )
>> > > > > > >     protected boolean chunkingEnabled = false;
>> > > > > > >
>> > > > > > >     @FieldContext(
>> > > > > > >         doc = "Disables default behavior to block when message queue is full"
>> > > > > > >     )
>> > > > > > >     protected boolean blockIfQueueFullDisabled = false;
>> > > > > > >
>> > > > > > >     @FieldContext(
>> > > > > > >         doc = "Default compression type"
>> > > > > > >     )
>> > > > > > >     protected String compressionType = "LZ4";
>> > > > > > >
>> > > > > > >     @FieldContext(
>> > > > > > >         doc = "Default hashing scheme"
>> > > > > > >     )
>> > > > > > >     protected String hashingScheme = "Murmur3_32Hash";
>> > > > > > >
>> > > > > > >     @FieldContext(
>> > > > > > >         doc = "Default message routing mode"
>> > > > > > >     )
>> > > > > > >     protected String messageRoutingMode = "CustomPartition";
>> > > > > > >
>> > > > > > >     @FieldContext(
>> > > > > > >         doc = "Default max publish delay (in milliseconds) when message batching is enabled"
>> > > > > > >     )
>> > > > > > >     protected long batchingMaxPublishDelay = 10L;
>> > > > > > >
>> > > > > > >     public ClusterFunctionProducerDefaults buildProducerDefaults()
>> > > > > > >             throws InvalidWorkerConfigDefaultException {
>> > > > > > >         return new ClusterFunctionProducerDefaults(this.isBatchingDisabled(),
>> > > > > > >                 this.isChunkingEnabled(), this.isBlockIfQueueFullDisabled(),
>> > > > > > >                 this.getCompressionType(), this.getHashingScheme(),
>> > > > > > >                 this.getMessageRoutingMode(), this.getBatchingMaxPublishDelay());
>> > > > > > >     }
>> > > > > > >
>> > > > > > >     public FunctionDefaultsConfig() {
>> > > > > > >     }
>> > > > > > > }
>> > > > > > >
>> > > > > > > The additional section in function_worker.yml will look like this:
>> > > > > > >
>> > > > > > > functionDefaults:
>> > > > > > >   batchingDisabled: true
>> > > > > > >   chunkingEnabled: false
>> > > > > > >   blockIfQueueFullDisabled: false
>> > > > > > >   batchingMaxPublishDelay: 12
>> > > > > > >   compressionType: ZLIB
>> > > > > > >   hashingScheme: JavaStringHash
>> > > > > > >   messageRoutingMode: RoundRobinPartition
>> > > > > > >
>> > > > > > > During function update, we can use a new boolean option,
>> > > > > > > ignoreExistingFunctionDefaults, to toggle precedence:
>> > > > > > >
>> > > > > > > @Data
>> > > > > > > @NoArgsConstructor
>> > > > > > > @ApiModel(value = "UpdateOptions",
>> > > > > > >         description = "Options while updating function defaults or the sink")
>> > > > > > > public class UpdateOptions {
>> > > > > > >
>> > > > > > >     @ApiModelProperty(
>> > > > > > >         value = "Whether or not to update the auth data",
>> > > > > > >         name = "update-auth-data")
>> > > > > > >     private boolean updateAuthData = false;
>> > > > > > >
>> > > > > > >     @ApiModelProperty(
>> > > > > > >         value = "Whether or not to ignore any existing function defaults",
>> > > > > > >         name = "ignore-existing-function-defaults")
>> > > > > > >     private boolean ignoreExistingFunctionDefaults = false;
>> > > > > > > }
>> > > > > > >
>> > > > > > > To ensure that function producer settings don't get modified
>> > > > > > > unexpectedly by an update, ignoreExistingFunctionDefaults = false
>> > > > > > > by default.
>> > > > > > >
>> > > > > > > The updated ProducerConfig will contain the new properties and a
>> > > > > > > method to merge an incoming ProducerConfig with an existing
>> > > > > > > ProducerConfig. (The merged ProducerConfig will use the incoming
>> > > > > > > ProducerConfig properties when they exist and will use existing
>> > > > > > > ProducerConfig properties otherwise.) So that operators are not
>> > > > > > > forced to provide those ProducerConfig properties with every create
>> > > > > > > or update, they must be nullable, not primitive (e.g. Boolean
>> > > > > > > instead of boolean).
>> > > > > > >
>> > > > > > > To support the additional properties, FunctionConfigUtils (and any
>> > > > > > > similar classes) will need to be modified to correctly translate
>> > > > > > > between the ProducerConfig and the protobuf ProducerSpec.
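The ProducerConfig merge described above, and the reason the fields have to be boxed, can be sketched like this. The class `ProducerConfigSketch` and its `mergeOver` method are hypothetical stand-ins written for illustration (only three of the proposed fields are shown); they are not the actual ProducerConfig from the PR.

```java
/** Hypothetical stand-in for the extended ProducerConfig; field names follow the proposal. */
final class ProducerConfigSketch {

    // Boxed types so that null can mean "not provided in this request".
    final Boolean batchingDisabled;
    final String compressionType;
    final Long batchingMaxPublishDelay;

    ProducerConfigSketch(Boolean batchingDisabled, String compressionType,
                         Long batchingMaxPublishDelay) {
        this.batchingDisabled = batchingDisabled;
        this.compressionType = compressionType;
        this.batchingMaxPublishDelay = batchingMaxPublishDelay;
    }

    /** Treats this object as the incoming config: its non-null fields win, others fall back. */
    ProducerConfigSketch mergeOver(ProducerConfigSketch existing) {
        if (existing == null) {
            return this;
        }
        return new ProducerConfigSketch(
                batchingDisabled != null ? batchingDisabled : existing.batchingDisabled,
                compressionType != null ? compressionType : existing.compressionType,
                batchingMaxPublishDelay != null
                        ? batchingMaxPublishDelay : existing.batchingMaxPublishDelay);
    }

    public static void main(String[] args) {
        ProducerConfigSketch existing = new ProducerConfigSketch(true, "LZ4", null);
        ProducerConfigSketch incoming = new ProducerConfigSketch(null, "ZLIB", null);
        ProducerConfigSketch merged = incoming.mergeOver(existing);
        System.out.println(merged.batchingDisabled); // prints true (kept from existing)
        System.out.println(merged.compressionType);  // prints ZLIB (taken from incoming)
    }
}
```

With a primitive `boolean`, an omitted `batchingDisabled` would arrive as `false` and be indistinguishable from an explicit request to re-enable batching, which is why the merge relies on `null`.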
>> > > > > > >
>> > > > > > > The updated ProducerSpec in Function.proto will look like this:
>> > > > > > >
>> > > > > > > message ProducerSpec {
>> > > > > > >     int32 maxPendingMessages = 1;
>> > > > > > >     int32 maxPendingMessagesAcrossPartitions = 2;
>> > > > > > >     bool useThreadLocalProducers = 3;
>> > > > > > >     CryptoSpec cryptoSpec = 4;
>> > > > > > >     string batchBuilder = 5;
>> > > > > > >     bool batchingDisabled = 6;
>> > > > > > >     bool chunkingEnabled = 7;
>> > > > > > >     bool blockIfQueueFullDisabled = 8;
>> > > > > > >     CompressionType compressionType = 9;
>> > > > > > >     HashingScheme hashingScheme = 10;
>> > > > > > >     MessageRoutingMode messageRoutingMode = 11;
>> > > > > > >     int64 batchingMaxPublishDelay = 12;
>> > > > > > > }
>> > > > > > >
>> > > > > > > To support these new parameters during local run and for
>> > > > > > > convenience, we should be able to specify them in the Admin CLI and
>> > > > > > > in local run modes.
>> > > > > > >
>> > > > > > > For local run, as function_worker.yml is not used, these parameters
>> > > > > > > will have defaults and be primitive values:
>> > > > > > >
>> > > > > > > @Parameter(names = "--clusterFunctionBatchingDisabled",
>> > > > > > >         description = "Disable the default message batching behavior for functions",
>> > > > > > >         hidden = true)
>> > > > > > > public boolean clusterFunctionBatchingDisabled = false;
>> > > > > > >
>> > > > > > > @Parameter(names = "--clusterFunctionChunkingEnabled",
>> > > > > > >         description = "The default message chunking behavior for functions",
>> > > > > > >         hidden = true)
>> > > > > > > public boolean clusterFunctionChunkingEnabled = false;
>> > > > > > >
>> > > > > > > @Parameter(names = "--clusterFunctionBlockIfQueueFullDisabled",
>> > > > > > >         description = "Disable the default blocking behavior for functions when queue is full",
>> > > > > > >         hidden = true)
>> > > > > > > public boolean clusterFunctionBlockIfQueueFullDisabled = false;
>> > > > > > >
>> > > > > > > @Parameter(names = "--clusterFunctionCompressionTypeDefault",
>> > > > > > >         description = "The default Compression Type for functions",
>> > > > > > >         hidden = true)
>> > > > > > > public String clusterFunctionCompressionTypeDefault = "LZ4";
>> > > > > > >
>> > > > > > > @Parameter(names = "--clusterFunctionHashingSchemeDefault",
>> > > > > > >         description = "The default Hashing Scheme for functions",
>> > > > > > >         hidden = true)
>> > > > > > > public String clusterFunctionHashingSchemeDefault = "Murmur3_32Hash";
>> > > > > > >
>> > > > > > > @Parameter(names = "--clusterFunctionMessageRoutingModeDefault",
>> > > > > > >         description = "The default Message Routing Mode for functions",
>> > > > > > >         hidden = true)
>> > > > > > > public String clusterFunctionMessageRoutingModeDefault = "CustomPartition";
>> > > > > > >
>> > > > > > > @Parameter(names = "--clusterFunctionBatchingMaxPublishDelayDefault",
>> > > > > > >         description = "The default max publish delay (in milliseconds) for functions when message batching is enabled",
>> > > > > > >         hidden = true)
>> > > > > > > public long clusterFunctionBatchingMaxPublishDelayDefault = 10L;
>> > > > > > >
>> > > > > > > For Admin CLI, parameters can be provided in a similar way, but
>> > > > > > > they will not have defaults and will use nullable types (i.e.
>> > > > > > > Boolean instead of boolean), since parameters should be null to
>> > > > > > > ensure non-provided configs are obtained from WorkerConfig.
>> > > > > > >
>> > > > > > > To select each property from either an incoming producerConfig or
>> > > > > > > the producerDefaults object (from WorkerConfig), we can use a
>> > > > > > > FunctionDefaultsMediator. That interface looks like this and gives
>> > > > > > > us flexibility to handle mediation for specific properties without
>> > > > > > > needing to convert the entire object.
>> > > > > > >
>> > > > > > > public interface FunctionDefaultsMediator {
>> > > > > > >     boolean isBatchingDisabled();
>> > > > > > >     boolean isChunkingEnabled();
>> > > > > > >     boolean isBlockIfQueueFullDisabled();
>> > > > > > >     CompressionType getCompressionType();
>> > > > > > >     Function.CompressionType getCompressionTypeProto();
>> > > > > > >     HashingScheme getHashingScheme();
>> > > > > > >     Function.HashingScheme getHashingSchemeProto();
>> > > > > > >     MessageRoutingMode getMessageRoutingMode();
>> > > > > > >     Function.MessageRoutingMode getMessageRoutingModeProto();
>> > > > > > >     Long getBatchingMaxPublishDelay();
>> > > > > > > }
>> > > > > > >
>> > > > > > > (Alternatively, we could remove the Proto methods from the mediator
>> > > > > > > and just use FunctionConfigUtils.convert(..) to translate between
>> > > > > > > the protobuf ProducerSpec and the Java ProducerConfig.)
>> > > > > > >
>> > > > > > > *Testing*
>> > > > > > >
>> > > > > > > In terms of testing, much of this functionality will be covered by
>> > > > > > > existing tests in the pulsar-functions module.
>> > > > > > > However, we need to test each of the new classes, as well as these
>> > > > > > > behaviors:
>> > > > > > >
>> > > > > > > - functionRegistration (with different provided ProducerConfig
>> > > > > > >   properties)
>> > > > > > > - functionUpdate (for ignoreExistingFunctionDefaults=true and
>> > > > > > >   ignoreExistingFunctionDefaults=false)
>> > > > > > > - loading function_worker.yml into WorkerConfig
>> > > > > > > - throwing exceptions correctly when invalid parameters are
>> > > > > > >   provided
>> > > > > > > - converting between ProducerSpec and ProducerConfig
>> > > > > > > - correctly handling precedence when computing the final
>> > > > > > >   ProducerConfig values
>> > > > > > > - creating producers
>> > > > > > > - ensuring functionality is consistent across runtimes
>> > > > > > >
>> > > > > > > I'd like feedback on this proposal.
>> > > > > > >
>> > > > > > > Devin G. Bost