Ahmed Hamdy created FLINK-35697:
-----------------------------------
Summary: Release Testing: Verify FLIP-451 Introduce timeout
configuration to AsyncSink
Key: FLINK-35697
URL: https://issues.apache.org/jira/browse/FLINK-35697
Project: Flink
Issue Type: Sub-task
Components: Connectors / Common
Reporter: Ahmed Hamdy
Fix For: 1.20.0
h2. Description
In FLIP-451 we added a timeout configuration to {{AsyncSinkWriter}}, with a
default value of 10 minutes and {{failOnTimeout}} defaulting to false.
We need to test the new feature at different levels:
- Functional Testing
- Performance Testing
- Regression Testing
h2. Common Utils
The feature affects the abstract {{AsyncSinkWriter}} class, so we need a
concrete sink implementation for our tests. Any implementation that lets us
track delivery of elements is acceptable; an example is:
{code}
class DiscardingElementWriter<T> extends AsyncSinkWriter<T, String> {

    private static final Logger LOG =
            LoggerFactory.getLogger(DiscardingElementWriter.class);

    private final SeparateThreadExecutor executor =
            new SeparateThreadExecutor(r -> new Thread(r, "DiscardingElementWriter"));

    public DiscardingElementWriter(
            Sink.InitContext context,
            AsyncSinkWriterConfiguration configuration,
            Collection<BufferedRequestState<String>> bufferedRequestStates) {
        super(
                (element, context1) -> element.toString(),
                context,
                configuration,
                bufferedRequestStates);
    }

    @Override
    protected long getSizeInBytes(String requestEntry) {
        return requestEntry.length();
    }

    @Override
    protected void submitRequestEntries(
            List<String> requestEntries, ResultHandler<String> resultHandler) {
        executor.execute(
                () -> {
                    // Simulate a slow destination with a random delay of up to 5 seconds.
                    long delayMillis = new Random().nextInt(5000);
                    try {
                        Thread.sleep(delayMillis);
                    } catch (InterruptedException ignored) {
                    }
                    for (String entry : requestEntries) {
                        LOG.info("Discarding {} after {} ms", entry, delayMillis);
                    }
                    resultHandler.complete();
                });
    }
}
{code}
We will also need a simple Flink job that writes data using the sink:
{code}
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.fromSequence(0, 100)
        .map(Object::toString)
        // example values for requestTimeoutMS and failOnTimeout
        .sinkTo(new DiscardingTestAsyncSink<>(1000L, false));
env.execute("AsyncSink timeout test");
{code}
We can use small values for batch size and in-flight requests to increase the
number of requests that are subject to timeout:
{code}
public class DiscardingTestAsyncSink<T> extends AsyncSinkBase<T, String> {

    private static final Logger LOG =
            LoggerFactory.getLogger(DiscardingTestAsyncSink.class);

    public DiscardingTestAsyncSink(long requestTimeoutMS, boolean failOnTimeout) {
        super(
                (element, context) -> element.toString(),
                1, // maxBatchSize
                1, // maxInFlightRequests
                10, // maxBufferedRequests
                1000L, // maxBatchSizeInBytes
                100, // maxTimeInBufferMS
                500L, // maxRecordSizeInBytes
                requestTimeoutMS,
                failOnTimeout);
    }

    @Override
    public SinkWriter<T> createWriter(WriterInitContext context) throws IOException {
        return new DiscardingElementWriter<>(
                new InitContextWrapper(context),
                AsyncSinkWriterConfiguration.builder()
                        .setMaxBatchSize(this.getMaxBatchSize())
                        .setMaxBatchSizeInBytes(this.getMaxBatchSizeInBytes())
                        .setMaxInFlightRequests(this.getMaxInFlightRequests())
                        .setMaxBufferedRequests(this.getMaxBufferedRequests())
                        .setMaxTimeInBufferMS(this.getMaxTimeInBufferMS())
                        .setMaxRecordSizeInBytes(this.getMaxRecordSizeInBytes())
                        .setFailOnTimeout(this.getFailOnTimeout())
                        .setRequestTimeoutMS(this.getRequestTimeoutMS())
                        .build(),
                Collections.emptyList());
    }

    @Override
    public StatefulSinkWriter<T, BufferedRequestState<String>> restoreWriter(
            WriterInitContext context,
            Collection<BufferedRequestState<String>> recoveredState)
            throws IOException {
        return new DiscardingElementWriter<>(
                new InitContextWrapper(context),
                AsyncSinkWriterConfiguration.builder()
                        .setMaxBatchSize(this.getMaxBatchSize())
                        .setMaxBatchSizeInBytes(this.getMaxBatchSizeInBytes())
                        .setMaxInFlightRequests(this.getMaxInFlightRequests())
                        .setMaxBufferedRequests(this.getMaxBufferedRequests())
                        .setMaxTimeInBufferMS(this.getMaxTimeInBufferMS())
                        .setMaxRecordSizeInBytes(this.getMaxRecordSizeInBytes())
                        .setFailOnTimeout(this.getFailOnTimeout())
                        .setRequestTimeoutMS(this.getRequestTimeoutMS())
                        .build(),
                recoveredState);
    }
}
{code}
h2. Functional tests
These are common tests to verify the new feature works correctly within Flink
jobs.
h3. Test Timeout Requests are retried ensuring at least once semantics
h4. Steps
- Pull and compile the {{release-1.20}} branch
- Start a Flink cluster from flink-dist: {{./start-cluster.sh}}
- Configure the requestTimeout value in your job to sample a number of
timed-out requests
- Compile and package your test job
- Open the Flink Dashboard, upload the job jar, and submit the job
- Verify from the logs that all elements are delivered, and that all elements
delivered with a delay greater than the configured timeout are resubmitted
*hint 1*: It is advised to use a timeout close to the simulated delay in the
sink so that the retried requests are not too many to track.
*hint 2*: It is also advised to use the debugger to check timed-out requests on
the fly.
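Following hint 1, the timeout configuration step might look like the sketch below; the 2-second value is an arbitrary example chosen to sit inside the 0-5 second delay simulated by {{DiscardingElementWriter}}, so a noticeable but manageable fraction of requests time out:

{code}
// Example only: 2000 ms timeout vs. a simulated delay of up to 5000 ms means
// roughly half the requests should time out and be resubmitted.
env.fromSequence(0, 100)
        .map(Object::toString)
        .sinkTo(new DiscardingTestAsyncSink<>(2000L, false)); // requestTimeoutMS, failOnTimeout
{code}

With this setup, each element should eventually appear in a "Discarding ..." log line, and elements whose first attempt exceeded 2 seconds should appear more than once.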
h3. Test Fail on timeout fails Job
h4. Steps
Follow the same steps for setting up the job and cluster, but enable the
{{failOnTimeout}} flag
- Verify that the job fails with a {{TimeoutException}} visible in the job logs
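With the test sink above, enabling the flag is just the second constructor argument (the timeout value is again an illustrative example):

{code}
// failOnTimeout = true: a timed-out request fails the job instead of being retried.
env.fromSequence(0, 100)
        .map(Object::toString)
        .sinkTo(new DiscardingTestAsyncSink<>(1000L, true));
{code}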
h3. Test With Checkpoints enabled
h4. Steps
Follow the same steps for setting up the job and cluster, with checkpoints
enabled
- Verify checkpoints are taken successfully
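Checkpointing can be enabled on the same job; the interval below is an arbitrary example:

{code}
// Continuing the job above: take a checkpoint every 10 seconds (example interval).
env.enableCheckpointing(10_000L);
{code}

After a few intervals, the Checkpoints tab of the Flink Dashboard should show completed checkpoints, including while requests are timing out and being retried.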
h2. Performance testing
We want to verify the introduced feature doesn't affect the performance of the
sink; specifically, that it doesn't introduce unnecessary backpressure.
h4. Steps
Execute the same steps for setting up the job and cluster, but use a datagen
source in the test job to control throughput, and use the default value for
requestTimeout (10 minutes)
- Run the job until it is in a stable state
- Verify the sink doesn't introduce backpressure to the job
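A sketch of the performance job using Flink's {{DataGeneratorSource}} to control throughput; the rate and timeout values below are arbitrary examples:

{code}
// Generate string records at a controlled rate of 1000 records/second.
DataGeneratorSource<String> source =
        new DataGeneratorSource<>(
                index -> "record-" + index,
                Long.MAX_VALUE, // effectively unbounded
                RateLimiterStrategy.perSecond(1000),
                Types.STRING);

env.fromSource(source, WatermarkStrategy.noWatermarks(), "datagen")
        // 600_000 ms = the 10-minute default requestTimeout
        .sinkTo(new DiscardingTestAsyncSink<>(600_000L, false));
{code}

Backpressure can then be observed per-operator in the Flink Dashboard's job overview while the job runs at the configured rate.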
h2. Regression Testing
We need to verify the sink doesn't cause a regression in existing
implementations; for example, we want to make sure there is no significant
volume of duplicate data due to timeouts with default values when using an
existing sink.
We have the following implementations under community support:
{{Kinesis, Firehose, DynamoDb, ElasticSearch}}
It is advisable to test with all of them.
h4. Steps
- Run a simple job that sinks data from a Datagen source to the sink under test
- Benchmark the throughput to the sink destination
- Clone the sink connector repo, e.g.
https://github.com/apache/flink-connector-aws
- Update the Flink version in the repo to {{1.20-SNAPSHOT}}
- Rerun the job and compare the throughput metrics with the benchmark
- Verify there is no regression between the two cases
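The clone-and-rebuild steps might look like the following; overriding the Flink version via a {{flink.version}} Maven property is an assumption about the connector repo's pom and should be checked against the actual parent pom:

{code}
# Clone the AWS connectors repo and build it against the release candidate.
git clone https://github.com/apache/flink-connector-aws.git
cd flink-connector-aws
# Property name "flink.version" is assumed; verify it in the parent pom first.
mvn clean install -DskipTests -Dflink.version=1.20-SNAPSHOT
{code}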
--
This message was sent by Atlassian Jira
(v8.20.10#820010)