SteNicholas commented on a change in pull request #15140:
URL: https://github.com/apache/flink/pull/15140#discussion_r775324812



##########
File path: flink-connectors/flink-connector-rabbitmq2/README.md
##########
@@ -19,6 +19,143 @@ and the Sink API specified in 
[FLIP-143](https://cwiki.apache.org/confluence/dis
 
 For more information about RabbitMQ visit https://www.rabbitmq.com/.
 
-In order to view how to use the connector inspect
-[Source](src/main/java/org/apache/flink/connector/rabbitmq2/source/README.md) 
and 
-[Sink](src/main/java/org/apache/flink/connector/rabbitmq2/sink/README.md).
+# RabbitMQ Source
+
+Flink's RabbitMQ connector provides a streaming-only source which enables you 
to receive messages

Review comment:
       Why RabbitMQ connector provide a streaming only source? Doesn't support 
the batch mode?

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSink.java
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import 
org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions;
+import 
org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState;
+import 
org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterStateSerializer;
+import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase;
+import 
org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterAtLeastOnce;
+import 
org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterAtMostOnce;
+import 
org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterExactlyOnce;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * RabbitMQ sink that publishes messages to a RabbitMQ queue. It provides 
at-most-once,
+ * at-least-once or exactly-once processing semantics. For at-least-once and 
exactly-once,
+ * checkpointing needs to be enabled.
+ *
+ * <pre>{@code
+ * RabbitMQSink
+ *     .builder()
+ *     .setConnectionConfig(connectionConfig)
+ *     .setQueueName("queue")
+ *     .setSerializationSchema(new SimpleStringSchema())
+ *     .setConsistencyMode(ConsistencyMode.AT_LEAST_ONCE)
+ *     .build();
+ * }</pre>
+ *
+ * <p>When creating the sink a {@code connectionConfig} must be specified via 
{@link
+ * RabbitMQConnectionConfig}. It contains required information for the 
RabbitMQ java client to
+ * connect to the RabbitMQ server. A minimum configuration contains a 
(virtual) host, a username, a
+ * password and a port. Besides that, the {@code queueName} to publish to and 
a {@link
+ * SerializationSchema} for the sink input type is required. {@code 
publishOptions} can be added
+ * optionally to route messages in RabbitMQ.
+ *
+ * <p>If at-least-once is required messages are buffered until an 
acknowledgement arrives because
+ * delivery needs to be guaranteed. On each checkpoint, all unacknowledged 
messages will be resent
+ * to RabbitMQ. In case of a failure, all unacknowledged messages can be 
restored and resend.
+ *
+ * <p>In the case of exactly-once a transactional RabbitMQ channel is used to 
achieve that all
+ * messages within a checkpoint are delivered once and only once. All messages 
that arrive in a
+ * checkpoint interval are buffered and sent to RabbitMQ in a single 
transaction when the checkpoint
+ * is triggered. If the transaction fails, all messages that were a part of 
the transaction are put
+ * back into the buffer and a resend is issued in the next checkpoint.
+ *
+ * <p>Keep in mind that the transactional channels are heavyweight and the 
performance will drop.
+ * Under heavy load, checkpoints can be delayed if a transaction takes longer 
than the specified
+ * checkpointing interval.
+ *
+ * <p>If publish options are used and the checkpointing mode is at-least-once 
or exactly-once, they
+ * require a {@link DeserializationSchema} to be provided because messages 
that were persisted as
+ * part of an earlier checkpoint are needed to recompute routing/exchange.
+ */
+public class RabbitMQSink<T> implements Sink<T, Void, 
RabbitMQSinkWriterState<T>, Void> {
+
+    private final RabbitMQConnectionConfig connectionConfig;
+    private final String queueName;
+    private final SerializationSchema<T> serializationSchema;
+    private final RabbitMQSinkPublishOptions<T> publishOptions;
+    private final ConsistencyMode consistencyMode;
+    private final SerializableReturnListener returnListener;
+
+    private static final ConsistencyMode DEFAULT_CONSISTENCY_MODE = 
ConsistencyMode.AT_MOST_ONCE;
+
+    private RabbitMQSink(
+            RabbitMQConnectionConfig connectionConfig,
+            String queueName,
+            SerializationSchema<T> serializationSchema,
+            ConsistencyMode consistencyMode,
+            @Nullable SerializableReturnListener returnListener,
+            @Nullable RabbitMQSinkPublishOptions<T> publishOptions) {
+        this.connectionConfig = requireNonNull(connectionConfig);
+        this.queueName = requireNonNull(queueName);
+        this.serializationSchema = requireNonNull(serializationSchema);
+        this.consistencyMode = requireNonNull(consistencyMode);
+
+        this.returnListener = returnListener;
+
+        Preconditions.checkState(
+                verifyPublishOptions(),
+                "If consistency mode is stronger than at-most-once and publish 
options are defined "
+                        + "then publish options need a deserialization 
schema");
+        this.publishOptions = publishOptions;
+    }
+
+    private boolean verifyPublishOptions() {
+        // If at-most-once, doesn't matter if publish options are provided (no 
state in writer).
+        if (consistencyMode == ConsistencyMode.AT_MOST_ONCE) {
+            return true;
+        }
+        if (publishOptions == null) {

Review comment:
       Why not add the check to above check?

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/common/RabbitMQSinkConnection.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink.common;
+
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import org.apache.flink.util.Preconditions;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This class provides basic RabbitMQ functionality and common behaviour such 
as establishing and
+ * closing a connection via the {@code connectionConfig}. In addition, it 
provides methods for
+ * serializing and sending messages to RabbitMQ (with or without publish 
options).
+ *
+ * @param <T> The type of the messages that are published
+ */
+public class RabbitMQSinkConnection<T> {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSinkConnection.class);
+
+    private final RabbitMQConnectionConfig connectionConfig;
+    private final String queueName;
+    private Connection rmqConnection;
+    private Channel rmqChannel;
+
+    @Nullable private final RabbitMQSinkPublishOptions<T> publishOptions;
+
+    @Nullable private final SerializableReturnListener returnListener;
+
+    public RabbitMQSinkConnection(
+            RabbitMQConnectionConfig connectionConfig,
+            String queueName,
+            @Nullable RabbitMQSinkPublishOptions<T> publishOptions,
+            @Nullable SerializableReturnListener returnListener) {
+        this.connectionConfig = requireNonNull(connectionConfig);
+        this.queueName = requireNonNull(queueName);
+        this.publishOptions = publishOptions;
+        this.returnListener = returnListener;
+    }
+
+    /**
+     * Setup the RabbitMQ connection and a channel to send messages to.
+     *
+     * @throws Exception that might occur when setting up the connection and 
channel.
+     */
+    public void setupRabbitMQ() throws Exception {
+        LOG.info("Setup RabbitMQ");
+        this.rmqConnection = setupConnection(connectionConfig);
+        this.rmqChannel = setupChannel(rmqConnection, queueName, 
returnListener);
+    }
+
+    private Connection setupConnection(RabbitMQConnectionConfig 
connectionConfig) throws Exception {
+        return connectionConfig.getConnectionFactory().newConnection();
+    }
+
+    private Channel setupChannel(
+            Connection rmqConnection, String queueName, 
SerializableReturnListener returnListener)
+            throws IOException {
+        final Channel rmqChannel = rmqConnection.createChannel();
+        rmqChannel.queueDeclare(queueName, true, false, false, null);
+        if (returnListener != null) {
+            rmqChannel.addReturnListener(returnListener);
+        }
+        return rmqChannel;
+    }
+
+    /**
+     * Only used by at-least-once and exactly-once for resending messages that 
could not be
+     * delivered.
+     *
+     * @param message sink message wrapping the atomic message object
+     */
+    public void send(RabbitMQSinkMessageWrapper<T> message) throws IOException 
{
+        send(message.getMessage(), message.getBytes());
+    }
+
+    /**
+     * Publish a message to a queue in RabbitMQ. With publish options enabled, 
first compute the
+     * necessary publishing information.
+     *
+     * @param message original message, only required for publishing with 
publish options present
+     * @param serializedMessage serialized message to send to RabbitMQ
+     */
+    public void send(T message, byte[] serializedMessage) throws IOException {
+        if (publishOptions == null) {
+            rmqChannel.basicPublish("", queueName, null, serializedMessage);
+        } else {
+            publishWithOptions(message, serializedMessage);
+        }
+    }
+
+    private void publishWithOptions(T message, byte[] serializedMessage) 
throws IOException {
+        if (publishOptions == null) {
+            throw new RuntimeException("Try to publish with options without 
publishOptions.");
+        }
+
+        boolean mandatory = publishOptions.computeMandatory(message);
+        boolean immediate = publishOptions.computeImmediate(message);
+
+        Preconditions.checkState(
+                !(returnListener == null && (mandatory || immediate)),
+                "Setting mandatory and/or immediate flags to true requires a 
ReturnListener.");
+
+        String rk = publishOptions.computeRoutingKey(message);
+        String exchange = publishOptions.computeExchange(message);
+
+        rmqChannel.basicPublish(
+                exchange,
+                rk,
+                mandatory,
+                immediate,
+                publishOptions.computeProperties(message),
+                serializedMessage);
+    }
+
+    /**
+     * Close the channel and connection to RabbitMQ.
+     *
+     * @throws Exception channel or connection closing failed
+     */
+    public void close() throws Exception {
+        // close the channel
+        if (rmqChannel != null) {
+            rmqChannel.close();
+        }
+
+        // close the connection
+        if (rmqConnection != null) {
+            rmqConnection.close();

Review comment:
       Could here add `rmqConnection = null;`?

##########
File path: 
flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/common/RabbitMQSinkConnection.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq2.sink.common;
+
+import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig;
+import org.apache.flink.util.Preconditions;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This class provides basic RabbitMQ functionality and common behaviour such 
as establishing and
+ * closing a connection via the {@code connectionConfig}. In addition, it 
provides methods for
+ * serializing and sending messages to RabbitMQ (with or without publish 
options).
+ *
+ * @param <T> The type of the messages that are published
+ */
+public class RabbitMQSinkConnection<T> {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSinkConnection.class);
+
+    private final RabbitMQConnectionConfig connectionConfig;
+    private final String queueName;
+    private Connection rmqConnection;
+    private Channel rmqChannel;
+
+    @Nullable private final RabbitMQSinkPublishOptions<T> publishOptions;
+
+    @Nullable private final SerializableReturnListener returnListener;
+
+    public RabbitMQSinkConnection(
+            RabbitMQConnectionConfig connectionConfig,
+            String queueName,
+            @Nullable RabbitMQSinkPublishOptions<T> publishOptions,
+            @Nullable SerializableReturnListener returnListener) {
+        this.connectionConfig = requireNonNull(connectionConfig);
+        this.queueName = requireNonNull(queueName);
+        this.publishOptions = publishOptions;
+        this.returnListener = returnListener;
+    }
+
+    /**
+     * Setup the RabbitMQ connection and a channel to send messages to.
+     *
+     * @throws Exception that might occur when setting up the connection and 
channel.
+     */
+    public void setupRabbitMQ() throws Exception {
+        LOG.info("Setup RabbitMQ");
+        this.rmqConnection = setupConnection(connectionConfig);
+        this.rmqChannel = setupChannel(rmqConnection, queueName, 
returnListener);
+    }
+
+    private Connection setupConnection(RabbitMQConnectionConfig 
connectionConfig) throws Exception {
+        return connectionConfig.getConnectionFactory().newConnection();
+    }
+
+    private Channel setupChannel(
+            Connection rmqConnection, String queueName, 
SerializableReturnListener returnListener)
+            throws IOException {
+        final Channel rmqChannel = rmqConnection.createChannel();
+        rmqChannel.queueDeclare(queueName, true, false, false, null);
+        if (returnListener != null) {
+            rmqChannel.addReturnListener(returnListener);
+        }
+        return rmqChannel;
+    }
+
+    /**
+     * Only used by at-least-once and exactly-once for resending messages that 
could not be
+     * delivered.
+     *
+     * @param message sink message wrapping the atomic message object
+     */
+    public void send(RabbitMQSinkMessageWrapper<T> message) throws IOException 
{
+        send(message.getMessage(), message.getBytes());
+    }
+
+    /**
+     * Publish a message to a queue in RabbitMQ. With publish options enabled, 
first compute the
+     * necessary publishing information.
+     *
+     * @param message original message, only required for publishing with 
publish options present
+     * @param serializedMessage serialized message to send to RabbitMQ
+     */
+    public void send(T message, byte[] serializedMessage) throws IOException {
+        if (publishOptions == null) {
+            rmqChannel.basicPublish("", queueName, null, serializedMessage);
+        } else {
+            publishWithOptions(message, serializedMessage);
+        }
+    }
+
+    private void publishWithOptions(T message, byte[] serializedMessage) 
throws IOException {
+        if (publishOptions == null) {
+            throw new RuntimeException("Try to publish with options without 
publishOptions.");
+        }
+
+        boolean mandatory = publishOptions.computeMandatory(message);
+        boolean immediate = publishOptions.computeImmediate(message);
+
+        Preconditions.checkState(
+                !(returnListener == null && (mandatory || immediate)),
+                "Setting mandatory and/or immediate flags to true requires a 
ReturnListener.");
+
+        String rk = publishOptions.computeRoutingKey(message);
+        String exchange = publishOptions.computeExchange(message);
+
+        rmqChannel.basicPublish(
+                exchange,
+                rk,
+                mandatory,
+                immediate,
+                publishOptions.computeProperties(message),
+                serializedMessage);
+    }
+
+    /**
+     * Close the channel and connection to RabbitMQ.
+     *
+     * @throws Exception channel or connection closing failed
+     */
+    public void close() throws Exception {
+        // close the channel
+        if (rmqChannel != null) {
+            rmqChannel.close();

Review comment:
       Could here add `rmqChannel = null;`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to