SteNicholas commented on a change in pull request #15140: URL: https://github.com/apache/flink/pull/15140#discussion_r775324812
########## File path: flink-connectors/flink-connector-rabbitmq2/README.md ########## @@ -19,6 +19,143 @@ and the Sink API specified in [FLIP-143](https://cwiki.apache.org/confluence/dis For more information about RabbitMQ visit https://www.rabbitmq.com/. -In order to view how to use the connector inspect -[Source](src/main/java/org/apache/flink/connector/rabbitmq2/source/README.md) and -[Sink](src/main/java/org/apache/flink/connector/rabbitmq2/sink/README.md). +# RabbitMQ Source + +Flink's RabbitMQ connector provides a streaming-only source which enables you to receive messages Review comment: Why does the RabbitMQ connector provide a streaming-only source? Doesn't it support batch mode? ########## File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSink.java ########## @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.rabbitmq2.sink; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode; +import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions; +import org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener; +import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState; +import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterStateSerializer; +import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase; +import org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterAtLeastOnce; +import org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterAtMostOnce; +import org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterExactlyOnce; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +/** + * RabbitMQ sink that publishes messages to a RabbitMQ queue. It provides at-most-once, + * at-least-once or exactly-once processing semantics. For at-least-once and exactly-once, + * checkpointing needs to be enabled. 
+ * + * <pre>{@code + * RabbitMQSink + * .builder() + * .setConnectionConfig(connectionConfig) + * .setQueueName("queue") + * .setSerializationSchema(new SimpleStringSchema()) + * .setConsistencyMode(ConsistencyMode.AT_LEAST_ONCE) + * .build(); + * }</pre> + * + * <p>When creating the sink a {@code connectionConfig} must be specified via {@link + * RabbitMQConnectionConfig}. It contains required information for the RabbitMQ java client to + * connect to the RabbitMQ server. A minimum configuration contains a (virtual) host, a username, a + * password and a port. Besides that, the {@code queueName} to publish to and a {@link + * SerializationSchema} for the sink input type are required. {@code publishOptions} can be added + * optionally to route messages in RabbitMQ. + * + * <p>If at-least-once is required, messages are buffered until an acknowledgement arrives because + * delivery needs to be guaranteed. On each checkpoint, all unacknowledged messages will be resent + * to RabbitMQ. In case of a failure, all unacknowledged messages can be restored and resent. + * + * <p>In the case of exactly-once a transactional RabbitMQ channel is used to achieve that all + * messages within a checkpoint are delivered once and only once. All messages that arrive in a + * checkpoint interval are buffered and sent to RabbitMQ in a single transaction when the checkpoint + * is triggered. If the transaction fails, all messages that were a part of the transaction are put + * back into the buffer and a resend is issued in the next checkpoint. + * + * <p>Keep in mind that the transactional channels are heavyweight and the performance will drop. + * Under heavy load, checkpoints can be delayed if a transaction takes longer than the specified + * checkpointing interval. 
+ * + * <p>If publish options are used and the checkpointing mode is at-least-once or exactly-once, they + * require a {@link DeserializationSchema} to be provided because messages that were persisted as + * part of an earlier checkpoint are needed to recompute routing/exchange. + */ +public class RabbitMQSink<T> implements Sink<T, Void, RabbitMQSinkWriterState<T>, Void> { + + private final RabbitMQConnectionConfig connectionConfig; + private final String queueName; + private final SerializationSchema<T> serializationSchema; + private final RabbitMQSinkPublishOptions<T> publishOptions; + private final ConsistencyMode consistencyMode; + private final SerializableReturnListener returnListener; + + private static final ConsistencyMode DEFAULT_CONSISTENCY_MODE = ConsistencyMode.AT_MOST_ONCE; + + private RabbitMQSink( + RabbitMQConnectionConfig connectionConfig, + String queueName, + SerializationSchema<T> serializationSchema, + ConsistencyMode consistencyMode, + @Nullable SerializableReturnListener returnListener, + @Nullable RabbitMQSinkPublishOptions<T> publishOptions) { + this.connectionConfig = requireNonNull(connectionConfig); + this.queueName = requireNonNull(queueName); + this.serializationSchema = requireNonNull(serializationSchema); + this.consistencyMode = requireNonNull(consistencyMode); + + this.returnListener = returnListener; + + Preconditions.checkState( + verifyPublishOptions(), + "If consistency mode is stronger than at-most-once and publish options are defined " + + "then publish options need a deserialization schema"); + this.publishOptions = publishOptions; + } + + private boolean verifyPublishOptions() { + // If at-most-once, doesn't matter if publish options are provided (no state in writer). + if (consistencyMode == ConsistencyMode.AT_MOST_ONCE) { + return true; + } + if (publishOptions == null) { Review comment: Why not add this check to the check above? 
########## File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/common/RabbitMQSinkConnection.java ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq2.sink.common; + +import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig; +import org.apache.flink.util.Preconditions; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; + +import static java.util.Objects.requireNonNull; + +/** + * This class provides basic RabbitMQ functionality and common behaviour such as establishing and + * closing a connection via the {@code connectionConfig}. In addition, it provides methods for + * serializing and sending messages to RabbitMQ (with or without publish options). 
+ * + * @param <T> The type of the messages that are published + */ +public class RabbitMQSinkConnection<T> { + protected static final Logger LOG = LoggerFactory.getLogger(RabbitMQSinkConnection.class); + + private final RabbitMQConnectionConfig connectionConfig; + private final String queueName; + private Connection rmqConnection; + private Channel rmqChannel; + + @Nullable private final RabbitMQSinkPublishOptions<T> publishOptions; + + @Nullable private final SerializableReturnListener returnListener; + + public RabbitMQSinkConnection( + RabbitMQConnectionConfig connectionConfig, + String queueName, + @Nullable RabbitMQSinkPublishOptions<T> publishOptions, + @Nullable SerializableReturnListener returnListener) { + this.connectionConfig = requireNonNull(connectionConfig); + this.queueName = requireNonNull(queueName); + this.publishOptions = publishOptions; + this.returnListener = returnListener; + } + + /** + * Setup the RabbitMQ connection and a channel to send messages to. + * + * @throws Exception that might occur when setting up the connection and channel. + */ + public void setupRabbitMQ() throws Exception { + LOG.info("Setup RabbitMQ"); + this.rmqConnection = setupConnection(connectionConfig); + this.rmqChannel = setupChannel(rmqConnection, queueName, returnListener); + } + + private Connection setupConnection(RabbitMQConnectionConfig connectionConfig) throws Exception { + return connectionConfig.getConnectionFactory().newConnection(); + } + + private Channel setupChannel( + Connection rmqConnection, String queueName, SerializableReturnListener returnListener) + throws IOException { + final Channel rmqChannel = rmqConnection.createChannel(); + rmqChannel.queueDeclare(queueName, true, false, false, null); + if (returnListener != null) { + rmqChannel.addReturnListener(returnListener); + } + return rmqChannel; + } + + /** + * Only used by at-least-once and exactly-once for resending messages that could not be + * delivered. 
+ * + * @param message sink message wrapping the atomic message object + */ + public void send(RabbitMQSinkMessageWrapper<T> message) throws IOException { + send(message.getMessage(), message.getBytes()); + } + + /** + * Publish a message to a queue in RabbitMQ. With publish options enabled, first compute the + * necessary publishing information. + * + * @param message original message, only required for publishing with publish options present + * @param serializedMessage serialized message to send to RabbitMQ + */ + public void send(T message, byte[] serializedMessage) throws IOException { + if (publishOptions == null) { + rmqChannel.basicPublish("", queueName, null, serializedMessage); + } else { + publishWithOptions(message, serializedMessage); + } + } + + private void publishWithOptions(T message, byte[] serializedMessage) throws IOException { + if (publishOptions == null) { + throw new RuntimeException("Try to publish with options without publishOptions."); + } + + boolean mandatory = publishOptions.computeMandatory(message); + boolean immediate = publishOptions.computeImmediate(message); + + Preconditions.checkState( + !(returnListener == null && (mandatory || immediate)), + "Setting mandatory and/or immediate flags to true requires a ReturnListener."); + + String rk = publishOptions.computeRoutingKey(message); + String exchange = publishOptions.computeExchange(message); + + rmqChannel.basicPublish( + exchange, + rk, + mandatory, + immediate, + publishOptions.computeProperties(message), + serializedMessage); + } + + /** + * Close the channel and connection to RabbitMQ. + * + * @throws Exception channel or connection closing failed + */ + public void close() throws Exception { + // close the channel + if (rmqChannel != null) { + rmqChannel.close(); + } + + // close the connection + if (rmqConnection != null) { + rmqConnection.close(); Review comment: Could you add `rmqConnection = null;` here? 
########## File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/common/RabbitMQSinkConnection.java ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq2.sink.common; + +import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig; +import org.apache.flink.util.Preconditions; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; + +import static java.util.Objects.requireNonNull; + +/** + * This class provides basic RabbitMQ functionality and common behaviour such as establishing and + * closing a connection via the {@code connectionConfig}. In addition, it provides methods for + * serializing and sending messages to RabbitMQ (with or without publish options). 
+ * + * @param <T> The type of the messages that are published + */ +public class RabbitMQSinkConnection<T> { + protected static final Logger LOG = LoggerFactory.getLogger(RabbitMQSinkConnection.class); + + private final RabbitMQConnectionConfig connectionConfig; + private final String queueName; + private Connection rmqConnection; + private Channel rmqChannel; + + @Nullable private final RabbitMQSinkPublishOptions<T> publishOptions; + + @Nullable private final SerializableReturnListener returnListener; + + public RabbitMQSinkConnection( + RabbitMQConnectionConfig connectionConfig, + String queueName, + @Nullable RabbitMQSinkPublishOptions<T> publishOptions, + @Nullable SerializableReturnListener returnListener) { + this.connectionConfig = requireNonNull(connectionConfig); + this.queueName = requireNonNull(queueName); + this.publishOptions = publishOptions; + this.returnListener = returnListener; + } + + /** + * Setup the RabbitMQ connection and a channel to send messages to. + * + * @throws Exception that might occur when setting up the connection and channel. + */ + public void setupRabbitMQ() throws Exception { + LOG.info("Setup RabbitMQ"); + this.rmqConnection = setupConnection(connectionConfig); + this.rmqChannel = setupChannel(rmqConnection, queueName, returnListener); + } + + private Connection setupConnection(RabbitMQConnectionConfig connectionConfig) throws Exception { + return connectionConfig.getConnectionFactory().newConnection(); + } + + private Channel setupChannel( + Connection rmqConnection, String queueName, SerializableReturnListener returnListener) + throws IOException { + final Channel rmqChannel = rmqConnection.createChannel(); + rmqChannel.queueDeclare(queueName, true, false, false, null); + if (returnListener != null) { + rmqChannel.addReturnListener(returnListener); + } + return rmqChannel; + } + + /** + * Only used by at-least-once and exactly-once for resending messages that could not be + * delivered. 
+ * + * @param message sink message wrapping the atomic message object + */ + public void send(RabbitMQSinkMessageWrapper<T> message) throws IOException { + send(message.getMessage(), message.getBytes()); + } + + /** + * Publish a message to a queue in RabbitMQ. With publish options enabled, first compute the + * necessary publishing information. + * + * @param message original message, only required for publishing with publish options present + * @param serializedMessage serialized message to send to RabbitMQ + */ + public void send(T message, byte[] serializedMessage) throws IOException { + if (publishOptions == null) { + rmqChannel.basicPublish("", queueName, null, serializedMessage); + } else { + publishWithOptions(message, serializedMessage); + } + } + + private void publishWithOptions(T message, byte[] serializedMessage) throws IOException { + if (publishOptions == null) { + throw new RuntimeException("Try to publish with options without publishOptions."); + } + + boolean mandatory = publishOptions.computeMandatory(message); + boolean immediate = publishOptions.computeImmediate(message); + + Preconditions.checkState( + !(returnListener == null && (mandatory || immediate)), + "Setting mandatory and/or immediate flags to true requires a ReturnListener."); + + String rk = publishOptions.computeRoutingKey(message); + String exchange = publishOptions.computeExchange(message); + + rmqChannel.basicPublish( + exchange, + rk, + mandatory, + immediate, + publishOptions.computeProperties(message), + serializedMessage); + } + + /** + * Close the channel and connection to RabbitMQ. + * + * @throws Exception channel or connection closing failed + */ + public void close() throws Exception { + // close the channel + if (rmqChannel != null) { + rmqChannel.close(); Review comment: Could you add `rmqChannel = null;` here? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org