[ 
https://issues.apache.org/jira/browse/FLINK-35697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Hamdy resolved FLINK-35697.
---------------------------------
    Resolution: Resolved

> Release Testing: Verify FLIP-451 Introduce timeout configuration to AsyncSink
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-35697
>                 URL: https://issues.apache.org/jira/browse/FLINK-35697
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / Common
>    Affects Versions: 1.20.0
>            Reporter: Ahmed Hamdy
>            Assignee: Muhammet Orazov
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.20.0
>
>
> h2. Description
> In FLIP-451 we added Timeout configuration to {{AsyncSinkWriter}}, with 
> default value of 10 minutes and default failOnTimeout to false. 
> We need to test the new feature on different levels
> - Functional Testing
> - Performance Testing
> - Regression Testing
> h2. Common Utils
> The feature introduced affects an abstract {{AsyncSinkWriter}} class. we need 
> to use an implementation sink for our tests, Any implementation where we can 
> track delivery of elements is accepted in our tests, an example is:
> {code}
> class DiscardingElementWriter extends AsyncSinkWriter<T, String> {
>         SeparateThreadExecutor executor =
>                 new SeparateThreadExecutor(r -> new Thread(r, 
> "DiscardingElementWriter"));
>         public DiscardingElementWriter(
>                 Sink.InitContext context,
>                 AsyncSinkWriterConfiguration configuration,
>                 Collection<BufferedRequestState<String>> 
> bufferedRequestStates) {
>             super(
>                     (element, context1) -> element.toString(),
>                     context,
>                     configuration,
>                     bufferedRequestStates);
>         }
>         @Override
>         protected long getSizeInBytes(String requestEntry) {
>             return requestEntry.length();
>         }
>         @Override
>         protected void submitRequestEntries(
>                 List<String> requestEntries, ResultHandler<String> 
> resultHandler) {
>             executor.execute(
>                     () -> {
>                         long delayMillis = new Random().nextInt(5000);
>                         try {
>                             Thread.sleep(delayMillis);
>                         } catch (InterruptedException ignored) {
>                         }
>                         for (String entry : requestEntries) {
>                             LOG.info("Discarding {} after {} ms", entry, 
> delayMillis);
>                         }
>                         resultHandler.complete();
>                     });
>         }
>     }
> {code}
> We will also need a simple Flink Job that writes data using the sink
> {code}
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment
>                 .getExecutionEnvironment();
>         env.setParallelism(1);
>         env.fromSequence(0, 100)
>                 .map(Object::toString)
>                 .sinkTo(new DiscardingTestAsyncSink<>());
> {code}
> We can use least values for batch size and inflight requests to increase 
> number of requests that are subject to timeout
> {code}
> public class DiscardingTestAsyncSink<T> extends AsyncSinkBase<T, String> {
>     private static final Logger LOG = 
> LoggerFactory.getLogger(DiscardingTestAsyncSink.class);
>     public DiscardingTestAsyncSink(long requestTimeoutMS, boolean 
> failOnTimeout) {
>         super(
>                 (element, context) -> element.toString(),
>                 1, // maxBatchSize
>                 1, // maxInflightRequests
>                 10, // maxBufferedRequests
>                 1000L, // maxBatchsize
>                 100, // MaxTimeInBuffer
>                 500L, // maxRecordSize
>                 requestTimeoutMS,
>                 failOnTimeout);
>     }
>     @Override
>     public SinkWriter<T> createWriter(WriterInitContext context) throws 
> IOException {
>         return new DiscardingElementWriter(
>                 new InitContextWrapper(context),
>                 AsyncSinkWriterConfiguration.builder()
>                         .setMaxBatchSize(this.getMaxBatchSize())
>                         .setMaxBatchSizeInBytes(this.getMaxBatchSizeInBytes())
>                         .setMaxInFlightRequests(this.getMaxInFlightRequests())
>                         .setMaxBufferedRequests(this.getMaxBufferedRequests())
>                         .setMaxTimeInBufferMS(this.getMaxTimeInBufferMS())
>                         
> .setMaxRecordSizeInBytes(this.getMaxRecordSizeInBytes())
>                         .setFailOnTimeout(this.getFailOnTimeout())
>                         .setRequestTimeoutMS(this.getRequestTimeoutMS())
>                         .build(),
>                 Collections.emptyList());
>     }
>     @Override
>     public StatefulSinkWriter<T, BufferedRequestState<String>> restoreWriter(
>             WriterInitContext context, 
> Collection<BufferedRequestState<String>> recoveredState)
>             throws IOException {
>         return new DiscardingElementWriter(
>                 new InitContextWrapper(context),
>                 AsyncSinkWriterConfiguration.builder()
>                         .setMaxBatchSize(this.getMaxBatchSize())
>                         .setMaxBatchSizeInBytes(this.getMaxBatchSizeInBytes())
>                         .setMaxInFlightRequests(this.getMaxInFlightRequests())
>                         .setMaxBufferedRequests(this.getMaxBufferedRequests())
>                         .setMaxTimeInBufferMS(this.getMaxTimeInBufferMS())
>                         
> .setMaxRecordSizeInBytes(this.getMaxRecordSizeInBytes())
>                         .setFailOnTimeout(this.getFailOnTimeout())
>                         .setRequestTimeoutMS(this.getRequestTimeoutMS())
>                         .build(),
>                 recoveredState);
>     }
> {code}
> h2. Functional tests
> These are common tests to verify the new feature works correctly withing 
> Flink jobs
> h3. Test Timeout Requests are retried ensuring at least once semantics
> h4. Steps
> - pull and compile {{release-1.20}} branch
> - start flink cluster from flink-dist {{./start-cluster.sh}}
> - Configure the requestTimeout value in your job to sample number of timed 
> out requests.
> - compile and package your test job
> - open Flink Dashboard, upload the job jar and submit the job
> - Verify from the logs that all elements are delivered and all elemements 
> delivered with delay more than the configured timeout are resubmitted.
> *hint 1* : It is advised to use timeout closed to the simulated delay in the 
> sink so that the retried requests are not too much to track
> *hint 2*: It is also advised to use the debugger to check timed out requests 
> on the fly.
> h3. Test Fail on timeout fails Job
> h4. Steps
> Follow same steps of setting up job and cluster but use enable the 
> {{failOnTimeout}} flag
> - Verify that the jobs fails with {{TimeoutException}} visible in Job logs
> h3. Test With Checkpoints enabled 
> h4. Steps
> Follow same steps of setting up job and cluster with checkpoints enabled
> - Verify checkpoints are taken successfully
> h2. Performance testing
> We want to verify the introduced feature doesn't affect performance of the 
> sink, specifically it doesn't introduce unnecessary backpressure
> h4. Steps
> Execute same steps for setting up job and cluster but use a datagen source in 
> the test job to control throughput and use default values for requestTimeOut 
> = 10 minutes
> - Run the job till it is in stable state
> - Verify the Sink doesn't introduce backpressure to the job.
> h2. Regression Testing
> We need to verify the Sink doesn't cause regression in existing implementers, 
> for example we want to make sure there is no significant load of duplicate 
> data due to timeouts on default values using an existing sink.
> We have the following implementers under the community support
> {{Kinesis, Firehose, DynamoDb, ElasticSearch}}
> It is advisable to test with all of them
> h4. Steps
> - Run a simple job that sinks data from Datagen source to the used sink
> - Benchmark the throughput to the Sink destination
> - clone the sink connector repo as in 
> https://github.com/apache/flink-connector-aws
> - update the Flink version in the repo to {{1.20-SNAPSHOT}} 
> - Rerun the job and compare the throughput metrics with the benchmark.
> - Verify there is no regression between the 2 cases.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to