scott-hendricks opened a new pull request #9736:
URL: https://github.com/apache/kafka/pull/9736


   This PR pulls together a couple of the outstanding one-pagers we have for Q4:
   
   https://confluentinc.atlassian.net/wiki/spaces/QERM/pages/1605992772
   https://confluentinc.atlassian.net/wiki/spaces/QERM/pages/1589186862
   
   This allows us to run highly granular, configurable workloads that directly simulate customer scenarios and measure end-to-end latency, all within Trogdor.
   
   This creates a new `ConfigurableProducer` workload that exposes more tunable aspects of the workload than the `ProduceBench` workload does.
   
   This also creates all the helper classes for the `ConfigurableProducer` 
workload.
   
   This adds a new parameter to the `ConsumeBench` workload to allow for 
processing of records after polling them.
   
   This also adds a new E2E latency test utilizing the new record processor 
within the `ConsumeBench` workload.
   
   ---
   # ConfigurableProducer workload
   
   The ConfigurableProducer workload allows customized and even variable configurations of messages per second, message size, batch size, and key size, and it can target a specific partition within a topic.
   
   The parameters that differ from ProduceBenchSpec:
   
   * `flushGenerator` - Used to instruct the KafkaProducer when to issue 
flushes.  This allows us to simulate variable batching since batch flushing is 
not currently exposed within the KafkaProducer class.
   * `throughputGenerator` - Used to throttle the ConfigurableProducerWorker 
based on a calculated number of messages within a window.
   * `activeTopic` - This class only supports execution against a single topic 
at a time.  If more than one topic is specified, the ConfigurableProducerWorker 
will throw an error.
   * `activePartition` - The partition number within the activeTopic to run load against, or `-1` to use all partitions.
   
   Here is an example spec:
   
   ```
   {
       "startMs": 1606949497662,
       "durationMs": 3600000,
       "producerNode": "trogdor-agent-0",
       "bootstrapServers": "some.example.kafka.server:9091",
       "flushGenerator": {
           "type": "gaussian",
           "messagesPerFlushAverage": 16,
           "messagesPerFlushDeviation": 4
       },
       "throughputGenerator": {
           "type": "gaussian",
           "messagesPerSecondAverage": 500,
           "messagesPerSecondDeviation": 50,
           "windowsUntilRateChange": 100,
           "windowSizeMs": 100
       },
       "keyGenerator": {
           "type": "constant",
           "size": 8
       },
       "valueGenerator": {
           "type": "gaussianTimestampRandom",
           "messageSizeAverage": 512,
           "messageSizeDeviation": 100,
           "timestampBytes": 8,
           "messagesUntilSizeChange": 100
       },
       "producerConf": {
           "acks": "all"
       },
       "commonClientConf": {},
       "adminClientConf": {},
       "activeTopic": {
           "topic0": {
               "numPartitions": 100,
               "replicationFactor": 3,
               "configs": {
                   "retention.ms": "1800000"
               }
           }
       },
       "activePartition": 5
   }
   ```
   
   This example spec performed the following:
   
   * Ran on `trogdor-agent-0` for 1 hour starting at 2020-12-02 22:51:37.662 GMT
   * Produced with acks=all to Partition 5 of `topic0` on kafka server 
`some.example.kafka.server:9091`.
   * The average batch had 16 messages, with a standard deviation of 4 messages.
   * The messages had an 8-byte constant key and values averaging 512 bytes, with a standard deviation of 100 bytes.
   * The messages had millisecond timestamps embedded in the first 8 bytes of the value.
   * The average throughput was 500 messages/second, with a window of 100ms and 
a deviation of 50 messages/second.
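
   As a quick sanity check of the timing bullets, the `startMs` and `durationMs` values from the spec can be decoded with `java.time`. This is only an illustration; the `SpecWindow` helper is hypothetical and not part of Trogdor.

   ```java
   import java.time.Duration;
   import java.time.Instant;

   // Hypothetical helper (not part of Trogdor): converts the spec's
   // epoch-millisecond fields into UTC instants and a Duration.
   final class SpecWindow {
       private SpecWindow() {}

       static Instant start(long startMs) {
           return Instant.ofEpochMilli(startMs);
       }

       static Instant end(long startMs, long durationMs) {
           return Instant.ofEpochMilli(startMs + durationMs);
       }

       static Duration duration(long durationMs) {
           return Duration.ofMillis(durationMs);
       }
   }
   ```

   For example, `SpecWindow.start(1606949497662L)` prints as `2020-12-02T22:51:37.662Z`, and `SpecWindow.duration(3600000L)` is one hour, matching the bullets above.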
   
   ---
   
   # ConsumeBench workload changes
   
   This commit adds the ability for the ConsumeBench workloads to optionally 
process the records returned through the consumer poll call.  This is done by 
specifying a new `recordProcessor` parameter.  The record processor's status is 
then included in the workload's status.
   
   ## RecordProcessor
   
   This interface provides the ability to optionally process records after the ConsumeBench workload polls them, and to include additional data in the status output.
   
   Currently there are 2 processing methods:
   
   * Disabled, by not specifying this parameter.
   * `timestamp` will use `TimestampRecordProcessor` to process records 
containing a timestamp in the first several bytes of the message.
   
   ### TimestampRecordProcessor
   
   The `TimestampRecordProcessor` class processes records containing a timestamp in the first several bytes of the message value and builds a latency histogram from that data. The histogram is then included in the status of the `ConsumeBenchWorker` class. This processor must be used with a timestamped PayloadGenerator implementation.
   
   Here's an example spec:
   
   ```
   {
      "type": "timestamp",
      "timestampBytes": 8,
      "histogramMaxMs": 10000,
      "histogramMinMs": 0,
      "histogramStepMs": 1
   }
   ```
   
   This will track total E2E latency up to 10 seconds, using 1ms resolution and 
a timestamp size of 8 bytes.
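
   The processing described above can be sketched as follows. This is a simplified, hypothetical stand-in rather than the actual `TimestampRecordProcessor`: it assumes an 8-byte big-endian timestamp in milliseconds since epoch, takes the current time as a parameter, and counts latencies outside `[histogramMinMs, histogramMaxMs)` separately. The class and method names are illustrative only.

   ```java
   import java.nio.ByteBuffer;

   // Hypothetical, simplified stand-in for TimestampRecordProcessor. Assumes the
   // first 8 bytes of each value are a big-endian long (ms since epoch).
   final class LatencyHistogramSketch {
       private final long minMs;
       private final long maxMs;
       private final long stepMs;
       private final long[] buckets;
       private long overflow;  // latencies >= maxMs
       private long underflow; // latencies < minMs (e.g. clock skew)

       LatencyHistogramSketch(long minMs, long maxMs, long stepMs) {
           if (stepMs <= 0 || maxMs <= minMs) {
               throw new IllegalArgumentException("need stepMs > 0 and maxMs > minMs");
           }
           this.minMs = minMs;
           this.maxMs = maxMs;
           this.stepMs = stepMs;
           // Round up so the last partial step still gets a bucket.
           this.buckets = new long[(int) ((maxMs - minMs + stepMs - 1) / stepMs)];
       }

       /** Reads the embedded timestamp and records latency relative to nowMs. */
       void process(byte[] value, long nowMs) {
           long sentMs = ByteBuffer.wrap(value, 0, Long.BYTES).getLong();
           record(nowMs - sentMs);
       }

       void record(long latencyMs) {
           if (latencyMs < minMs) {
               underflow++;
           } else if (latencyMs >= maxMs) {
               overflow++;
           } else {
               buckets[(int) ((latencyMs - minMs) / stepMs)]++;
           }
       }

       long countInBucket(int index) { return buckets[index]; }
       int bucketCount() { return buckets.length; }
       long overflow() { return overflow; }
       long underflow() { return underflow; }
   }
   ```

   With the example settings (`0`, `10000`, `1`), the sketch has 10,000 one-millisecond buckets, and anything at or above 10 seconds lands in the overflow count.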
   
   ---
   
   # FlushGenerator
   
   A FlushGenerator is used to facilitate flushing the KafkaProducers on a 
cadence specified by the user.  This is useful to simulate a specific number of 
messages in a batch regardless of the message size, since batch flushing is not 
exposed in the KafkaProducer.
   
   Currently there are 3 flushing methods:
   
   * Disabled, by not specifying this parameter.
   * `constant` will use `ConstantFlushGenerator` to keep the number of 
messages per batch constant.
   * `gaussian` will use `GaussianFlushGenerator` to vary the number of 
messages per batch on a normal distribution.
   
   ## ConstantFlushGenerator
   
   This generator flushes the producer after a specific number of messages. It does not directly control when the KafkaProducer batches; it only makes a best effort. It also cannot tell when a KafkaProducer batch is closed: if the KafkaProducer sends a batch before a flush is issued, the generator continues on its own cadence.
   
   Here is an example spec:
   
   ```
   {
      "type": "constant",
      "messagesPerFlush": 16
   }
   ```
   
   This example will flush the producer every 16 messages.
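
   A minimal sketch of the constant cadence, assuming the workload calls a hook once per sent message and calls `KafkaProducer#flush()` whenever the hook returns `true`. `ConstantFlushSketch` and `onMessageSent` are hypothetical names; the real `ConstantFlushGenerator` API is not shown here.

   ```java
   // Hypothetical sketch of a constant flush cadence: counts sent messages and
   // signals when the caller should issue a producer flush.
   final class ConstantFlushSketch {
       private final int messagesPerFlush;
       private int sinceLastFlush = 0;

       ConstantFlushSketch(int messagesPerFlush) {
           if (messagesPerFlush <= 0) {
               throw new IllegalArgumentException("messagesPerFlush must be > 0");
           }
           this.messagesPerFlush = messagesPerFlush;
       }

       /** Call once per message sent; returns true when a flush should be issued. */
       boolean onMessageSent() {
           sinceLastFlush++;
           if (sinceLastFlush >= messagesPerFlush) {
               sinceLastFlush = 0;
               return true;
           }
           return false;
       }
   }
   ```

   With `messagesPerFlush = 16`, the hook returns `true` on the 16th, 32nd, 48th, and so on, message.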
   
   ## GaussianFlushGenerator
   
   This generator flushes the producer after a number of messages drawn from a gaussian distribution. It does not directly control when the KafkaProducer batches; it only makes a best effort. It also cannot tell when a KafkaProducer batch is closed: if the KafkaProducer sends a batch before a flush is issued, the generator continues on its own cadence.
   
   Here is an example spec:
   
   ```
   {
      "type": "gaussian",
      "messagesPerFlushAverage": 16,
      "messagesPerFlushDeviation": 4
   }
   ```
   
   This example will flush the producer on average every 16 messages, assuming `linger.ms` and `batch.size` allow for it. After each flush, the number of messages until the next flush is drawn from a normal distribution:
   
   * On average, flushes occur every 16 messages.
   * ~68% of flushes occur after between 12 and 20 messages.
   * ~95% of flushes occur after between 8 and 24 messages.
   * ~99.7% of flushes occur after between 4 and 28 messages.
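
   The gaussian variant can be sketched the same way, assuming the next flush threshold is drawn with `java.util.Random#nextGaussian()`, rounded, and clamped to at least 1 message. The names are hypothetical, and the actual `GaussianFlushGenerator` may draw or round differently.

   ```java
   import java.util.Random;

   // Hypothetical sketch: after each flush, the next flush threshold is drawn
   // from N(average, deviation), rounded, and clamped to at least 1 message.
   final class GaussianFlushSketch {
       private final double average;
       private final double deviation;
       private final Random random;
       private int sinceLastFlush = 0;
       private int threshold;

       GaussianFlushSketch(double average, double deviation, long seed) {
           this.average = average;
           this.deviation = deviation;
           this.random = new Random(seed);
           this.threshold = nextThreshold();
       }

       private int nextThreshold() {
           long drawn = Math.round(average + deviation * random.nextGaussian());
           return (int) Math.max(1, drawn);
       }

       /** Call once per message sent; returns true when a flush should be issued. */
       boolean onMessageSent() {
           sinceLastFlush++;
           if (sinceLastFlush >= threshold) {
               sinceLastFlush = 0;
               threshold = nextThreshold();
               return true;
           }
           return false;
       }

       int currentThreshold() { return threshold; }
   }
   ```

   Seeding the `Random` keeps the flush pattern repeatable across runs, which makes results easier to compare.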
   
   ---
   
   # ThroughputGenerator
   
   Similar to the `Throttle` class, but with a simpler design. This interface is used to produce a configurable number of messages per second by throttling whenever the throughput exceeds the target rate.
   
   Currently there are 2 throughput methods:
   
   * `constant` will use `ConstantThroughputGenerator` to keep the number of 
messages per second constant.
   * `gaussian` will use `GaussianThroughputGenerator` to vary the number of 
messages per second on a normal distribution.
   
   ## ConstantThroughputGenerator
   
   This throughput generator configures constant throughput. The smaller the window size, the smoother the traffic will be. A 100ms window produces no noticeable spikes in traffic while still being long enough to avoid excessive overhead.
   
   Due to the discrete nature of throughput in terms of messages sent in a window, each window sends at least 1 message, and every window sends the same number of messages, rounded down. For example, 99 messages per second with a 100ms window will only send 90 messages per second, or 9 messages per window. As another example, sending only 5 messages per second requires a window size of 200ms. In cases like these, adjust the `messagesPerSecond` and `windowSizeMs` parameters together to achieve more accurate throughput.
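
   The rounding rules above reduce to a small calculation. This sketch covers only the per-window arithmetic (integer division to round down, with a minimum of 1 message per window), not the sleeping or timing logic of the real `ConstantThroughputGenerator`. The helper names are hypothetical.

   ```java
   // Hypothetical sketch of the per-window arithmetic only (no sleeping/timing).
   final class ConstantThroughputSketch {
       private ConstantThroughputSketch() {}

       /** Messages sent per window: rounded down, but never fewer than 1. */
       static long messagesPerWindow(long messagesPerSecond, long windowSizeMs) {
           long perWindow = messagesPerSecond * windowSizeMs / 1000; // integer division rounds down
           return Math.max(1, perWindow);
       }

       /** The throughput actually achieved once per-window rounding is applied. */
       static double effectiveMessagesPerSecond(long messagesPerSecond, long windowSizeMs) {
           return messagesPerWindow(messagesPerSecond, windowSizeMs) * 1000.0 / windowSizeMs;
       }
   }
   ```

   For example, `effectiveMessagesPerSecond(99, 100)` is 90.0, `effectiveMessagesPerSecond(5, 100)` is 10.0 because of the 1-message minimum, and `effectiveMessagesPerSecond(5, 200)` is 5.0.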
   
   Here is an example spec:
   
   ```
   {
      "type": "constant",
      "messagesPerSecond": 500,
      "windowSizeMs": 100
   }
   ```
   
   This will produce a workload that runs 500 messages per second, sending 50 messages in each 100 millisecond window.
   
   ## GaussianThroughputGenerator
   
   This throughput generator configures throughput following a gaussian (normal) distribution on a per-window basis. You can specify how many windows to hold the throughput at a given rate before changing it. All traffic follows a gaussian distribution centered around `messagesPerSecondAverage` with a standard deviation of `messagesPerSecondDeviation`. The smaller the window size, the smoother the traffic will be. A 100ms window produces no noticeable spikes in traffic while still being long enough to avoid excessive overhead.
   
   Due to the discrete nature of throughput in terms of messages sent in a window, this does not work well for an average throughput of less than 5 messages per window. For lower throughput, adjust `windowSizeMs` accordingly.
   
   Here is an example spec:
   
   ```
   {
      "type": "gaussian",
      "messagesPerSecondAverage": 500,
      "messagesPerSecondDeviation": 50,
      "windowsUntilRateChange": 100,
      "windowSizeMs": 100
   }
   ```
   
   This will produce a workload that runs on average 500 messages per second; the rate changes every 10 seconds (`windowSizeMs * windowsUntilRateChange` = 100ms * 100). The throughput will have the following normal distribution:
   
   * The average throughput across windows is 500 messages per second.
   * ~68% of the throughput windows are between 450 and 550 messages per second.
   * ~95% of the throughput windows are between 400 and 600 messages per second.
   * ~99.7% of the throughput windows are between 350 and 650 messages per second.
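
   A minimal sketch of the per-window budget, assuming the rate is drawn with `java.util.Random#nextGaussian()` and converted to messages per window using the same round-down, minimum-1 rule as the constant generator. `GaussianThroughputSketch` is hypothetical; the real `GaussianThroughputGenerator` also handles waiting between windows, which is omitted here.

   ```java
   import java.util.Random;

   // Hypothetical sketch: every windowsUntilRateChange windows, draw a new rate
   // from N(averageMps, deviationMps) and convert it to a per-window budget.
   final class GaussianThroughputSketch {
       private final double averageMps;
       private final double deviationMps;
       private final int windowsUntilRateChange;
       private final long windowSizeMs;
       private final Random random;
       private int windowsAtCurrentRate = 0;
       private long currentPerWindow;

       GaussianThroughputSketch(double averageMps, double deviationMps,
                                int windowsUntilRateChange, long windowSizeMs, long seed) {
           if (windowsUntilRateChange <= 0 || windowSizeMs <= 0) {
               throw new IllegalArgumentException("window settings must be > 0");
           }
           this.averageMps = averageMps;
           this.deviationMps = deviationMps;
           this.windowsUntilRateChange = windowsUntilRateChange;
           this.windowSizeMs = windowSizeMs;
           this.random = new Random(seed);
           this.currentPerWindow = drawPerWindow();
       }

       private long drawPerWindow() {
           double mps = averageMps + deviationMps * random.nextGaussian();
           long perWindow = (long) Math.floor(mps * windowSizeMs / 1000.0); // round down
           return Math.max(1, perWindow); // at least 1 message per window
       }

       /** Returns the message budget for the next window, re-drawing the rate when due. */
       long nextWindowBudget() {
           if (windowsAtCurrentRate == windowsUntilRateChange) {
               currentPerWindow = drawPerWindow();
               windowsAtCurrentRate = 0;
           }
           windowsAtCurrentRate++;
           return currentPerWindow;
       }
   }
   ```

   With the example spec (`500`, `50`, `100`, `100`), each rate is held for 100 windows of 100ms, so the budget changes every 10 seconds.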
   
   ---
   
   # Additional Payload Generators
   
   This implementation also offers additional payload generators to facilitate 
the tests these workloads are designed to run.  These are also compatible with 
the existing ProduceBench workloads.
   
   ## TimestampRandomPayloadGenerator
   
   This generator generates timestamped pseudo-random payloads whose random content can be reproduced from run to run. The guarantees are the same as those of java.util.Random. The timestamp is in milliseconds since epoch and is encoded directly into the first several bytes of the payload. This generator should be used in conjunction with TimestampRecordProcessor in the consumer to measure true end-to-end latency of a system.
   
   * `size` - The size in bytes of each message.
   * `timestampBytes` - The number of bytes to use for the timestamp. Usually 8.
   * `seed` - Used to initialize Random() to remove some non-determinism.
   
   Here is an example spec:
   
   ```
   {
      "type": "timestampRandom",
      "size": 512,
      "timestampBytes": 8
   }
   ```
   
   This will generate a 512-byte random message with the first 8 bytes encoded 
with the timestamp.
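
   A minimal sketch of the payload layout, assuming an 8-byte big-endian timestamp (the byte order used by the actual generator is an assumption here) written over the first bytes of a buffer filled by a seeded `java.util.Random`. The class name is hypothetical, and the caller supplies the current time (for example `System.currentTimeMillis()`).

   ```java
   import java.nio.ByteBuffer;
   import java.util.Random;

   // Hypothetical sketch: random bytes from a seeded Random, with the first 8
   // bytes overwritten by the timestamp (ms since epoch, big-endian assumed).
   final class TimestampPayloadSketch {
       private final int size;
       private final Random random;

       TimestampPayloadSketch(int size, long seed) {
           if (size < Long.BYTES) {
               throw new IllegalArgumentException("size must be >= 8");
           }
           this.size = size;
           this.random = new Random(seed);
       }

       byte[] next(long nowMs) {
           byte[] payload = new byte[size];
           random.nextBytes(payload);
           ByteBuffer.wrap(payload).putLong(0, nowMs); // absolute put at offset 0
           return payload;
       }
   }
   ```

   Two instances created with the same seed produce identical bytes after the timestamp, which is the reproducibility property described above.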
   
   ## GaussianTimestampRandomPayloadGenerator
   
   This class behaves identically to TimestampRandomPayloadGenerator, except 
the message size follows a gaussian distribution.  This should be used in 
conjunction with TimestampRecordProcessor in the Consumer to measure true 
end-to-end latency of a system.
   
   * `messageSizeAverage` - The average size in bytes of each message.
   * `messageSizeDeviation` - The standard deviation to use when calculating 
message size.
   * `timestampBytes` - The number of bytes to use for the timestamp. Usually 8.
   * `messagesUntilSizeChange` - The number of messages to keep at the same 
size.
   * `seed` - Used to initialize Random() to remove some non-determinism.
   
   Here is an example spec:
   
   ```
   {
      "type": "gaussianTimestampRandom",
      "messageSizeAverage": 512,
      "messageSizeDeviation": 100,
      "timestampBytes": 8,
      "messagesUntilSizeChange": 100
   }
   ```
   
   This will generate messages whose sizes follow a gaussian distribution with an average of 512 bytes, each with the first 8 bytes encoding the timestamp. The message sizes will have a standard deviation of 100 bytes, and the size will only change every 100 messages. The distribution of message sizes will be as follows:
   
   * The average message size is 512 bytes.
   * ~68% of the messages are between 412 and 612 bytes.
   * ~95% of the messages are between 312 and 712 bytes.
   * ~99.7% of the messages are between 212 and 812 bytes.
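
   Extending the fixed-size idea, the sketch below draws a size from a normal distribution and holds it for `messagesUntilSizeChange` messages. As before, the class name, the big-endian timestamp encoding, and the clamp to a minimum of 8 bytes are assumptions for illustration rather than the actual `GaussianTimestampRandomPayloadGenerator` behavior.

   ```java
   import java.nio.ByteBuffer;
   import java.util.Random;

   // Hypothetical sketch: payload size drawn from N(average, deviation) and held
   // for messagesUntilSizeChange messages; first 8 bytes carry the timestamp.
   final class GaussianTimestampPayloadSketch {
       private final double sizeAverage;
       private final double sizeDeviation;
       private final int messagesUntilSizeChange;
       private final Random random;
       private int messagesAtCurrentSize = 0;
       private int currentSize;

       GaussianTimestampPayloadSketch(double sizeAverage, double sizeDeviation,
                                      int messagesUntilSizeChange, long seed) {
           this.sizeAverage = sizeAverage;
           this.sizeDeviation = sizeDeviation;
           this.messagesUntilSizeChange = messagesUntilSizeChange;
           this.random = new Random(seed);
           this.currentSize = drawSize();
       }

       private int drawSize() {
           long drawn = Math.round(sizeAverage + sizeDeviation * random.nextGaussian());
           // Never go below the 8 bytes needed for the timestamp.
           return (int) Math.max(Long.BYTES, drawn);
       }

       byte[] next(long nowMs) {
           if (messagesAtCurrentSize == messagesUntilSizeChange) {
               currentSize = drawSize();
               messagesAtCurrentSize = 0;
           }
           messagesAtCurrentSize++;
           byte[] payload = new byte[currentSize];
           random.nextBytes(payload);
           ByteBuffer.wrap(payload).putLong(0, nowMs); // timestamp in bytes 0..7
           return payload;
       }
   }
   ```

   Holding each size for a block of messages, rather than drawing per message, mirrors the `messagesUntilSizeChange` parameter described above.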
   
   ---
   
   # Testing
   
   ## New Functionality
   
   The ConfigurableProducer workload was tested by running various scenarios 
and verifying the metrics within the Kafka cluster matched the scenario as 
defined.
   
   ## Existing Functionality
   
   The ConsumeBench workload was tested without specifying the 
`recordProcessor` parameter to verify it still behaves as it did prior to this 
patch set.  All other code paths are in a new workload.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

