Ahmed Hamdy created FLINK-35697:
-----------------------------------

             Summary: Release Testing: Verify FLIP-451 Introduce timeout configuration to AsyncSink
                 Key: FLINK-35697
                 URL: https://issues.apache.org/jira/browse/FLINK-35697
             Project: Flink
          Issue Type: Sub-task
          Components: Connectors / Common
            Reporter: Ahmed Hamdy
             Fix For: 1.20.0


h2. Description

In FLIP-451 we added a timeout configuration to {{AsyncSinkWriter}}, with a default value of 10 minutes and {{failOnTimeout}} defaulting to false (see the sketch below for how these options are set).
We need to test the new feature on different levels:
- Functional Testing
- Performance Testing
- Regression Testing
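
For reference, the new options are exposed through the {{AsyncSinkWriterConfiguration}} builder used later in this ticket; a minimal sketch of overriding the defaults could look like the following (all concrete values are illustrative, not recommendations):
{code}
// Sketch only: override the new timeout options when building the writer configuration.
AsyncSinkWriterConfiguration configuration =
        AsyncSinkWriterConfiguration.builder()
                .setMaxBatchSize(10)
                .setMaxBatchSizeInBytes(1_000L)
                .setMaxInFlightRequests(1)
                .setMaxBufferedRequests(100)
                .setMaxTimeInBufferMS(1_000L)
                .setMaxRecordSizeInBytes(500L)
                .setRequestTimeoutMS(30_000L) // default is 10 minutes
                .setFailOnTimeout(false)      // default is false
                .build();
{code}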

h2. Common Utils

The introduced feature affects the abstract {{AsyncSinkWriter}} class, so we need a concrete sink implementation for our tests. Any implementation where we can track the delivery of elements is acceptable; an example is:
{code}
    /** A writer that simulates request latency and then discards the entries. */
    class DiscardingElementWriter extends AsyncSinkWriter<T, String> {
        SeparateThreadExecutor executor =
                new SeparateThreadExecutor(r -> new Thread(r, "DiscardingElementWriter"));

        public DiscardingElementWriter(
                Sink.InitContext context,
                AsyncSinkWriterConfiguration configuration,
                Collection<BufferedRequestState<String>> bufferedRequestStates) {
            super(
                    (element, context1) -> element.toString(),
                    context,
                    configuration,
                    bufferedRequestStates);
        }

        @Override
        protected long getSizeInBytes(String requestEntry) {
            return requestEntry.length();
        }

        @Override
        protected void submitRequestEntries(
                List<String> requestEntries, ResultHandler<String> resultHandler) {
            executor.execute(
                    () -> {
                        // Simulate request latency of up to 5 seconds before completing.
                        long delayMillis = new Random().nextInt(5000);
                        try {
                            Thread.sleep(delayMillis);
                        } catch (InterruptedException ignored) {
                        }
                        for (String entry : requestEntries) {
                            LOG.info("Discarding {} after {} ms", entry, delayMillis);
                        }

                        resultHandler.complete();
                    });
        }
    }
{code}

We will also need a simple Flink job that writes data using the sink:

{code}
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.fromSequence(0, 100)
                .map(Object::toString)
                // requestTimeoutMS and failOnTimeout are the values under test
                .sinkTo(new DiscardingTestAsyncSink<>(requestTimeoutMS, failOnTimeout));
{code}

We can use minimal values for batch size and in-flight requests to increase the number of requests that are subject to the timeout:

{code}
public class DiscardingTestAsyncSink<T> extends AsyncSinkBase<T, String> {
    private static final Logger LOG = LoggerFactory.getLogger(DiscardingTestAsyncSink.class);

    public DiscardingTestAsyncSink(long requestTimeoutMS, boolean failOnTimeout) {
        super(
                (element, context) -> element.toString(),
                1, // maxBatchSize
                1, // maxInFlightRequests
                10, // maxBufferedRequests
                1000L, // maxBatchSizeInBytes
                100, // maxTimeInBufferMS
                500L, // maxRecordSizeInBytes
                requestTimeoutMS,
                failOnTimeout);
    }

    @Override
    public SinkWriter<T> createWriter(WriterInitContext context) throws IOException {
        return new DiscardingElementWriter(
                new InitContextWrapper(context),
                AsyncSinkWriterConfiguration.builder()
                        .setMaxBatchSize(this.getMaxBatchSize())
                        .setMaxBatchSizeInBytes(this.getMaxBatchSizeInBytes())
                        .setMaxInFlightRequests(this.getMaxInFlightRequests())
                        .setMaxBufferedRequests(this.getMaxBufferedRequests())
                        .setMaxTimeInBufferMS(this.getMaxTimeInBufferMS())
                        .setMaxRecordSizeInBytes(this.getMaxRecordSizeInBytes())
                        .setFailOnTimeout(this.getFailOnTimeout())
                        .setRequestTimeoutMS(this.getRequestTimeoutMS())
                        .build(),
                Collections.emptyList());
    }

    @Override
    public StatefulSinkWriter<T, BufferedRequestState<String>> restoreWriter(
            WriterInitContext context, Collection<BufferedRequestState<String>> recoveredState)
            throws IOException {
        return new DiscardingElementWriter(
                new InitContextWrapper(context),
                AsyncSinkWriterConfiguration.builder()
                        .setMaxBatchSize(this.getMaxBatchSize())
                        .setMaxBatchSizeInBytes(this.getMaxBatchSizeInBytes())
                        .setMaxInFlightRequests(this.getMaxInFlightRequests())
                        .setMaxBufferedRequests(this.getMaxBufferedRequests())
                        .setMaxTimeInBufferMS(this.getMaxTimeInBufferMS())
                        .setMaxRecordSizeInBytes(this.getMaxRecordSizeInBytes())
                        .setFailOnTimeout(this.getFailOnTimeout())
                        .setRequestTimeoutMS(this.getRequestTimeoutMS())
                        .build(),
                recoveredState);
    }
}
{code}

h2. Functional tests
These are common tests to verify the new feature works correctly within Flink jobs.

h3. Test that timed-out requests are retried, ensuring at-least-once semantics
h4. Steps
- pull and compile the {{release-1.20}} branch
- start a Flink cluster from flink-dist: {{./start-cluster.sh}}
- Configure the {{requestTimeout}} value in your job so that a sample of requests time out (see the sketch below)
- compile and package your test job
- open the Flink Dashboard, upload the job jar and submit the job
- Verify from the logs that all elements are delivered and that all elements whose delivery took longer than the configured timeout are resubmitted

*hint 1*: It is advised to use a timeout close to the simulated delay in the sink so that the number of retried requests stays small enough to track.
*hint 2*: It is also advised to use a debugger to inspect timed-out requests on the fly.
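
A minimal sketch of such a test job, reusing the {{DiscardingTestAsyncSink}} defined above (the class name {{TimeoutRetryTestJob}} and the concrete timeout value are illustrative assumptions, not part of the feature):
{code}
// Hypothetical test job: requestTimeoutMS (2500 ms) is deliberately close to the
// simulated delay (up to 5000 ms) so that only a sample of requests time out.
public class TimeoutRetryTestJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.fromSequence(0, 100)
                .map(Object::toString)
                .sinkTo(new DiscardingTestAsyncSink<>(2500L, false)); // failOnTimeout = false
        env.execute("FLIP-451 timeout retry test");
    }
}
{code}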

h3. Test that fail on timeout fails the job
h4. Steps
Follow the same steps for setting up the job and cluster, but enable the {{failOnTimeout}} flag (see the snippet below)
- Verify that the job fails with a {{TimeoutException}} visible in the job logs

h3. Test with checkpoints enabled
h4. Steps
Follow the same steps for setting up the job and cluster, but with checkpointing enabled (see the snippet below)
- Verify that checkpoints are taken successfully
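
Checkpointing can be enabled on the same job; the interval below is an arbitrary example value:
{code}
// Enable periodic checkpoints (10 s interval chosen arbitrarily for the test).
env.enableCheckpointing(10_000L);
{code}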

h2. Performance testing

We want to verify that the introduced feature doesn't affect the performance of the sink, specifically that it doesn't introduce unnecessary backpressure.

h4. Steps
Execute the same steps for setting up the job and cluster, but use a DataGen source in the test job to control throughput (see the sketch below) and keep the default value for {{requestTimeout}} (10 minutes)
- Run the job until it is in a stable state
- Verify that the sink doesn't introduce backpressure to the job.
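
A sketch of the source side, assuming the {{DataGeneratorSource}} from {{flink-connector-datagen}} and the {{env}} set up earlier; the element count and the rate of 10,000 records/s are arbitrary example values:
{code}
// DataGen source with a controlled rate, feeding the test sink with the default-like 10 minute timeout.
DataGeneratorSource<String> source =
        new DataGeneratorSource<>(
                index -> "element-" + index,           // GeneratorFunction<Long, String>
                Long.MAX_VALUE,                        // effectively unbounded for the test
                RateLimiterStrategy.perSecond(10_000), // throughput under our control
                Types.STRING);

env.fromSource(source, WatermarkStrategy.noWatermarks(), "DataGen Source")
        .sinkTo(new DiscardingTestAsyncSink<>(600_000L, false)); // 10 minute requestTimeout
{code}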

h2. Regression Testing

We need to verify that the feature doesn't cause a regression in existing implementers; for example, we want to make sure there is no significant amount of duplicate data caused by timeouts with the default values when using an existing sink.

We have the following implementers under community support:
{{Kinesis, Firehose, DynamoDb, ElasticSearch}}

It is advisable to test with all of them.
h4. Steps
- Run a simple job that sinks data from a DataGen source to the sink under test (see the sketch below for a Kinesis example)
- Benchmark the throughput to the sink destination
- Clone the sink connector repo, e.g. https://github.com/apache/flink-connector-aws
- Update the Flink version in the repo to {{1.20-SNAPSHOT}}
- Rerun the job and compare the throughput metrics with the benchmark
- Verify there is no regression between the two runs.
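
As an illustration for the Kinesis case, a job along these lines could be used, reusing the DataGen source from the performance test; the stream name, region, and exact builder options are assumptions and may differ depending on the connector version:
{code}
// Hypothetical regression job: DataGen source -> KinesisStreamsSink; all names and values are illustrative.
Properties clientProperties = new Properties();
clientProperties.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

KinesisStreamsSink<String> kinesisSink =
        KinesisStreamsSink.<String>builder()
                .setKinesisClientProperties(clientProperties)
                .setSerializationSchema(new SimpleStringSchema())
                .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
                .setStreamName("flink-35697-regression-test")
                .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "DataGen Source")
        .sinkTo(kinesisSink);
{code}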
