scott-hendricks opened a new pull request #9736:
URL: https://github.com/apache/kafka/pull/9736
This PR pulls together a couple of the outstanding one-pagers we have for Q4:

https://confluentinc.atlassian.net/wiki/spaces/QERM/pages/1605992772
https://confluentinc.atlassian.net/wiki/spaces/QERM/pages/1589186862

Together they allow us to run highly granular, configurable workloads that directly simulate customer scenarios and measure end-to-end latency, all within Trogdor. Specifically, this PR:

* Creates a new `ConfigurableProducer` workload that allows many more parts of the workload to be tuned than the `ProduceBench` workload does, along with all the helper classes the new workload needs.
* Adds a new parameter to the `ConsumeBench` workload that allows records to be processed after they are polled.
* Adds a new E2E latency test utilizing the new record processor within the `ConsumeBench` workload.

---

# ConfigurableProducer workload

The ConfigurableProducer workload allows for customized and even variable configurations of messages per second, message size, batch size, and key size, and it can target a specific partition of a topic. The parameters that differ from ProduceBenchSpec:

* `flushGenerator` - Used to instruct the KafkaProducer when to issue flushes. This allows us to simulate variable batching, since batch flushing is not currently exposed within the KafkaProducer class.
* `throughputGenerator` - Used to throttle the ConfigurableProducerWorker based on a calculated number of messages within a window.
* `activeTopic` - This workload only supports execution against a single topic at a time. If more than one topic is specified, the ConfigurableProducerWorker will throw an error.
* `activePartition` - Specifies the partition number within the activeTopic to run load against, or `-1` to allow use of all partitions.

Here is an example spec:

```
{
  "startMs": 1606949497662,
  "durationMs": 3600000,
  "producerNode": "trogdor-agent-0",
  "bootstrapServers": "some.example.kafka.server:9091",
  "flushGenerator": {
    "type": "gaussian",
    "messagesPerFlushAverage": 16,
    "messagesPerFlushDeviation": 4
  },
  "throughputGenerator": {
    "type": "gaussian",
    "messagesPerSecondAverage": 500,
    "messagesPerSecondDeviation": 50,
    "windowsUntilRateChange": 100,
    "windowSizeMs": 100
  },
  "keyGenerator": {
    "type": "constant",
    "size": 8
  },
  "valueGenerator": {
    "type": "gaussianTimestampRandom",
    "messageSizeAverage": 512,
    "messageSizeDeviation": 100,
    "timestampBytes": 8,
    "messagesUntilSizeChange": 100
  },
  "producerConf": {
    "acks": "all"
  },
  "commonClientConf": {},
  "adminClientConf": {},
  "activeTopic": {
    "topic0": {
      "numPartitions": 100,
      "replicationFactor": 3,
      "configs": {
        "retention.ms": "1800000"
      }
    }
  },
  "activePartition": 5
}
```

This example spec performed the following:

* Ran on `trogdor-agent-0` for 1 hour starting at 2020-12-02 22:51:37.662 GMT.
* Produced with acks=all to partition 5 of `topic0` on the Kafka server `some.example.kafka.server:9091`.
* The average batch had 16 messages, with a standard deviation of 4 messages.
* Each message had an 8-byte constant key and a value averaging 512 bytes with a standard deviation of 100 bytes.
* Each message had a millisecond timestamp embedded in the first 8 bytes of its value.
* The average throughput was 500 messages/second, with a window of 100ms and a deviation of 50 messages/second.
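For context, partition targeting maps directly onto the plain producer API: passing an explicit partition number to `ProducerRecord` pins every record to that partition. Here is a minimal sketch (the class name and serializer choices are illustrative, not taken from the workload code):

```
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionTargetSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "some.example.kafka.server:9091");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            byte[] key = new byte[8];     // constant 8-byte key, as in the example spec
            byte[] value = new byte[512]; // payload produced by the value generator
            // activePartition=5 pins records to partition 5; activePartition=-1
            // would leave the partition unset and let the partitioner choose.
            producer.send(new ProducerRecord<>("topic0", 5, key, value));
        }
    }
}
```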
---

# ConsumeBench workload changes

This commit adds the ability for the ConsumeBench workload to optionally process the records returned by the consumer poll call. This is done by specifying a new `recordProcessor` parameter. The record processor's status is then included in the workload's status.

## RecordProcessor

This interface provides the ability to optionally process records after the ConsumeBench workload polls them, and allows additional data to be included in the status output. Currently there are 2 processing methods:

* Disabled, by not specifying this parameter.
* `timestamp` will use `TimestampRecordProcessor` to process records containing a timestamp in the first several bytes of the message.

### TimestampRecordProcessor

The `TimestampRecordProcessor` class processes records containing a timestamp in the first several bytes of the message and generates a histogram from that data, which is then presented in the status from the `ConsumeBenchWorker` class. It must be used with a timestamped PayloadGenerator implementation. Here's an example spec:

```
{
  "type": "timestamp",
  "timestampBytes": 8,
  "histogramMaxMs": 10000,
  "histogramMinMs": 0,
  "histogramStepMs": 1
}
```

This will track total E2E latency up to 10 seconds, using 1ms resolution and a timestamp size of 8 bytes.
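For illustration, here is a minimal sketch of the decode side, assuming the payload generator writes the timestamp into the first 8 bytes of the value using `ByteBuffer`'s default big-endian order (the class and method names are invented for this sketch, not the actual `TimestampRecordProcessor` code):

```
import java.nio.ByteBuffer;

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class LatencyDecodeSketch {
    // Read the produce-side timestamp from the first 8 bytes of the value and
    // return the end-to-end latency in milliseconds.
    static long latencyMs(ConsumerRecord<byte[], byte[]> record) {
        long sentMs = ByteBuffer.wrap(record.value(), 0, 8).getLong();
        return System.currentTimeMillis() - sentMs;
    }
}
```

Each latency sample would then be clamped to `[histogramMinMs, histogramMaxMs]` and bucketed at `histogramStepMs` resolution to build the reported histogram.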
---

# FlushGenerator

A FlushGenerator is used to flush the KafkaProducer on a cadence specified by the user. This is useful for simulating a specific number of messages per batch regardless of the message size, since batch flushing is not exposed in the KafkaProducer. Currently there are 3 flushing methods:

* Disabled, by not specifying this parameter.
* `constant` will use `ConstantFlushGenerator` to keep the number of messages per batch constant.
* `gaussian` will use `GaussianFlushGenerator` to vary the number of messages per batch on a normal distribution.

## ConstantFlushGenerator

This generator will flush the producer after a specific number of messages. It does not directly control when the KafkaProducer will batch; it only makes a best effort. It also cannot tell when a KafkaProducer batch is closed: if the KafkaProducer sends a batch before this generator flushes, the generator will continue on its own cadence. Here is an example spec:

```
{
  "type": "constant",
  "messagesPerFlush": 16
}
```

This example will flush the producer every 16 messages.

## GaussianFlushGenerator

This generator will flush the producer after a number of messages determined by a gaussian distribution. It does not directly control when the KafkaProducer will batch; it only makes a best effort. It also cannot tell when a KafkaProducer batch is closed: if the KafkaProducer sends a batch before this generator flushes, the generator will continue on its own cadence. Here is an example spec:

```
{
  "type": "gaussian",
  "messagesPerFlushAverage": 16,
  "messagesPerFlushDeviation": 4
}
```

This example will flush the producer on average every 16 messages, assuming `linger.ms` and `batch.size` allow for it. The flush point changes on a normal distribution after each flush:

* The average flush is at 16 messages.
* ~68% of the flushes are between 12 and 20 messages.
* ~95% of the flushes are between 8 and 24 messages.
* ~99% of the flushes are between 4 and 28 messages.

---

# ThroughputGenerator

Similar to the existing throttle class but with a simpler design, this interface is used to run a configurable number of messages per second by throttling whenever the throughput rises above the configured amount. Currently there are 2 throughput methods:

* `constant` will use `ConstantThroughputGenerator` to keep the number of messages per second constant.
* `gaussian` will use `GaussianThroughputGenerator` to vary the number of messages per second on a normal distribution.

## ConstantThroughputGenerator

This throughput generator configures constant throughput. The lower the window size, the smoother the traffic will be. A 100ms window produces no noticeable spikes in traffic while still being long enough to avoid excessive overhead.

Because each window sends a whole number of messages, each window will send at least 1 message, and each window sends the same number of messages, rounded down. For example, 99 messages per second with a 100ms window will only send 90 messages per second, or 9 messages per window. As another example, sending only 5 messages per second requires a window size of at least 200ms. In cases like these, the `messagesPerSecond` and `windowSizeMs` parameters should be adjusted together to achieve more accurate throughput. Here is an example spec:

```
{
  "type": "constant",
  "messagesPerSecond": 500,
  "windowSizeMs": 100
}
```

This will produce a workload that runs at 500 messages per second, with a maximum resolution of 50 messages per 100 milliseconds.

## GaussianThroughputGenerator

This throughput generator configures throughput with a gaussian normal distribution on a per-window basis. You can specify how many windows to hold a given rate before changing it. All traffic follows a gaussian distribution centered on `messagesPerSecondAverage` with a deviation of `messagesPerSecondDeviation`.

The lower the window size, the smoother the traffic will be. A 100ms window produces no noticeable spikes in traffic while still being long enough to avoid excessive overhead. Because each window sends a whole number of messages, this does not work well for an average throughput of less than 5 messages per window; for lower throughput, `windowSizeMs` must be increased accordingly. Here is an example spec:

```
{
  "type": "gaussian",
  "messagesPerSecondAverage": 500,
  "messagesPerSecondDeviation": 50,
  "windowsUntilRateChange": 100,
  "windowSizeMs": 100
}
```

This will produce a workload that averages 500 messages per second, with the rate changing every 10 seconds (`windowsUntilRateChange * windowSizeMs` = 100 windows of 100ms). The throughput will have the following normal distribution:

* The throughput windows average 500 messages per second.
* ~68% of the throughput windows are between 450 and 550 messages per second.
* ~95% of the throughput windows are between 400 and 600 messages per second.
* ~99% of the throughput windows are between 350 and 650 messages per second.
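To make the window arithmetic concrete, here is a minimal sketch of constant-rate throttling, under the assumption that the worker simply sleeps out the remainder of a window once its quota is spent (the class and field names are invented for this sketch, not the actual ConstantThroughputGenerator code):

```
import java.util.concurrent.TimeUnit;

// Sketch of per-window throttling: send a fixed number of messages per
// window, then sleep until the window ends.
public class WindowThrottleSketch {
    private final int messagesPerWindow;
    private final long windowSizeMs;
    private long windowStartMs = System.currentTimeMillis();
    private int sentInWindow = 0;

    public WindowThrottleSketch(int messagesPerSecond, long windowSizeMs) {
        // Integer division rounds down, and each window sends at least 1
        // message, which is the quantization described above.
        this.messagesPerWindow = Math.max(1, (int) (messagesPerSecond * windowSizeMs / 1000));
        this.windowSizeMs = windowSizeMs;
    }

    // Call before each send; blocks once the current window's quota is used up.
    public void throttle() throws InterruptedException {
        if (sentInWindow >= messagesPerWindow) {
            long windowEndMs = windowStartMs + windowSizeMs;
            long nowMs = System.currentTimeMillis();
            if (nowMs < windowEndMs) {
                TimeUnit.MILLISECONDS.sleep(windowEndMs - nowMs);
            }
            windowStartMs = System.currentTimeMillis();
            sentInWindow = 0;
        }
        sentInWindow++;
    }
}
```

With `messagesPerSecond = 99` and `windowSizeMs = 100`, `messagesPerWindow` rounds down to 9, reproducing the 90 messages/second behavior described above.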
---

# Additional Payload Generators

This implementation also offers additional payload generators to facilitate the tests these workloads are designed to run. They are also compatible with the existing ProduceBench workloads.

## TimestampRandomPayloadGenerator

This generator generates timestamped pseudo-random payloads that can be reproduced from run to run. The guarantees are the same as those of java.util.Random. The timestamp is milliseconds since the epoch, encoded directly into the first several bytes of the payload. This should be used in conjunction with TimestampRecordProcessor in the consumer to measure true end-to-end latency of a system.

* `size` - The size in bytes of each message.
* `timestampBytes` - The number of bytes to use for the timestamp. Usually 8.
* `seed` - Used to initialize Random() to remove some non-determinism.

Here is an example spec:

```
{
  "type": "timestampRandom",
  "size": 512,
  "timestampBytes": 8
}
```

This will generate 512-byte random messages with the timestamp encoded in the first 8 bytes.

## GaussianTimestampRandomPayloadGenerator

This class behaves identically to TimestampRandomPayloadGenerator, except that the message size follows a gaussian distribution. It should be used in conjunction with TimestampRecordProcessor in the consumer to measure true end-to-end latency of a system.

* `messageSizeAverage` - The average size in bytes of each message.
* `messageSizeDeviation` - The standard deviation to use when calculating message size.
* `timestampBytes` - The number of bytes to use for the timestamp. Usually 8.
* `messagesUntilSizeChange` - The number of messages to keep at the same size.
* `seed` - Used to initialize Random() to remove some non-determinism.

Here is an example spec:

```
{
  "type": "gaussianTimestampRandom",
  "messageSizeAverage": 512,
  "messageSizeDeviation": 100,
  "timestampBytes": 8,
  "messagesUntilSizeChange": 100
}
```

This will generate messages on a gaussian distribution with an average size of 512 bytes, with the timestamp encoded in the first 8 bytes. The message sizes will have a standard deviation of 100 bytes, and the size will only change every 100 messages. The distribution of messages will be as follows:

* The average message size is 512 bytes.
* ~68% of the messages are between 412 and 612 bytes.
* ~95% of the messages are between 312 and 712 bytes.
* ~99% of the messages are between 212 and 812 bytes.

---

# Testing

## New Functionality

The ConfigurableProducer workload was tested by running various scenarios and verifying that the metrics within the Kafka cluster matched the scenario as defined.

## Existing Functionality

The ConsumeBench workload was tested without specifying the `recordProcessor` parameter to verify that it still behaves as it did prior to this patch set. All other code paths are in a new workload.