Yanikovic commented on a change in pull request #15140:
URL: https://github.com/apache/flink/pull/15140#discussion_r594622998



##########
File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/README.md
##########
@@ -0,0 +1,63 @@
+# RabbitMQ Sink
+
+Flink's RabbitMQ connector provides a sink that enables you to publish your stream directly
+to a RabbitMQ exchange in three different consistency modes: at-most-once, at-least-once,
+and exactly-once. Furthermore, user-defined publish options can be attached to each message
+to customize its exchange and publish settings in the RabbitMQ context.
+
+## Consistency Behaviour
+With __at-most-once__, the sink simply serializes each message and publishes it
+(with publish options, if given) to RabbitMQ. At this point the sink gives up ownership of the message.
+
+With __at-least-once__, the same process as for at-most-once is executed, except that ownership of
+the message does not end immediately with publishing it. The sink keeps the publishing
+id of each message as well as the message itself, and buffers them until the
+acknowledgment for the message is received from RabbitMQ. Because the sink has to buffer messages and wait
+for their asynchronous acknowledgments, this mode requires _checkpointing to be enabled_. On each checkpoint,
+all messages that have been buffered (and thus sent) but not yet acknowledged are stored. Simultaneously,
+each checkpoint also triggers a resend of all unacknowledged messages, since
+we have to assume that something went wrong during their publishing. Because it can take a
+moment until messages are acknowledged by RabbitMQ, this can and probably will result in
+duplicated messages, which is why this behaviour is at-least-once.
+
+By contrast, the __exactly-once__ mode does not send messages on receipt. All incoming messages
+are buffered until a checkpoint is triggered. On each checkpoint all buffered messages are
+published/committed as one transaction to ensure their reception is acknowledged by RabbitMQ.

Review comment:
       Done
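As a side note for readers of the at-least-once description above: the following is a minimal, illustrative Java sketch of the buffer-and-resend idea the README text describes. It is not the code under review; the class, field, and method names are assumptions made purely for illustration.

```java
// Illustrative only: sketches the at-least-once idea described in the README
// (buffer each published message until RabbitMQ acknowledges it, and resend
// everything still unacknowledged on each checkpoint).
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class AtLeastOnceBufferSketch<T> {
    // Messages keyed by their publishing id, kept until acknowledged.
    private final ConcurrentMap<Long, T> outstanding = new ConcurrentHashMap<>();

    void onPublish(long publishingId, T message) {
        // Keep ownership of the message until the broker confirms it.
        outstanding.put(publishingId, message);
    }

    void onAck(long publishingId) {
        // The broker confirmed the message; ownership can be released.
        outstanding.remove(publishingId);
    }

    List<T> snapshotAndCollectForResend() {
        // On a checkpoint: everything still unacknowledged is stored in state
        // and resent, which is why duplicates (at-least-once) are possible.
        return new ArrayList<>(outstanding.values());
    }
}
```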

##########
File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/README.md
##########
@@ -0,0 +1,63 @@
+# RabbitMQ Sink
+
+Flink's RabbitMQ connector provides a sink that enables you to publish your stream directly
+to a RabbitMQ exchange in three different consistency modes: at-most-once, at-least-once,
+and exactly-once. Furthermore, user-defined publish options can be attached to each message
+to customize its exchange and publish settings in the RabbitMQ context.
+
+## Consistency Behaviour
+With __at-most-once__, the sink simply serializes each message and publishes it
+(with publish options, if given) to RabbitMQ. At this point the sink gives up ownership of the message.
+
+With __at-least-once__, the same process as for at-most-once is executed, except that ownership of
+the message does not end immediately with publishing it. The sink keeps the publishing
+id of each message as well as the message itself, and buffers them until the
+acknowledgment for the message is received from RabbitMQ. Because the sink has to buffer messages and wait
+for their asynchronous acknowledgments, this mode requires _checkpointing to be enabled_. On each checkpoint,
+all messages that have been buffered (and thus sent) but not yet acknowledged are stored. Simultaneously,
+each checkpoint also triggers a resend of all unacknowledged messages, since
+we have to assume that something went wrong during their publishing. Because it can take a
+moment until messages are acknowledged by RabbitMQ, this can and probably will result in
+duplicated messages, which is why this behaviour is at-least-once.
+
+By contrast, the __exactly-once__ mode does not send messages on receipt. All incoming messages
+are buffered until a checkpoint is triggered. On each checkpoint all buffered messages are
+published/committed as one transaction to ensure their reception is acknowledged by RabbitMQ.
+If the transaction succeeds, all committed messages are released; otherwise they are kept
+and committed again in the next transaction during the next checkpoint.
+This behaviour ensures that each message is stored in RabbitMQ exactly once, but it also comes with
+a performance drawback. Committing many messages takes time and thus increases the overall
+time it takes to complete a checkpoint. This can lead to checkpoint delays and to peaks where
+checkpoints contain either many or just a few messages. This also correlates with the latency of each message.

Review comment:
       Done
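Because both at-least-once and exactly-once depend on Flink checkpoints (buffered state is persisted and, for exactly-once, the transaction is committed on checkpoint), a job using these modes needs checkpointing enabled. Below is a hedged sketch of the user-side setup: `enableCheckpointing` and `sinkTo` are standard Flink APIs, but `buildRabbitMQSink()` is only a placeholder for constructing the sink (connection config, serialization schema, consistency mode), not the connector's actual builder, and the checkpoint interval shown is arbitrary.

```java
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExactlyOnceJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Exactly-once (and at-least-once) publishing is driven by checkpoints:
        // buffered messages are committed as one transaction per checkpoint, so the
        // checkpoint interval directly bounds the end-to-end latency in this mode.
        env.enableCheckpointing(10_000L); // 10s interval; tune against latency needs

        DataStream<String> stream = env.fromElements("a", "b", "c");

        // Placeholder: stands in for building the RabbitMQ sink in exactly-once mode;
        // the real builder/constructor is not shown in this diff.
        Sink<String, ?, ?, ?> rabbitSink = buildRabbitMQSink();
        stream.sinkTo(rabbitSink);

        env.execute("RabbitMQ exactly-once sketch");
    }

    private static Sink<String, ?, ?, ?> buildRabbitMQSink() {
        throw new UnsupportedOperationException("illustrative placeholder only");
    }
}
```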

##########
File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSink.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq2.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState;
+import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterStateSerializer;
+import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase;
+import org.apache.flink.connector.rabbitmq2.sink.writer.specalized.RabbitMQSinkWriterAtLeastOnce;
+import org.apache.flink.connector.rabbitmq2.sink.writer.specalized.RabbitMQSinkWriterAtMostOnce;
+import org.apache.flink.connector.rabbitmq2.sink.writer.specalized.RabbitMQSinkWriterExactlyOnce;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * RabbitMQ sink (publisher) that publishes messages from upstream Flink tasks to a RabbitMQ queue.

Review comment:
       Done
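The imports above (a `ConsistencyMode` plus three specialized writers) suggest the sink picks a writer per consistency mode when Flink asks it to create one. The following is a speculative sketch of that dispatch pattern using stand-in types only; the real `createWriter` signature and writer constructors are not shown in this diff.

```java
// Speculative sketch of mode-based writer dispatch, with stand-in types.
// It only illustrates the structure suggested by the imports above.
enum ConsistencyMode { AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }

interface WriterSketch {}

class AtMostOnceWriterSketch implements WriterSketch {}
class AtLeastOnceWriterSketch implements WriterSketch {}
class ExactlyOnceWriterSketch implements WriterSketch {}

class SinkSketch {
    // Chooses a writer implementation based on the configured consistency mode.
    WriterSketch createWriter(ConsistencyMode mode) {
        switch (mode) {
            case AT_MOST_ONCE:
                return new AtMostOnceWriterSketch();
            case AT_LEAST_ONCE:
                return new AtLeastOnceWriterSketch();
            case EXACTLY_ONCE:
                return new ExactlyOnceWriterSketch();
            default:
                throw new IllegalStateException("Unknown consistency mode: " + mode);
        }
    }
}
```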




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

