[ https://issues.apache.org/jira/browse/FLINK-35697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ahmed Hamdy resolved FLINK-35697.
---------------------------------
    Resolution: Resolved

Release Testing: Verify FLIP-451 Introduce timeout configuration to AsyncSink
-----------------------------------------------------------------------------

                Key: FLINK-35697
                URL: https://issues.apache.org/jira/browse/FLINK-35697
            Project: Flink
         Issue Type: Sub-task
         Components: Connectors / Common
   Affects Versions: 1.20.0
           Reporter: Ahmed Hamdy
           Assignee: Muhammet Orazov
           Priority: Blocker
             Labels: release-testing
            Fix For: 1.20.0

h2. Description
In FLIP-451 we added a timeout configuration to {{AsyncSinkWriter}}, with a default value of 10 minutes and {{failOnTimeout}} defaulting to false.
We need to test the new feature on different levels:
- Functional testing
- Performance testing
- Regression testing

h2. Common Utils
The feature affects the abstract {{AsyncSinkWriter}} class, so we need a concrete sink implementation for our tests. Any implementation where we can track the delivery of elements is acceptable; an example is:
{code}
class DiscardingElementWriter extends AsyncSinkWriter<T, String> {

    SeparateThreadExecutor executor =
            new SeparateThreadExecutor(r -> new Thread(r, "DiscardingElementWriter"));

    public DiscardingElementWriter(
            Sink.InitContext context,
            AsyncSinkWriterConfiguration configuration,
            Collection<BufferedRequestState<String>> bufferedRequestStates) {
        super(
                (element, context1) -> element.toString(),
                context,
                configuration,
                bufferedRequestStates);
    }

    @Override
    protected long getSizeInBytes(String requestEntry) {
        return requestEntry.length();
    }

    @Override
    protected void submitRequestEntries(
            List<String> requestEntries, ResultHandler<String> resultHandler) {
        executor.execute(
                () -> {
                    long delayMillis = new Random().nextInt(5000);
                    try {
                        Thread.sleep(delayMillis);
                    } catch (InterruptedException ignored) {
                    }
                    for (String entry : requestEntries) {
                        LOG.info("Discarding {} after {} ms", entry, delayMillis);
                    }
                    resultHandler.complete();
                });
    }
}
{code}
We will also need a simple Flink job that writes data using the sink:
{code}
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.fromSequence(0, 100)
        .map(Object::toString)
        .sinkTo(new DiscardingTestAsyncSink<>(requestTimeoutMS, failOnTimeout));
{code}
We can use the smallest values for batch size and in-flight requests to increase the number of requests that are subject to timeout:
{code}
public class DiscardingTestAsyncSink<T> extends AsyncSinkBase<T, String> {

    private static final Logger LOG = LoggerFactory.getLogger(DiscardingTestAsyncSink.class);

    public DiscardingTestAsyncSink(long requestTimeoutMS, boolean failOnTimeout) {
        super(
                (element, context) -> element.toString(),
                1, // maxBatchSize
                1, // maxInFlightRequests
                10, // maxBufferedRequests
                1000L, // maxBatchSizeInBytes
                100, // maxTimeInBufferMS
                500L, // maxRecordSizeInBytes
                requestTimeoutMS,
                failOnTimeout);
    }

    @Override
    public SinkWriter<T> createWriter(WriterInitContext context) throws IOException {
        return new DiscardingElementWriter(
                new InitContextWrapper(context),
                AsyncSinkWriterConfiguration.builder()
                        .setMaxBatchSize(this.getMaxBatchSize())
                        .setMaxBatchSizeInBytes(this.getMaxBatchSizeInBytes())
                        .setMaxInFlightRequests(this.getMaxInFlightRequests())
                        .setMaxBufferedRequests(this.getMaxBufferedRequests())
                        .setMaxTimeInBufferMS(this.getMaxTimeInBufferMS())
                        .setMaxRecordSizeInBytes(this.getMaxRecordSizeInBytes())
                        .setFailOnTimeout(this.getFailOnTimeout())
                        .setRequestTimeoutMS(this.getRequestTimeoutMS())
                        .build(),
                Collections.emptyList());
    }

    @Override
    public StatefulSinkWriter<T, BufferedRequestState<String>> restoreWriter(
            WriterInitContext context,
            Collection<BufferedRequestState<String>> recoveredState)
            throws IOException {
        return new DiscardingElementWriter(
                new InitContextWrapper(context),
                AsyncSinkWriterConfiguration.builder()
                        .setMaxBatchSize(this.getMaxBatchSize())
                        .setMaxBatchSizeInBytes(this.getMaxBatchSizeInBytes())
                        .setMaxInFlightRequests(this.getMaxInFlightRequests())
                        .setMaxBufferedRequests(this.getMaxBufferedRequests())
                        .setMaxTimeInBufferMS(this.getMaxTimeInBufferMS())
                        .setMaxRecordSizeInBytes(this.getMaxRecordSizeInBytes())
                        .setFailOnTimeout(this.getFailOnTimeout())
                        .setRequestTimeoutMS(this.getRequestTimeoutMS())
                        .build(),
                recoveredState);
    }
}
{code}

h2. Functional tests
These are common tests to verify that the new feature works correctly within Flink jobs.

h3. Test that timed-out requests are retried, ensuring at-least-once semantics
h4. Steps
- Pull and compile the {{release-1.20}} branch
- Start a Flink cluster from flink-dist: {{./start-cluster.sh}}
- Configure the request timeout value in your job so that a sample of requests time out
- Compile and package your test job
- Open the Flink Dashboard, upload the job jar, and submit the job
- Verify from the logs that all elements are delivered, and that all elements delivered with a delay greater than the configured timeout are resubmitted

*hint 1*: It is advised to use a timeout close to the simulated delay in the sink so that there are not too many retried requests to track.
*hint 2*: It is also advised to use the debugger to check timed-out requests on the fly.

h3. Test that fail-on-timeout fails the job
h4. Steps
- Follow the same steps for setting up the job and cluster, but enable the {{failOnTimeout}} flag
- Verify that the job fails with a {{TimeoutException}} visible in the job logs

h3. Test with checkpoints enabled
h4. Steps
- Follow the same steps for setting up the job and cluster, with checkpoints enabled
- Verify that checkpoints are taken successfully

h2. Performance testing
We want to verify that the introduced feature doesn't affect the performance of the sink; specifically, that it doesn't introduce unnecessary backpressure.
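Note that the low test settings above (maxBatchSize = 1, maxInFlightRequests = 1) combined with the writer's simulated delay of up to 5 seconds (roughly 2.5 s on average) bound the achievable throughput by request latency, which is one reason the performance run should use the defaults instead. A back-of-envelope sketch (the helper class and its name are illustrative, not Flink API):

```java
/** Illustrative throughput bound for an async sink; not part of Flink. */
public class ThroughputModel {

    /**
     * Upper bound on records/s when each batch of maxBatchSize records must
     * complete before the next of maxInFlightRequests batches can be sent.
     */
    static double maxRecordsPerSecond(
            int maxBatchSize, int maxInFlightRequests, double requestLatencyMs) {
        return maxBatchSize * maxInFlightRequests * (1000.0 / requestLatencyMs);
    }

    public static void main(String[] args) {
        // Test-sink settings: batch of 1, one request in flight, ~2.5 s average delay.
        System.out.println(maxRecordsPerSecond(1, 1, 2500.0)); // prints 0.4
    }
}
```

With the defaults, batch size and in-flight request counts are far higher, so the timeout feature alone should not cap throughput.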
h4. Steps
- Execute the same steps for setting up the job and cluster, but use a datagen source in the test job to control throughput, and use the default value for the request timeout (10 minutes)
- Run the job until it is in a stable state
- Verify that the sink doesn't introduce backpressure to the job

h2. Regression testing
We need to verify that the sink doesn't cause a regression in existing implementers; for example, we want to make sure there is no significant load of duplicate data due to timeouts on the default values when using an existing sink.
We have the following implementers under community support: {{Kinesis, Firehose, DynamoDb, ElasticSearch}}. It is advisable to test with all of them.
h4. Steps
- Run a simple job that sinks data from a datagen source to the sink under test
- Benchmark the throughput to the sink destination
- Clone the sink connector repo, as in https://github.com/apache/flink-connector-aws
- Update the Flink version in the repo to {{1.20-SNAPSHOT}}
- Rerun the job and compare the throughput metrics with the benchmark
- Verify there is no regression between the two cases

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
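The semantics exercised by the functional tests above, retry-on-timeout for at-least-once delivery versus fail-on-timeout, can be modelled with plain JDK futures. This is an illustrative sketch of the behaviour under test, not the {{AsyncSinkWriter}} implementation:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustrative model of AsyncSink request-timeout semantics; not Flink API. */
public class TimeoutSemantics {

    /**
     * Delivers one request, resubmitting on timeout unless failOnTimeout is set.
     * Returns the number of attempts used; each retry is faster, emulating a
     * transient slowdown of the destination.
     */
    static int deliver(long firstAttemptDelayMs, long timeoutMs, boolean failOnTimeout)
            throws TimeoutException {
        int attempt = 0;
        while (true) {
            attempt++;
            long delay = firstAttemptDelayMs / attempt;
            CompletableFuture<Void> request =
                    CompletableFuture.runAsync(() -> sleep(delay))
                            .orTimeout(timeoutMs, TimeUnit.MILLISECONDS);
            try {
                request.join();
                return attempt; // delivered
            } catch (CompletionException e) {
                if (failOnTimeout) {
                    // failOnTimeout == true: the job fails
                    throw new TimeoutException("request timed out after " + timeoutMs + " ms");
                }
                // failOnTimeout == false: resubmit, giving at-least-once delivery
            }
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ignored) {
        }
    }

    public static void main(String[] args) throws TimeoutException {
        // 2000 ms first attempt vs. 1200 ms timeout: attempt 1 times out,
        // attempt 2 (1000 ms) completes within the timeout.
        System.out.println(deliver(2000, 1200, false)); // prints 2
    }
}
```

This is also why the first functional test expects duplicates: a request that times out may still complete on the destination side before its resubmission does.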
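The last two regression steps, comparing throughput metrics before and after the version bump, can be automated with a small helper; the 5% tolerance used here is an arbitrary assumption, not from the issue:

```java
import java.util.List;

/** Illustrative helper for the regression comparison; thresholds are assumptions. */
public class RegressionCheck {

    /**
     * Returns true when mean throughput after the upgrade stays within the
     * given relative tolerance of the benchmarked baseline.
     */
    static boolean noRegression(List<Double> baseline, List<Double> upgraded, double tolerance) {
        return mean(upgraded) >= mean(baseline) * (1.0 - tolerance);
    }

    static double mean(List<Double> samples) {
        return samples.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
    }

    public static void main(String[] args) {
        // records/s sampled from the sink operator before and after moving to 1.20-SNAPSHOT
        List<Double> before = List.of(1000.0, 980.0, 1020.0); // mean 1000
        List<Double> after = List.of(990.0, 1005.0, 975.0);   // mean 990
        System.out.println(noRegression(before, after, 0.05)); // prints true
    }
}
```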