What would be some of the additional benefits of using the interceptor
approach?

--
Devin G. Bost

On Thu, Apr 1, 2021, 6:03 PM Jerry Peng <jerry.boyang.p...@gmail.com> wrote:

> Devin,
>
> Interceptors are located on the server side (broker/worker).  Cluster
> admins would create  the plugins  based on the interfaces I described and
> install them on the server side.  Regular function developers will not
> implement or interact directly with the interceptor.
>
> > Would it be better to just ignore WorkerConfig defaults during function
> > update?
>
> Yes.
>
> If the cluster admin changes the function config defaults and wants
> existing functions to utilize those configs that have changed, the admin
> can just update those configs of the functions through the regular update
> mechanism for functions we have today.
>
>
>
> On Thu, Apr 1, 2021 at 4:51 PM Devin Bost <devin.b...@gmail.com> wrote:
>
> > Would it be better to just ignore WorkerConfig defaults during function
> > update? We could still update the function producer behavior through the
> > producerConfig passed as a REST parameter, but we'd avoid the edge case I
> > mentioned.
> >
> > Devin G. Bost
> >
> >
> > On Thu, Apr 1, 2021 at 5:42 PM Devin Bost <devin.b...@gmail.com> wrote:
> >
> > > I just remembered an edge case that motivated me to create that
> > > additional flow.
> > > In my PR, if any producerConfig values are null at registration or
> > > update, we get those values from WorkerConfig instead.
> > > However, when a user runs an update (on a function) for the first time
> > > after upgrading to the version of Pulsar with this feature, if someone
> > > has modified the function_worker.yml file (perhaps without their
> > > knowledge), those defaults from function_worker.yml will get picked up
> > > because the existing producerConfig will have null set for those values
> > > (because the function was registered prior to the existence of this
> > > feature.) So, the user could be trying to update something unrelated,
> > > not realizing that their function is going to have its producerConfig
> > > updated by the cluster defaults. Most users won't be affected by this,
> > > but it could be tricky to diagnose for the few who would get impacted
> > > by it. (Is this description any clearer? If not, it might be clearer if
> > > I diagram it.)
> > >
> > > Devin G. Bost
> > >
> > >
> > > On Thu, Apr 1, 2021 at 4:39 PM Jerry Peng <jerry.boyang.p...@gmail.com>
> > > wrote:
> > >
> > >> Hi Devin,
> > >>
> > > >> I understand the usefulness of giving cluster admins the ability to
> > > >> specify some defaults for functions at a cluster level.  I think the
> > > >> right time to apply those defaults is during function registration.
> > > >> I am not sure I understand the usefulness of what you are proposing
> > > >> we do during update time.  If you would like to change a config of an
> > > >> already running function, why not just update it with the config
> > > >> changes that are needed?  I am not sure you need the additional
> > > >> workflow you proposed for the update.
> > > >>
> > > >> Another idea to satisfy your use case is that instead of leveraging a
> > > >> default config mechanism, perhaps we should take a look at
> > > >> implementing interceptors for the function/source/sink API calls.  We
> > > >> already have interceptors implemented for many other Pulsar
> > > >> functionalities.  Perhaps we should do it for Pulsar Functions APIs as
> > > >> well.  With interceptors you will be able to intercept all API
> > > >> operations to functions and customize the requests however you would
> > > >> like.
> > >>
> > > >> We can have an interface like:
> > > >>
> > > >> interface FunctionInterceptors {
> > > >>
> > > >>     void registerFunctionInterceptor(String tenant, String namespace,
> > > >>             String functionName, FunctionConfig functionConfig...);
> > > >>
> > > >>     ...
> > > >>
> > > >> }
> > > >>
> > > >> for developers to implement. At the beginning of every Function API
> > > >> call, the corresponding interceptor method gets called.  For example,
> > > >> the "registerFunctionInterceptor()" method I provided above will be
> > > >> called towards the beginning of registerFunction and allow you to
> > > >> customize the function config however you want.
> > >>
> > >> Best,
> > >>
> > >> Jerry
> > >>
> > >>
> > >>
> > > >> On Thu, Apr 1, 2021 at 2:08 PM Devin Bost <devin.b...@gmail.com>
> > > >> wrote:
> > >>
> > >> > *Cluster-Wide and Function-Specific Producer Defaults*
> > >> >
> > >> >
> > >> >
> > >> >
> > > >> >    - Status: Proposal
> > > >> >    - Author: Devin Bost (with guidance from Jerry Peng)
> > > >> >    - Pull Request: https://github.com/apache/pulsar/pull/9987
> > > >> >    - Mailing List discussion:
> > > >> >    - Release: 2.8.0
> > >> >
> > >> >
> > >> > *Motivation*
> > >> >
> > > >> > Pulsar currently provides no way to allow users or operators to
> > > >> > change the producer behavior of Pulsar functions. For organizations
> > > >> > that are making heavy use of functions, the ability to tune the
> > > >> > messaging behavior of functions is greatly desired. Moreover,
> > > >> > current function messaging defaults appear to be causing issues for
> > > >> > certain workloads. (For example, some bugs appear to be related to
> > > >> > having batching enabled. See #6054.) Enabling operators to modify
> > > >> > these settings will help Pulsar cluster admins work around issues so
> > > >> > that they can upgrade legacy versions of Pulsar to more recent
> > > >> > versions. Moreover, enabling greater tuning of these settings can
> > > >> > provide more opportunity for performance optimization in production
> > > >> > environments. We need the ability to modify these function producer
> > > >> > settings:
> > >> >
> > >> >
> > >> >
> > >> >    - batchingEnabled
> > >> >    - chunkingEnabled
> > >> >    - blockIfQueueFull
> > >> >    - compressionType
> > >> >    - hashingScheme
> > >> >    - messageRoutingMode
> > >> >    - batchingMaxPublishDelay
> > >> >
> > >> >
> > >> >
> > >> > *Implementation Strategy*
> > >> >
> > >> >
> > >> >
> > > >> > Ideally, we need a way to set default message producer behavior (for
> > > >> > functions) on a cluster-wide level. However, operators may want
> > > >> > specific settings to be used for certain functions. (We can't assume
> > > >> > that everyone will want all functions in a cluster to have the same
> > > >> > producer settings.) When an operator changes the cluster-wide
> > > >> > producer defaults, they need a way to roll out the changes to
> > > >> > functions without significantly disrupting messaging flows in
> > > >> > production environments. They also need a simple way to roll back
> > > >> > changes in case a change to function producer settings results in
> > > >> > undesired behavior. However, not all users will want function
> > > >> > updates to replace existing function producer settings, especially
> > > >> > for functions that have been given custom producer settings (at the
> > > >> > per-function level.) Due to this difference in use cases, there
> > > >> > needs to be a way to allow an operator to specify what update
> > > >> > behavior they want. For a given update, the user should be able to
> > > >> > specify if:
> > >> >
> > > >> >    1. Existing function producer settings will be replaced by
> > > >> >    cluster-wide defaults (unless overridden by producer settings
> > > >> >    provided in the request), or
> > > >> >    2. Existing function producer settings will *not* be replaced by
> > > >> >    cluster-wide defaults (unless overridden by producer settings
> > > >> >    provided in the request).
> > > >> >
> > > >> > So, the two orders of precedence are:
> > > >> >
> > > >> >    1. newProducerConfig > cluster-wide defaults > existingProducerConfig
> > > >> >    2. newProducerConfig > existingProducerConfig > cluster-wide defaults
> > >> >
> > > >> > We can add a boolean property to UpdateOptions to allow the
> > > >> > precedence to be specified. Since we don't want to introduce
> > > >> > unexpected side effects during an update, the default precedence
> > > >> > should prefer the existingProducerConfig over cluster-wide defaults.
> > >> >
> > >> >
> > >> >
> > > >> > To ensure that the desired producer settings (after calculating the
> > > >> > settings from defaults + overrides) are available when creating the
> > > >> > producers (on any cluster), we need to add these producer settings
> > > >> > to the function protobuf definition. Also, to ensure that settings
> > > >> > can be properly validated at the time they are submitted, we need
> > > >> > validation in two places:
> > >> >
> > >> >    1. When cluster-wide configs are loaded upon broker startup
> > >> >    2. When a function is registered or updated.
> > >> >
> > > >> > As it would be impractical to check every registered function
> > > >> > definition during broker startup, most of the validation needs to
> > > >> > occur during function registration or update. However, during broker
> > > >> > startup, we can at least validate that the configurations exist and
> > > >> > throw an exception if they are invalid.
> > >> >
> > > >> > As any configuration exceptions thrown when the producer is created
> > > >> > would bubble up to the client, we need to ensure that the final
> > > >> > configurations are valid when a function is registered or updated,
> > > >> > so that there are no exceptions when the producer is created. This
> > > >> > requires computing the producer settings (according to the desired
> > > >> > precedence) during function registration or update.
> > >> >
> > > >> > When a function is registered, existing producer settings do not
> > > >> > exist for it, so we simply need to get cluster-wide defaults and
> > > >> > override them with any specific producer settings provided in the
> > > >> > FunctionConfig's ProducerConfig (submitted as a REST parameter.)
> > > >> > After the initial registration, those computed settings will then be
> > > >> > persisted in bookkeeper with the function definition.
> > >> >
> > > >> > When a function is updated, we also need to address the existing
> > > >> > producerConfig, and for this, we need to check the desired override
> > > >> > precedence (as explained above) when computing the final producer
> > > >> > settings that will be persisted with the function.
> > >> >
> > >> >
> > >> >
> > >> > *Modifications*
> > >> >
> > >> >
> > >> >
> > > >> > As WorkerConfig is readily available when functions are registered
> > > >> > and updated, we can put new settings into function_worker.yml and
> > > >> > can load and validate them via WorkerConfig.load(..). To prevent
> > > >> > introducing breaking changes, parameters will have defaults assigned
> > > >> > in WorkerConfig that conform to current expected behavior, like
> > > >> > this:
> > >> >
> > >> >
> > >> >
> > > >> > @Data
> > > >> > @Accessors(chain = true)
> > > >> > public class FunctionDefaultsConfig implements Serializable {
> > > >> >
> > > >> >     @FieldContext(
> > > >> >             doc = "Disables default message batching between functions"
> > > >> >     )
> > > >> >     protected boolean batchingDisabled = false;
> > > >> >
> > > >> >     @FieldContext(
> > > >> >             doc = "Enables default message chunking between functions"
> > > >> >     )
> > > >> >     protected boolean chunkingEnabled = false;
> > > >> >
> > > >> >     @FieldContext(
> > > >> >             doc = "Disables default behavior to block when message"
> > > >> >                     + " queue is full"
> > > >> >     )
> > > >> >     protected boolean blockIfQueueFullDisabled = false;
> > > >> >
> > > >> >     @FieldContext(
> > > >> >             doc = "Default compression type"
> > > >> >     )
> > > >> >     protected String compressionType = "LZ4";
> > > >> >
> > > >> >     @FieldContext(
> > > >> >             doc = "Default hashing scheme"
> > > >> >     )
> > > >> >     protected String hashingScheme = "Murmur3_32Hash";
> > > >> >
> > > >> >     @FieldContext(
> > > >> >             doc = "Default message routing mode"
> > > >> >     )
> > > >> >     protected String messageRoutingMode = "CustomPartition";
> > > >> >
> > > >> >     @FieldContext(
> > > >> >             doc = "Default max publish delay (in milliseconds) when"
> > > >> >                     + " message batching is enabled"
> > > >> >     )
> > > >> >     protected long batchingMaxPublishDelay = 10L;
> > > >> >
> > > >> >     public ClusterFunctionProducerDefaults buildProducerDefaults()
> > > >> >             throws InvalidWorkerConfigDefaultException {
> > > >> >         return new ClusterFunctionProducerDefaults(
> > > >> >                 this.isBatchingDisabled(),
> > > >> >                 this.isChunkingEnabled(),
> > > >> >                 this.isBlockIfQueueFullDisabled(),
> > > >> >                 this.getCompressionType(),
> > > >> >                 this.getHashingScheme(),
> > > >> >                 this.getMessageRoutingMode(),
> > > >> >                 this.getBatchingMaxPublishDelay());
> > > >> >     }
> > > >> >
> > > >> >     public FunctionDefaultsConfig() {
> > > >> >     }
> > > >> > }
> > >> >
> > >> >
> > >> >
> > >> > The additional section in function_worker.yml will look like this:
> > >> >
> > >> >
> > >> >
> > > >> > functionDefaults:
> > > >> >   batchingDisabled: true
> > > >> >   chunkingEnabled: false
> > > >> >   blockIfQueueFullDisabled: false
> > > >> >   batchingMaxPublishDelay: 12
> > > >> >   compressionType: ZLIB
> > > >> >   hashingScheme: JavaStringHash
> > > >> >   messageRoutingMode: RoundRobinPartition
> > >> >
> > >> >
> > >> >
> > >> > During function update, we can use a new boolean option,
> > >> > ignoreExistingFunctionDefaults, to toggle precedence:
> > >> >
> > >> >
> > >> >
> > > >> > @Data
> > > >> > @NoArgsConstructor
> > > >> > @ApiModel(value = "UpdateOptions",
> > > >> >         description = "Options while updating function defaults or"
> > > >> >                 + " the sink")
> > > >> > public class UpdateOptions {
> > > >> >
> > > >> >     @ApiModelProperty(
> > > >> >             value = "Whether or not to update the auth data",
> > > >> >             name = "update-auth-data")
> > > >> >     private boolean updateAuthData = false;
> > > >> >
> > > >> >     @ApiModelProperty(
> > > >> >             value = "Whether or not to ignore any existing function"
> > > >> >                     + " defaults",
> > > >> >             name = "ignore-existing-function-defaults")
> > > >> >     private boolean ignoreExistingFunctionDefaults = false;
> > > >> > }
> > >> >
> > >> >
> > >> >
> > > >> > To ensure that function producer settings don't get modified
> > > >> > unexpectedly by an update, ignoreExistingFunctionDefaults = false by
> > > >> > default.
> > >> >
> > >> >
> > >> >
> > > >> > The updated ProducerConfig will contain the new properties and a
> > > >> > method to merge an incoming ProducerConfig with an existing
> > > >> > ProducerConfig. (The merged ProducerConfig will use the incoming
> > > >> > ProducerConfig properties when they exist and will use existing
> > > >> > ProducerConfig properties otherwise.) To ensure we don't need to
> > > >> > force operators to provide those ProducerConfig properties with
> > > >> > every create or update, they must be nullable, not primitive (e.g.
> > > >> > Boolean instead of boolean.)
> > >> >
> > >> >
> > >> >
> > > >> > To support the additional properties, FunctionConfigUtils (and any
> > > >> > similar classes) will need to be modified to correctly translate
> > > >> > between the ProducerConfig and the protobuf ProducerSpec.
> > >> >
> > >> >
> > >> >
> > >> > The updated ProducerSpec in Function.proto will look like this:
> > >> >
> > > >> > message ProducerSpec {
> > > >> >     int32 maxPendingMessages = 1;
> > > >> >     int32 maxPendingMessagesAcrossPartitions = 2;
> > > >> >     bool useThreadLocalProducers = 3;
> > > >> >     CryptoSpec cryptoSpec = 4;
> > > >> >     string batchBuilder = 5;
> > > >> >     bool batchingDisabled = 6;
> > > >> >     bool chunkingEnabled = 7;
> > > >> >     bool blockIfQueueFullDisabled = 8;
> > > >> >     CompressionType compressionType = 9;
> > > >> >     HashingScheme hashingScheme = 10;
> > > >> >     MessageRoutingMode messageRoutingMode = 11;
> > > >> >     int64 batchingMaxPublishDelay = 12;
> > > >> > }
> > >> >
> > >> >
> > >> >
> > > >> > To support these new parameters during local run and for
> > > >> > convenience, we should be able to specify them in the Admin CLI and
> > > >> > in local run modes.
> > > >> >
> > > >> > For local run, as function_worker.yml is not used, these parameters
> > > >> > will have defaults and be primitive values:
> > >> >
> > > >> > @Parameter(names = "--clusterFunctionBatchingDisabled",
> > > >> >         description = "Disable the default message batching behavior"
> > > >> >                 + " for functions",
> > > >> >         hidden = true)
> > > >> > public boolean clusterFunctionBatchingDisabled = false;
> > > >> >
> > > >> > @Parameter(names = "--clusterFunctionChunkingEnabled",
> > > >> >         description = "The default message chunking behavior for"
> > > >> >                 + " functions",
> > > >> >         hidden = true)
> > > >> > public boolean clusterFunctionChunkingEnabled = false;
> > > >> >
> > > >> > @Parameter(names = "--clusterFunctionBlockIfQueueFullDisabled",
> > > >> >         description = "Disable the default blocking behavior for"
> > > >> >                 + " functions when queue is full",
> > > >> >         hidden = true)
> > > >> > public boolean clusterFunctionBlockIfQueueFullDisabled = false;
> > > >> >
> > > >> > @Parameter(names = "--clusterFunctionCompressionTypeDefault",
> > > >> >         description = "The default Compression Type for functions",
> > > >> >         hidden = true)
> > > >> > public String clusterFunctionCompressionTypeDefault = "LZ4";
> > > >> >
> > > >> > @Parameter(names = "--clusterFunctionHashingSchemeDefault",
> > > >> >         description = "The default Hashing Scheme for functions",
> > > >> >         hidden = true)
> > > >> > public String clusterFunctionHashingSchemeDefault = "Murmur3_32Hash";
> > > >> >
> > > >> > @Parameter(names = "--clusterFunctionMessageRoutingModeDefault",
> > > >> >         description = "The default Message Routing Mode for functions",
> > > >> >         hidden = true)
> > > >> > public String clusterFunctionMessageRoutingModeDefault =
> > > >> >         "CustomPartition";
> > > >> >
> > > >> > @Parameter(names = "--clusterFunctionBatchingMaxPublishDelayDefault",
> > > >> >         description = "The default max publish delay (in milliseconds)"
> > > >> >                 + " for functions when message batching is enabled",
> > > >> >         hidden = true)
> > > >> > public long clusterFunctionBatchingMaxPublishDelayDefault = 10L;
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > > >> > For Admin CLI, parameters can be provided in a similar way, but they
> > > >> > will not have defaults (i.e. Boolean instead of boolean) since
> > > >> > parameters should be null to ensure non-provided configs are
> > > >> > obtained from WorkerConfig.
> > >> >
> > >> >
> > >> >
> > > >> > To handle the logic of selecting the property from an incoming
> > > >> > producerConfig vs the producerDefaults object (from WorkerConfig),
> > > >> > we can use a FunctionDefaultsMediator. That interface looks like
> > > >> > this and gives us flexibility to handle mediation for specific
> > > >> > properties without needing to convert the entire object.
> > >> >
> > >> >
> > >> >
> > > >> > public interface FunctionDefaultsMediator {
> > > >> >     boolean isBatchingDisabled();
> > > >> >     boolean isChunkingEnabled();
> > > >> >     boolean isBlockIfQueueFullDisabled();
> > > >> >     CompressionType getCompressionType();
> > > >> >     Function.CompressionType getCompressionTypeProto();
> > > >> >     HashingScheme getHashingScheme();
> > > >> >     Function.HashingScheme getHashingSchemeProto();
> > > >> >     MessageRoutingMode getMessageRoutingMode();
> > > >> >     Function.MessageRoutingMode getMessageRoutingModeProto();
> > > >> >     Long getBatchingMaxPublishDelay();
> > > >> > }
> > >> >
> > >> >
> > >> >
> > > >> > (Alternatively, we could remove the Proto methods from the mediator
> > > >> > and just use FunctionConfigUtils.convert(..) to translate between
> > > >> > the protobuf ProducerSpec and the Java ProducerConfig.)
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > *Testing*
> > >> >
> > > >> > In terms of testing, much of this functionality will be covered by
> > > >> > existing tests in the pulsar-functions module. However, we need to
> > > >> > test each of the new classes, as well as these behaviors:
> > >> >
> > >> >    - functionRegistration (with different provided ProducerConfig
> > >> >    properties)
> > >> >    - functionUpdate (for ignoreExistingFunctionDefaults=true and
> > >> >    ignoreExistingFunctionDefaults=false)
> > >> >    - loading function_worker.yml into WorkerConfig
> > > >> >    - throwing exceptions correctly when invalid parameters are
> > > >> >    provided
> > > >> >    - converting between ProducerSpec and ProducerConfig
> > > >> >    - correctly handling precedence when computing the final
> > > >> >    ProducerConfig values
> > >> >    - creating producers
> > >> >    - ensuring functionality is consistent across runtimes
> > >> >
> > >> >
> > >> >
> > >> > I'd like feedback on this proposal.
> > >> >
> > >> > Devin G. Bost
> > >> >
> > >>
> > >
> >
>
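
For reference, here is a minimal sketch of the two update precedence orders
described in the quoted proposal. It is not the code from the pull request:
the class name ProducerPrecedenceSketch and the helpers resolve and
firstNonNull are hypothetical, and only the flag name
ignoreExistingFunctionDefaults is taken from the proposal. Each setting is
assumed to be a nullable value, where null means "not provided".

```java
/** Hypothetical illustration of the proposal's precedence rules; not PR code. */
final class ProducerPrecedenceSketch {

    /** Returns the first non-null argument, or null if all three are null. */
    static <T> T firstNonNull(T first, T second, T third) {
        if (first != null) {
            return first;
        }
        return (second != null) ? second : third;
    }

    /**
     * Resolves a single producer setting during a function update.
     *
     * @param incoming       value from the ProducerConfig in the update request
     * @param existing       value persisted with the function, possibly null
     * @param clusterDefault cluster-wide default (assumed to come from WorkerConfig)
     * @param ignoreExistingFunctionDefaults selects which precedence order applies
     */
    static <T> T resolve(T incoming, T existing, T clusterDefault,
                         boolean ignoreExistingFunctionDefaults) {
        return ignoreExistingFunctionDefaults
                // Order 1: newProducerConfig > cluster-wide defaults > existing
                ? firstNonNull(incoming, clusterDefault, existing)
                // Order 2: newProducerConfig > existing > cluster-wide defaults
                : firstNonNull(incoming, existing, clusterDefault);
    }

    public static void main(String[] args) {
        // Flag unset: the existing per-function value is kept.
        System.out.println(resolve(null, "ZLIB", "LZ4", false));    // ZLIB
        // Flag set: the cluster-wide default replaces the existing value.
        System.out.println(resolve(null, "ZLIB", "LZ4", true));     // LZ4
        // A value supplied in the request wins in both orders.
        System.out.println(resolve("SNAPPY", "ZLIB", "LZ4", true)); // SNAPPY
        // Function registered before the feature: existing value is null,
        // so the cluster-wide default is applied even with the flag unset.
        System.out.println(resolve(null, null, "LZ4", false));      // LZ4
    }
}
```

The last call corresponds to the edge case raised earlier in the thread: a
function registered before this feature has null producer settings, so an
unrelated update would pick up whatever defaults are in function_worker.yml.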
