Yanikovic commented on a change in pull request #15140:
URL: https://github.com/apache/flink/pull/15140#discussion_r594708644



##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSink.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq2.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState;
+import 
org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterStateSerializer;
+import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase;
+import 
org.apache.flink.connector.rabbitmq2.sink.writer.specalized.RabbitMQSinkWriterAtLeastOnce;
+import 
org.apache.flink.connector.rabbitmq2.sink.writer.specalized.RabbitMQSinkWriterAtMostOnce;
+import 
org.apache.flink.connector.rabbitmq2.sink.writer.specalized.RabbitMQSinkWriterExactlyOnce;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * RabbitMQ sink (publisher) that publishes messages from upstream flink tasks 
to a RabbitMQ queue.
+ * It provides at-most-once, at-least-once and exactly-once processing 
semantics. For at-least-once
+ * and exactly-once, checkpointing needs to be enabled. The sink operates as a 
StreamingSink and

Review comment:
       Done

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/README.md
##########
@@ -0,0 +1,63 @@
+# RabbitMQ Sink
+
+Flink's RabbitMQ connector provides a sink which enables you to publish your 
stream directly
+to a RabbitMQ exchange in three different consistency modes: at-most-once, 
at-least-once,
+and exactly-once. Furthermore, user defined publish options can be used to 
customize each message
+options in regard to exchange and publish settings in the RabbitMQ context.
+
+## Consistency Behaviour
+With __at-most-once__, the sink will simply take each message and publish the 
serialization of it
+(with publish options if given) to RabbitMQ. At this point the sink gives up 
the ownership of the message.
+
+For __at-least-once__ the same process as for at-most-once is executed except 
that the ownership of
+the message does not end immediately with publishing it. The sink will keep 
the individual publishing
+id for each message as well as the message itself and buffer it as long as it 
takes to receive the
+message received acknowledgment from RabbitMQ. Since we are in the need of 
buffering messages and waiting
+for their asynchronous acknowledgment, this requires _checkpointing enabled_. 
On each checkpoint,
+all to this point buffered (and thus send) but unacknowledged messages will be 
stored. Simultaneously,
+on each checkpoint a resend will be triggered to send all unacknowledged 
messages once again since
+we have to assume that something went wrong for it during the publishing 
process. Since it can take a
+moment until messages get acknowledged from RabbitMQ this can and probably 
will result in a message
+duplication and therefore this logic becomes at-least-once.
+
+By contrast, the __exactly-once-mode__ mode will not send messages on receive. 
All incoming messages

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to