Yanikovic commented on a change in pull request #15140:
URL: https://github.com/apache/flink/pull/15140#discussion_r594622998
##########
File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/README.md
##########
@@ -0,0 +1,63 @@
+# RabbitMQ Sink
+
+Flink's RabbitMQ connector provides a sink which enables you to publish your stream directly
+to a RabbitMQ exchange in three different consistency modes: at-most-once, at-least-once,
+and exactly-once. Furthermore, user-defined publish options can be used to customize each
+message's exchange and publish settings in the RabbitMQ context.
+
+## Consistency Behaviour
+With __at-most-once__, the sink simply serializes each message and publishes it
+(with publish options, if given) to RabbitMQ. At this point the sink gives up ownership of the message.
+
+For __at-least-once__, the same process as for at-most-once is executed, except that ownership of
+a message does not end immediately after publishing it. The sink keeps the individual publishing
+id for each message as well as the message itself, and buffers both until the publish
+acknowledgment from RabbitMQ is received. Because messages must be buffered while waiting
+for their asynchronous acknowledgments, this mode requires _checkpointing to be enabled_. On each checkpoint,
+all messages buffered (and thus sent) but not yet acknowledged up to that point are stored. Simultaneously,
+on each checkpoint a resend is triggered for all unacknowledged messages, since
+we have to assume that something went wrong during the publishing process. Because it can take a
+moment until messages are acknowledged by RabbitMQ, this can and probably will result in message
+duplication, which is why this mode is at-least-once.
+
+By contrast, the __exactly-once__ mode does not send messages as they arrive. All incoming messages
+are buffered until a checkpoint is triggered. On each checkpoint, all messages are
+published/committed as one transaction to ensure their reception is acknowledged by RabbitMQ.

Review comment: Done
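Since the at-least-once and exactly-once modes described in this hunk only work with checkpointing enabled, a minimal usage sketch may help. Enabling checkpointing and attaching the sink use the standard Flink DataStream API; the `RabbitMQSink.builder()` calls, their setter names, and the `ConsistencyMode.AT_LEAST_ONCE` constant below are assumptions for illustration only, since the sink's construction API is not part of this diff.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.rabbitmq2.ConsistencyMode;
import org.apache.flink.connector.rabbitmq2.RabbitMQConnectionConfig;
import org.apache.flink.connector.rabbitmq2.sink.RabbitMQSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RabbitMQSinkAtLeastOnceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Required for at-least-once and exactly-once: buffered messages are stored
        // in state and resent/committed on each checkpoint.
        env.enableCheckpointing(10_000);

        DataStream<String> stream = env.fromElements("message-1", "message-2");

        // Hypothetical builder-style construction; the real constructor/builder of
        // RabbitMQSink is defined elsewhere in this PR and may differ.
        RabbitMQConnectionConfig connectionConfig = null; // replace with a real config (host, port, credentials)
        RabbitMQSink<String> sink =
                RabbitMQSink.<String>builder()
                        .setConnectionConfig(connectionConfig)
                        .setQueueName("flink-output")
                        .setSerializationSchema(new SimpleStringSchema())
                        .setConsistencyMode(ConsistencyMode.AT_LEAST_ONCE)
                        .build();

        // The new unified Sink API (FLIP-143) is attached with sinkTo().
        stream.sinkTo(sink);
        env.execute("RabbitMQ sink example");
    }
}
```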
##########
File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/README.md
##########
@@ -0,0 +1,63 @@
+# RabbitMQ Sink
+
+Flink's RabbitMQ connector provides a sink which enables you to publish your stream directly
+to a RabbitMQ exchange in three different consistency modes: at-most-once, at-least-once,
+and exactly-once. Furthermore, user-defined publish options can be used to customize each
+message's exchange and publish settings in the RabbitMQ context.
+
+## Consistency Behaviour
+With __at-most-once__, the sink simply serializes each message and publishes it
+(with publish options, if given) to RabbitMQ. At this point the sink gives up ownership of the message.
+
+For __at-least-once__, the same process as for at-most-once is executed, except that ownership of
+a message does not end immediately after publishing it. The sink keeps the individual publishing
+id for each message as well as the message itself, and buffers both until the publish
+acknowledgment from RabbitMQ is received. Because messages must be buffered while waiting
+for their asynchronous acknowledgments, this mode requires _checkpointing to be enabled_. On each checkpoint,
+all messages buffered (and thus sent) but not yet acknowledged up to that point are stored. Simultaneously,
+on each checkpoint a resend is triggered for all unacknowledged messages, since
+we have to assume that something went wrong during the publishing process. Because it can take a
+moment until messages are acknowledged by RabbitMQ, this can and probably will result in message
+duplication, which is why this mode is at-least-once.
+
+By contrast, the __exactly-once__ mode does not send messages as they arrive. All incoming messages
+are buffered until a checkpoint is triggered. On each checkpoint, all messages are
+published/committed as one transaction to ensure their reception is acknowledged by RabbitMQ.
+If the transaction succeeds, ownership of all committed messages is given up; otherwise they are stored
+and committed again in the next transaction during the next checkpoint.
+This behaviour ensures that each message is stored in RabbitMQ exactly once, but it also has
+a performance drawback. Committing many messages takes time and thus increases the overall
+time it takes to complete a checkpoint. This can result in checkpoint delays and in peaks where
+checkpoints contain either many or just a few messages. This also affects the latency of each message.

Review comment: Done
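Because exactly-once publishes one transaction per checkpoint, the checkpoint interval directly bounds per-message latency and the size of each transaction, so it is the main tuning knob for the trade-off described above. A short sketch using only standard Flink checkpointing configuration (the interval, pause, and timeout values are illustrative, not recommendations):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningForExactlyOnce {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Messages are only committed on checkpoints, so a shorter interval lowers
        // per-message latency but produces more, smaller transactions.
        env.enableCheckpointing(5_000, CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig config = env.getCheckpointConfig();
        // Give large transactions time to finish before the next checkpoint starts.
        config.setMinPauseBetweenCheckpoints(1_000);
        // Fail (rather than hang) a checkpoint whose commit takes too long.
        config.setCheckpointTimeout(60_000);
    }
}
```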
##########
File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSink.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq2.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState;
+import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterStateSerializer;
+import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase;
+import org.apache.flink.connector.rabbitmq2.sink.writer.specalized.RabbitMQSinkWriterAtLeastOnce;
+import org.apache.flink.connector.rabbitmq2.sink.writer.specalized.RabbitMQSinkWriterAtMostOnce;
+import org.apache.flink.connector.rabbitmq2.sink.writer.specalized.RabbitMQSinkWriterExactlyOnce;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * RabbitMQ sink (publisher) that publishes messages from upstream flink tasks to a RabbitMQ queue.

Review comment: Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org