*Cluster-Wide and Function-Specific Producer Defaults*



   - Status: Proposal
   - Author: Devin Bost (with guidance from Jerry Peng)
   - Pull Request: https://github.com/apache/pulsar/pull/9987
   - Mailing List discussion:
   - Release: 2.8.0


*Motivation*

Pulsar currently provides no way for users or operators to change the
producer behavior of Pulsar functions. For organizations that make heavy
use of functions, the ability to tune the messaging behavior of functions
is greatly desired. Moreover, the current function messaging defaults appear
to be causing issues for certain workloads. (For example, some bugs appear
to be related to having batching enabled. See #6054.) Enabling operators to
modify these settings will help Pulsar cluster admins work around such
issues when upgrading from legacy versions of Pulsar to more recent
versions. Greater tuning of these settings also provides more opportunity
for performance optimization in production environments. We need the
ability to modify these function producer settings:



   - batchingEnabled
   - chunkingEnabled
   - blockIfQueueFull
   - compressionType
   - hashingScheme
   - messageRoutingMode
   - batchingMaxPublishDelay



*Implementation Strategy*



Ideally, we need a way to set default message producer behavior (for
functions) on a cluster-wide level. However, operators may want specific
settings to be used for certain functions. (We can't assume that everyone
will want all functions in a cluster to have the same producer settings.)
When an operator changes the cluster-wide producer defaults, they need a
way to be able to roll out the changes to functions without significantly
disrupting messaging flows in production environments. They also need a
simple way to roll back changes in case a change to function producer
settings results in undesired behavior. However, not all users will want
function updates to replace existing function producer settings, especially
for functions that have been given custom producer settings (at the
per-function level). Due to this difference in use cases, there needs to be
a way to allow an operator to specify what update behavior they want. For a
given update, the user should be able to specify if:

   1. Existing function producer settings will be replaced by cluster-wide
   defaults (unless overridden by producer settings provided in the request),
   or
   2. Existing function producer settings will *not* be replaced by
   cluster-wide defaults (unless overridden by producer settings provided in
   the request).

So, the two orders of precedence are:

   1. newProducerConfig > cluster-wide defaults > existingProducerConfig
   2. newProducerConfig > existingProducerConfig > cluster-wide defaults

We can add a boolean property to UpdateOptions to allow the precedence to
be specified. Since we don't want to introduce unexpected side effects
during an update, the default precedence should prefer the
existingProducerConfig over cluster-wide defaults.
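
As a rough illustration of these two orders, a hypothetical helper (the
class and method names below are illustrative, not part of this proposal)
could resolve a single nullable setting like this:

public final class ProducerSettingResolver {
    // Order 1 (ignoreExistingFunctionDefaults = true):
    //     newValue > clusterDefault > existingValue
    // Order 2 (default, ignoreExistingFunctionDefaults = false):
    //     newValue > existingValue > clusterDefault
    public static <T> T resolve(T newValue, T existingValue, T clusterDefault,
                                boolean ignoreExistingFunctionDefaults) {
        if (newValue != null) {
            return newValue;
        }
        if (ignoreExistingFunctionDefaults) {
            return clusterDefault != null ? clusterDefault : existingValue;
        }
        return existingValue != null ? existingValue : clusterDefault;
    }
}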



To ensure that the desired producer settings (after calculating the
settings from defaults + overrides) are available when creating the
producers (on any cluster), we need to add these producer settings to the
function protobuf definition. Also, to ensure that settings can be properly
validated at the time they are submitted, we need validation in two places:

   1. When cluster-wide configs are loaded upon broker startup
   2. When a function is registered or updated.

As it would be impractical to check every registered function definition
during broker startup, most of the validation needs to occur during
function registration or update. However, during broker startup, we can at
least validate that the configurations exist and throw an exception if they
are invalid.
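
For example, the startup check could be limited to constructing the
defaults object and failing fast (a sketch; how the defaults are exposed
from WorkerConfig, e.g. a getFunctionDefaults() accessor, is an assumption):

FunctionDefaultsConfig functionDefaultsConfig = workerConfig.getFunctionDefaults();
try {
    // Rejects invalid values (e.g. unknown compression type or hashing scheme names).
    ClusterFunctionProducerDefaults producerDefaults = functionDefaultsConfig.buildProducerDefaults();
} catch (InvalidWorkerConfigDefaultException e) {
    // Fail fast so a bad function_worker.yml surfaces at startup rather than at function update time.
    throw new IllegalStateException("Invalid function producer defaults in function_worker.yml", e);
}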

As any configuration exceptions thrown when the producer is created would
bubble up to the client, we need to ensure that the final configurations
are valid when a function is registered or updated, so that no exceptions
occur when the producer is created. This requires computing the
producer settings (according to the desired precedence) during function
registration or update.

When a function is registered, there are no existing producer settings for
it, so we simply need to take the cluster-wide defaults and override them
with any specific producer settings provided in the FunctionConfig's
ProducerConfig (submitted as a REST parameter). After the initial
registration, those computed settings will then be persisted in BookKeeper
with the function definition.
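
For example, registration could compute the persisted settings roughly as
follows (a sketch; the getters on ProducerConfig correspond to the new
nullable properties proposed below, and the WorkerConfig accessor is an
assumption):

// Defaults come from WorkerConfig; overrides come from the request's FunctionConfig.
ProducerConfig requested = functionConfig.getProducerConfig();  // may be null or partially populated
ClusterFunctionProducerDefaults defaults =
        workerConfig.getFunctionDefaults().buildProducerDefaults();

boolean batchingDisabled = (requested != null && requested.getBatchingDisabled() != null)
        ? requested.getBatchingDisabled()
        : defaults.isBatchingDisabled();
// ... the remaining properties are resolved the same way, and the result is
// written into the function's ProducerSpec before it is persisted.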

When a function is updated, we also need to take the existing
producerConfig into account; for this, we check the desired override
precedence (as explained above) when computing the final producer settings
that will be persisted with the function.



*Modifications*



As WorkerConfig is readily available when functions are registered and
updated, we can put new settings into function_worker.yml and can load and
validate them via WorkerConfig.load(..). To prevent introducing breaking
changes, parameters will have defaults assigned in WorkerConfig that
conform to current expected behavior, like this:



@Data
@Accessors(chain = true)
public class FunctionDefaultsConfig implements Serializable {

    @FieldContext(
            doc = "Disables default message batching between functions"
    )
    protected boolean batchingDisabled = false;

    @FieldContext(
            doc = "Enables default message chunking between functions"
    )
    protected boolean chunkingEnabled = false;

    @FieldContext(
            doc = "Disables default behavior to block when message queue is full"
    )
    protected boolean blockIfQueueFullDisabled = false;

    @FieldContext(
            doc = "Default compression type"
    )
    protected String compressionType = "LZ4";

    @FieldContext(
            doc = "Default hashing scheme"
    )
    protected String hashingScheme = "Murmur3_32Hash";

    @FieldContext(
            doc = "Default message routing mode"
    )
    protected String messageRoutingMode = "CustomPartition";

    @FieldContext(
            doc = "Default max publish delay (in milliseconds) when message batching is enabled"
    )
    protected long batchingMaxPublishDelay = 10L;

    public ClusterFunctionProducerDefaults buildProducerDefaults() throws InvalidWorkerConfigDefaultException {
        return new ClusterFunctionProducerDefaults(this.isBatchingDisabled(),
                this.isChunkingEnabled(), this.isBlockIfQueueFullDisabled(), this.getCompressionType(),
                this.getHashingScheme(), this.getMessageRoutingMode(), this.getBatchingMaxPublishDelay());
    }

    public FunctionDefaultsConfig() {
    }
}



The additional section in function_worker.yml will look like this:



functionDefaults:
  batchingDisabled: true
  chunkingEnabled: false
  blockIfQueueFullDisabled: false
  batchingMaxPublishDelay: 12
  compressionType: ZLIB
  hashingScheme: JavaStringHash
  messageRoutingMode: RoundRobinPartition



During function update, we can use a new boolean option,
ignoreExistingFunctionDefaults, to toggle precedence:



@Data
@NoArgsConstructor
@ApiModel(value = "UpdateOptions", description = "Options while updating function defaults or the sink")
public class UpdateOptions {
    @ApiModelProperty(
            value = "Whether or not to update the auth data",
            name = "update-auth-data")
    private boolean updateAuthData = false;

    @ApiModelProperty(
            value = "Whether or not to ignore any existing function defaults",
            name = "ignore-existing-function-defaults")
    private boolean ignoreExistingFunctionDefaults = false;
}



To ensure that function producer settings are not modified unexpectedly by
an update, ignoreExistingFunctionDefaults defaults to false.



The updated ProducerConfig will contain the new properties and a method to
merge an incoming ProducerConfig with an existing ProducerConfig. (The
merged ProducerConfig will use the incoming ProducerConfig properties when
they exist and will use existing ProducerConfig properties otherwise.) So
that operators are not forced to provide every ProducerConfig property with
each create or update, these properties must be nullable rather than
primitive (e.g. Boolean instead of boolean).
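
A minimal sketch of such a merge method (the field names follow this
proposal; the exact signature is illustrative):

public ProducerConfig merge(ProducerConfig incoming) {
    // Incoming (non-null) values win; existing values fill in anything the request omitted.
    ProducerConfig merged = new ProducerConfig();
    merged.setBatchingDisabled(incoming.getBatchingDisabled() != null
            ? incoming.getBatchingDisabled() : this.getBatchingDisabled());
    merged.setChunkingEnabled(incoming.getChunkingEnabled() != null
            ? incoming.getChunkingEnabled() : this.getChunkingEnabled());
    merged.setBatchingMaxPublishDelay(incoming.getBatchingMaxPublishDelay() != null
            ? incoming.getBatchingMaxPublishDelay() : this.getBatchingMaxPublishDelay());
    // ... blockIfQueueFullDisabled, compressionType, hashingScheme, and
    // messageRoutingMode follow the same pattern.
    return merged;
}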



To support the additional properties, FunctionConfigUtils (and any similar
classes) will need to be modified to correctly translate between the
ProducerConfig and the protobuf ProducerSpec.
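
For instance, one direction of that translation might look roughly like
this (a sketch; it assumes ProducerConfig exposes the client API enum types
and that their names line up with the protobuf enums):

Function.ProducerSpec.Builder builder = Function.ProducerSpec.newBuilder();
if (producerConfig.getBatchingDisabled() != null) {
    builder.setBatchingDisabled(producerConfig.getBatchingDisabled());
}
if (producerConfig.getCompressionType() != null) {
    builder.setCompressionType(
            Function.CompressionType.valueOf(producerConfig.getCompressionType().name()));
}
// ... the remaining properties are copied the same way, and the reverse
// direction maps a ProducerSpec back onto a ProducerConfig.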



The updated ProducerSpec in Function.proto will look like this:

message ProducerSpec {
    int32 maxPendingMessages = 1;
    int32 maxPendingMessagesAcrossPartitions = 2;
    bool useThreadLocalProducers = 3;
    CryptoSpec cryptoSpec = 4;
    string batchBuilder = 5;
    bool batchingDisabled = 6;
    bool chunkingEnabled = 7;
    bool blockIfQueueFullDisabled = 8;
    CompressionType compressionType = 9;
    HashingScheme hashingScheme = 10;
    MessageRoutingMode messageRoutingMode = 11;
    int64 batchingMaxPublishDelay = 12;
}



For convenience, we should also be able to specify these new parameters in
the Admin CLI and in local run mode.

For local run, as function_worker.yml is not used, these parameters will
have defaults and be primitive values:

@Parameter(names = "--clusterFunctionBatchingDisabled", description = "Disable
the default message batching behavior for functions", hidden = true)

public boolean clusterFunctionBatchingDisabled = false;



@Parameter(names = "--clusterFunctionChunkingEnabled", description = "The
default message chunking behavior for functions", hidden = true)

public boolean clusterFunctionChunkingEnabled = false;



@Parameter(names = "--clusterFunctionBlockIfQueueFullDisabled", description
= "Disable the default blocking behavior for functions when queue is
full", hidden
= true)

public boolean clusterFunctionBlockIfQueueFullDisabled = false;



@Parameter(names = "--clusterFunctionCompressionTypeDefault",
description = "The
default Compression Type for functions", hidden = true)

public String clusterFunctionCompressionTypeDefault = "LZ4";



@Parameter(names = "--clusterFunctionHashingSchemeDefault", description = "The
default Hashing Scheme for functions", hidden = true)

public String clusterFunctionHashingSchemeDefault = "Murmur3_32Hash";



@Parameter(names = "--clusterFunctionMessageRoutingModeDefault", description
= "The default Message Routing Mode for functions", hidden = true)

public String clusterFunctionMessageRoutingModeDefault = "CustomPartition";



@Parameter(names = "--clusterFunctionBatchingMaxPublishDelayDefault",
description
= "The default max publish delay (in milliseconds) for functions when
message batching is enabled", hidden = true)

public long clusterFunctionBatchingMaxPublishDelayDefault = 10L;





For Admin CLI, parameters can be provided in a similar way, but they will
not have defaults (i.e. Boolean instead of boolean) since parameters should
be null to ensure non-provided configs are obtained from WorkerConfig.
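
For example (a sketch; the flag names are illustrative and not final):

@Parameter(names = "--batching-disabled", description = "Disable message batching for the function's producers", arity = 1)
public Boolean batchingDisabled;  // stays null when not provided, so the WorkerConfig default applies

@Parameter(names = "--batching-max-publish-delay", description = "Max publish delay (in milliseconds) when batching is enabled")
public Long batchingMaxPublishDelay;  // null when not provided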



To handle the logic of selecting a property from an incoming producerConfig
vs. the producerDefaults object (from WorkerConfig), we can introduce a
FunctionDefaultsMediator. The interface looks like this and gives us the
flexibility to mediate specific properties without needing to convert the
entire object:



public interface FunctionDefaultsMediator {
    boolean isBatchingDisabled();
    boolean isChunkingEnabled();
    boolean isBlockIfQueueFullDisabled();
    CompressionType getCompressionType();
    Function.CompressionType getCompressionTypeProto();
    HashingScheme getHashingScheme();
    Function.HashingScheme getHashingSchemeProto();
    MessageRoutingMode getMessageRoutingMode();
    Function.MessageRoutingMode getMessageRoutingModeProto();
    Long getBatchingMaxPublishDelay();
}



(Alternatively, we could remove the Proto methods from the mediator and
just use FunctionConfigUtils.convert(..) to translate between the protobuf
ProducerSpec and the Java ProducerConfig.)
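
As a rough sketch of the mediation pattern for a single property (the
implementation class name is illustrative, and the remaining interface
methods would follow the same pattern):

public class FunctionDefaultsMediatorImpl {
    private final ProducerConfig producerConfig;               // incoming config; fields may be null
    private final ClusterFunctionProducerDefaults defaults;    // built from WorkerConfig

    public FunctionDefaultsMediatorImpl(ProducerConfig producerConfig,
                                        ClusterFunctionProducerDefaults defaults) {
        this.producerConfig = producerConfig;
        this.defaults = defaults;
    }

    public boolean isBatchingDisabled() {
        return (producerConfig != null && producerConfig.getBatchingDisabled() != null)
                ? producerConfig.getBatchingDisabled()
                : defaults.isBatchingDisabled();
    }
}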




*Testing*

In terms of testing, much of this functionality will be covered by existing
tests in the pulsar-functions module. However, we need to test each of the
new classes, as well as the behaviors below (a sketch of a precedence test
follows this list):

   - functionRegistration (with different provided ProducerConfig
   properties)
   - functionUpdate (for ignoreExistingFunctionDefaults=true and
   ignoreExistingFunctionDefaults=false)
   - loading function_worker.yml into WorkerConfig
   - throwing exceptions correctly when invalid parameters are provided
   - converting between ProducerSpec and ProducerConfig
   - correctly handling precedence when computing the final ProducerConfig
   values
   - creating producers
   - ensuring functionality is consistent across runtimes
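
For example, the precedence behavior could be pinned down by a test along
these lines (a sketch using the hypothetical resolver from earlier in this
proposal; Pulsar's tests use TestNG):

import org.testng.Assert;
import org.testng.annotations.Test;

public class ProducerSettingPrecedenceTest {
    @Test
    public void testExistingSettingsPreferredOverClusterDefaultsByDefault() {
        Boolean incoming = null;          // not provided in the update request
        Boolean existing = Boolean.TRUE;  // previously persisted with the function
        Boolean clusterDefault = Boolean.FALSE;

        // ignoreExistingFunctionDefaults = false: the existing value wins over the cluster default.
        Assert.assertEquals(
                ProducerSettingResolver.resolve(incoming, existing, clusterDefault, false), Boolean.TRUE);

        // ignoreExistingFunctionDefaults = true: the cluster default wins over the existing value.
        Assert.assertEquals(
                ProducerSettingResolver.resolve(incoming, existing, clusterDefault, true), Boolean.FALSE);
    }
}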



I'd like feedback on this proposal.

Devin G. Bost
