Ahmed Hamdy created FLINK-35697:
-----------------------------------

             Summary: Release Testing: Verify FLIP-451 Introduce timeout configuration to AsyncSink
                 Key: FLINK-35697
                 URL: https://issues.apache.org/jira/browse/FLINK-35697
             Project: Flink
          Issue Type: Sub-task
          Components: Connectors / Common
            Reporter: Ahmed Hamdy
             Fix For: 1.20.0
h2. Description
In FLIP-451 we added a timeout configuration to {{AsyncSinkWriter}}, with a default value of 10 minutes and {{failOnTimeout}} defaulting to false.
We need to test the new feature on different levels:
- Functional Testing
- Performance Testing
- Regression Testing

h2. Common Utils
The introduced feature affects the abstract {{AsyncSinkWriter}} class, so we need a concrete sink implementation for our tests. Any implementation where we can track the delivery of elements is acceptable; an example is:

{code}
class DiscardingElementWriter<T> extends AsyncSinkWriter<T, String> {

    private static final Logger LOG = LoggerFactory.getLogger(DiscardingElementWriter.class);

    private final SeparateThreadExecutor executor =
            new SeparateThreadExecutor(r -> new Thread(r, "DiscardingElementWriter"));

    public DiscardingElementWriter(
            Sink.InitContext context,
            AsyncSinkWriterConfiguration configuration,
            Collection<BufferedRequestState<String>> bufferedRequestStates) {
        super(
                (element, context1) -> element.toString(),
                context,
                configuration,
                bufferedRequestStates);
    }

    @Override
    protected long getSizeInBytes(String requestEntry) {
        return requestEntry.length();
    }

    @Override
    protected void submitRequestEntries(
            List<String> requestEntries, ResultHandler<String> resultHandler) {
        executor.execute(
                () -> {
                    // Simulate a slow destination with a random delay of up to 5 seconds.
                    long delayMillis = new Random().nextInt(5000);
                    try {
                        Thread.sleep(delayMillis);
                    } catch (InterruptedException ignored) {
                    }
                    for (String entry : requestEntries) {
                        LOG.info("Discarding {} after {} ms", entry, delayMillis);
                    }
                    resultHandler.complete();
                });
    }
}
{code}

We will also need a simple Flink job that writes data using the sink:

{code}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.fromSequence(0, 100)
        .map(Object::toString)
        .sinkTo(new DiscardingTestAsyncSink<>(1000L, false)); // requestTimeoutMS and failOnTimeout; values are illustrative
{code}

We can use minimal values for batch size and in-flight requests to increase the number of requests that are subject to timeout:

{code}
public class DiscardingTestAsyncSink<T> extends AsyncSinkBase<T, String> {
    private static final Logger LOG = LoggerFactory.getLogger(DiscardingTestAsyncSink.class);

    public DiscardingTestAsyncSink(long requestTimeoutMS, boolean failOnTimeout) {
        super(
                (element, context) -> element.toString(),
                1, // maxBatchSize
                1, // maxInFlightRequests
                10, // maxBufferedRequests
                1000L, // maxBatchSizeInBytes
                100, // maxTimeInBufferMS
                500L, // maxRecordSizeInBytes
                requestTimeoutMS,
                failOnTimeout);
    }

    @Override
    public SinkWriter<T> createWriter(WriterInitContext context) throws IOException {
        return new DiscardingElementWriter<>(
                new InitContextWrapper(context),
                AsyncSinkWriterConfiguration.builder()
                        .setMaxBatchSize(this.getMaxBatchSize())
                        .setMaxBatchSizeInBytes(this.getMaxBatchSizeInBytes())
                        .setMaxInFlightRequests(this.getMaxInFlightRequests())
                        .setMaxBufferedRequests(this.getMaxBufferedRequests())
                        .setMaxTimeInBufferMS(this.getMaxTimeInBufferMS())
                        .setMaxRecordSizeInBytes(this.getMaxRecordSizeInBytes())
                        .setFailOnTimeout(this.getFailOnTimeout())
                        .setRequestTimeoutMS(this.getRequestTimeoutMS())
                        .build(),
                Collections.emptyList());
    }

    @Override
    public StatefulSinkWriter<T, BufferedRequestState<String>> restoreWriter(
            WriterInitContext context, Collection<BufferedRequestState<String>> recoveredState)
            throws IOException {
        return new DiscardingElementWriter<>(
                new InitContextWrapper(context),
                AsyncSinkWriterConfiguration.builder()
                        .setMaxBatchSize(this.getMaxBatchSize())
                        .setMaxBatchSizeInBytes(this.getMaxBatchSizeInBytes())
                        .setMaxInFlightRequests(this.getMaxInFlightRequests())
                        .setMaxBufferedRequests(this.getMaxBufferedRequests())
                        .setMaxTimeInBufferMS(this.getMaxTimeInBufferMS())
                        .setMaxRecordSizeInBytes(this.getMaxRecordSizeInBytes())
                        .setFailOnTimeout(this.getFailOnTimeout())
                        .setRequestTimeoutMS(this.getRequestTimeoutMS())
                        .build(),
                recoveredState);
    }
}
{code}

h2. Functional Tests
These are common tests to verify the new feature works correctly within Flink jobs.

h3. Test that timed-out requests are retried, ensuring at-least-once semantics
h4. Steps
- Pull and compile the {{release-1.20}} branch
- Start a Flink cluster from flink-dist: {{./start-cluster.sh}}
- Configure the {{requestTimeoutMS}} value in your job so that a sample of requests times out
- Compile and package your test job
- Open the Flink Dashboard, upload the job jar, and submit the job
- Verify from the logs that all elements are delivered, and that all elements delivered with a delay greater than the configured timeout are resubmitted

*hint 1*: It is advised to use a timeout close to the simulated delay in the sink so that the number of retried requests stays small enough to track.
*hint 2*: It is also advised to use the debugger to inspect timed-out requests on the fly.

h3. Test that fail-on-timeout fails the job
h4. Steps
- Follow the same steps for setting up the job and cluster, but enable the {{failOnTimeout}} flag
- Verify that the job fails with a {{TimeoutException}} visible in the job logs

h3. Test with checkpoints enabled
h4. Steps
- Follow the same steps for setting up the job and cluster, with checkpointing enabled
- Verify that checkpoints are taken successfully
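A minimal sketch of the three functional-test job variants, using only the {{DiscardingTestAsyncSink}} defined above; the concrete timeout and checkpoint-interval values are illustrative:

{code}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// Variant 1: timeout close to the simulated delay (up to 5000 ms);
// retried requests should preserve at-least-once semantics.
env.fromSequence(0, 100)
        .map(Object::toString)
        .sinkTo(new DiscardingTestAsyncSink<>(2500L, false));

// Variant 2: same timeout, but the job should fail with a TimeoutException:
// .sinkTo(new DiscardingTestAsyncSink<>(2500L, true));

// Variant 3: additionally enable checkpointing and verify checkpoints complete:
// env.enableCheckpointing(10_000L);

env.execute("FLIP-451 functional test");
{code}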
h2. Performance Testing
We want to verify the introduced feature doesn't affect the performance of the sink; specifically, that it doesn't introduce unnecessary backpressure.

h4. Steps
- Execute the same steps for setting up the job and cluster, but use a DataGen source in the test job to control throughput (a sketch follows after the Regression Testing section), and keep the default {{requestTimeoutMS}} of 10 minutes
- Run the job until it reaches a stable state
- Verify the sink doesn't introduce backpressure to the job

h2. Regression Testing
We need to verify the sink doesn't cause a regression in existing implementers; for example, we want to make sure there is no significant amount of duplicate data due to timeouts with the default values when using an existing sink.
We have the following implementers under community support: {{Kinesis, Firehose, DynamoDb, ElasticSearch}}. It is advisable to test with all of them.

h4. Steps
- Run a simple job that sinks data from a DataGen source to the sink under test (see the sketches below)
- Benchmark the throughput to the sink destination
- Clone the sink connector repo, e.g. https://github.com/apache/flink-connector-aws
- Update the Flink version in the repo to {{1.20-SNAPSHOT}}
- Rerun the job and compare the throughput metrics with the benchmark
- Verify there is no regression between the two cases
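For the performance and regression tests, a minimal sketch of a throughput-controlled DataGen source, assuming the {{flink-connector-datagen}} module is on the classpath; the record count and the 1000 records/second rate are illustrative:

{code}
// Rate-limited DataGeneratorSource feeding the test sink; values are illustrative.
DataGeneratorSource<String> source =
        new DataGeneratorSource<>(
                index -> "record-" + index,          // GeneratorFunction<Long, String>
                10_000_000L,                         // total record count
                RateLimiterStrategy.perSecond(1000), // throughput control
                Types.STRING);

env.fromSource(source, WatermarkStrategy.noWatermarks(), "DataGen Source")
        .sinkTo(new DiscardingTestAsyncSink<>(600_000L, false)); // default 10-minute timeout
{code}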
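For the regression benchmark itself, a sketch against one community-supported implementer, the Kinesis sink from flink-connector-aws, reusing the DataGen source above; the stream name and region are hypothetical placeholders:

{code}
// Hypothetical regression-test job: DataGen output written to a Kinesis stream.
Properties sinkProperties = new Properties();
sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1"); // placeholder region

KinesisStreamsSink<String> kinesisSink =
        KinesisStreamsSink.<String>builder()
                .setKinesisClientProperties(sinkProperties)
                .setStreamName("flip-451-test-stream") // placeholder stream name
                .setSerializationSchema(new SimpleStringSchema())
                .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
                .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "DataGen Source")
        .sinkTo(kinesisSink);
env.execute("FLIP-451 regression test");
{code}

Run this job once against the released connector as the benchmark, then again after bumping the Flink version (typically the {{flink.version}} property in the connector pom) to {{1.20-SNAPSHOT}}, and compare the measured sink throughput.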