infoverload commented on code in PR #517:
URL: https://github.com/apache/flink-web/pull/517#discussion_r861495533


##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,160 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-05 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncSinkBase and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. All sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why for [Flink 
1.15](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink) 
we have decided to create the `AsyncSinkBase`, an abstract sink with a number 
of common functionalities extracted. Adding support for a new destination now 
only requires a lightweight shim that implements the specific interfaces of the 
destination using a client that supports async requests. 
+
+This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, since bugfixes and improvements to the sink core benefit all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide 
at-least-once semantics and can work directly with destinations that provide a 
client that supports asynchronous requests. Alternatively, concrete sink 
implementers may manage their own thread pool with a synchronous client.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+In order to use the base sink, you will need to add the following dependency 
to your project. The example below follows the Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional 
metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert an element of the DataStream into the payload type that carries all the additional metadata required by the sink to submit that element to the destination. Ideally, this conversion would be hidden from the end user, since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code.
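+
+To make this concrete, the following is a minimal sketch of a converter for a sink that consumes `String` elements. `MyRequestEntry` and `MyElementConverter` are hypothetical names invented for this example and are not part of the base sink API.
+
+```java
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+// Hypothetical payload type for this sketch: it carries the serialized element
+// plus a partition key that our imaginary destination requires.
+class MyRequestEntry implements Serializable {
+    final byte[] payload;
+    final String partitionKey;
+
+    MyRequestEntry(byte[] payload, String partitionKey) {
+        this.payload = payload;
+        this.partitionKey = partitionKey;
+    }
+}
+
+class MyElementConverter implements ElementConverter<String, MyRequestEntry> {
+    @Override
+    public MyRequestEntry apply(String element, SinkWriter.Context context) {
+        // Attach whatever metadata the destination needs to write this element.
+        return new MyRequestEntry(
+                element.getBytes(StandardCharsets.UTF_8),
+                String.valueOf(element.hashCode()));
+    }
+}
+```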
+
+## Sink Writer Interface
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends 
Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, 
BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> 
requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination's client to asynchronously submit the `requestEntries` to be written.
+
+Should any elements fail to be persisted, they should be requeued in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying an element that is known to be faulty and consistently failing will result in that element being requeued forever, so a sensible strategy for determining what should be retried is highly recommended.
+
+If at any point it is determined that a fatal error has occurred and that a runtime exception should be thrown from the sink, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.
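+
+As an illustration, an implementation of `submitRequestEntries` might look roughly like the sketch below. The `client`, its `putRecords(...)` call, the `isFatal(...)` check and the `failedEntries()` accessor are hypothetical stand-ins for whatever asynchronous client and response type your destination provides; the important parts are requeueing failed entries via `requestResult` and surfacing unrecoverable errors via `getFatalExceptionCons()`.
+
+```java
+@Override
+protected void submitRequestEntries(
+        List<MyRequestEntry> requestEntries, Consumer<List<MyRequestEntry>> requestResult) {
+    // Hypothetical async client call returning a CompletableFuture with the
+    // per-entry results of the batch write.
+    client.putRecords(requestEntries).whenComplete((response, error) -> {
+        if (error != null) {
+            if (isFatal(error)) {
+                // Unrecoverable problem, e.g. missing permissions: fail the job.
+                getFatalExceptionCons().accept(
+                        new RuntimeException("Fatal error writing to destination", error));
+                return;
+            }
+            // Transient problem, e.g. throttling: retry the whole batch.
+            requestResult.accept(requestEntries);
+        } else {
+            // Retry only the entries the destination reported as failed;
+            // an empty list signals that the request completed successfully.
+            requestResult.accept(response.failedEntries());
+        }
+    });
+}
+```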
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends 
Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, 
BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract long getSizeInBytes(RequestEntryT requestEntry);
+    // ...
+}
+```
+The async sink has a concept of the size of the elements in the buffer. This allows users to specify a byte size threshold beyond which elements will be flushed. However, the sink implementer is best positioned to determine what the most sensible measure of size for each `RequestEntryT` is.
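+
+For example, with the hypothetical `MyRequestEntry` payload from the sketch above, the size measure could simply be the length of the serialized payload:
+
+```java
+@Override
+protected long getSizeInBytes(MyRequestEntry requestEntry) {
+    // Other sinks may prefer the size reported by their client's request object.
+    return requestEntry.payload.length;
+}
+```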
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends 
Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, 
BufferedRequestState<RequestEntryT>> {
+    // ...
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long flushOnBufferSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes) { /* ... */ }
+    // ...
+}
+```
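+
+A concrete writer forwards its buffering configuration to this constructor. The sketch below shows one way this might look, continuing with the hypothetical `MyRequestEntry` payload from the earlier sketches; the numeric values are purely illustrative, not recommended defaults.
+
+```java
+MySinkWriter(
+        ElementConverter<InputT, MyRequestEntry> elementConverter,
+        Sink.InitContext context) {
+    super(
+            elementConverter,
+            context,
+            500,              // maxBatchSize: flush once 500 entries are buffered
+            16,               // maxInFlightRequests: at most 16 concurrent requests
+            10000,            // maxBufferedRequests: apply backpressure beyond this
+            4 * 1024 * 1024,  // flushOnBufferSizeInBytes: flush once 4 MB are buffered
+            5000,             // maxTimeInBufferMS: flush at least every 5 seconds
+            1024 * 1024);     // maxRecordSizeInBytes: reject single records larger than 1 MB
+}
+```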
+
+By default, the method `snapshotState` returns all the elements currently in the buffer so that they can be persisted in the snapshot. Any elements that have been removed from the buffer for writing and are in flight will already have completed, because the sink operator calls `flush(true)` on the sink writer before taking the snapshot.
+You may want to persist additional state from the concrete sink. You can achieve this by overriding `snapshotState` and restoring that state in the constructor.
+
+```java
+class MySinkWriter<InputT> extends AsyncSinkWriter<InputT, RequestEntryT> {
+
+    MySinkWriter(
+          // ... 
+          Collection<BufferedRequestState<RequestEntryT>> initialStates) {
+        super(
+            // ...
+            initialStates);
+        // restore concrete sink state from initialStates
+    }
+    
+    @Override
+    public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId) {
+        List<BufferedRequestState<RequestEntryT>> states = super.snapshotState(checkpointId);
+        // ... snapshot any additional state of the concrete sink here ...
+        return states;
+    }
+
+}
+```
+
+## Sink Interface
+
+[AsyncSinkBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java)
+
+```java
+class MySink<InputT> extends AsyncSinkBase<InputT, RequestEntryT> {
+    // ...
+    @Override
+    public StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> 
createWriter(InitContext context) {
+        return new MySinkWriter(context);
+    }
+    // ...
+}
+```
+Sink implementers extending `AsyncSinkBase` will need to return their own extension of the `AsyncSinkWriter` from `createWriter()`.
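+
+From the end user's perspective, the resulting sink is attached to a DataStream like any other sink. The snippet below is a sketch; the constructor arguments of the hypothetical `MySink` are assumptions, and real connectors typically expose a builder instead.
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> stream = env.fromElements("a", "b", "c");
+
+// Wire the hypothetical sink into the job; at-least-once delivery is provided
+// by the base sink's participation in checkpointing.
+stream.sinkTo(new MySink<>(/* destination configuration */));
+
+env.execute("Async sink example");
+```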
+
+At the time of writing, the [Kinesis Data Streams 
sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-streams)
 and [Kinesis Data Firehose 
sink](https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-aws-kinesis-firehose)
 are using this base sink. 
+
+# Metrics
+
+There are three metrics that exist automatically when you implement sinks on top of the base sink (and, thus, should not be implemented by sink implementers themselves).
+
+* CurrentSendTime Gauge - returns the time in milliseconds it took for the most recent request to write records to complete, whether that request was successful or not.
+* NumBytesOut Counter - counts the total number of bytes the sink has tried to 
write to the destination, using the method `getSizeInBytes` to determine the 
size of each record. This will double count failures that may need to be 
retried. 
+* NumRecordsOut Counter - similar to above, this counts the total number of 
records the sink has tried to write to the destination. This will double count 
failures that may need to be retried.
+
+# Sink Behaviour

Review Comment:
   ```suggestion
   # Sink Behavior
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
