Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5410#discussion_r193122068

    --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java ---
    @@ -40,22 +42,63 @@

         private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);

    -    protected final String queueName;
    +    private final String queueName;
         private final RMQConnectionConfig rmqConnectionConfig;
         protected transient Connection connection;
         protected transient Channel channel;
         protected SerializationSchema<IN> schema;
    -    private boolean logFailuresOnly = false;
    +    protected boolean logFailuresOnly = false;
    +
    +    private final RMQSinkPublishOptions<IN> messageCompute;
    +    private final ReturnListener returnListener;

         /**
          * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}.
          * @param queueName The queue to publish messages to.
          * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes
    +     * @param messageCompute A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties
          */
    -    public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema<IN> schema) {
    +    private RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema<IN> schema,
    --- End diff --

    We should add validation to ensure that `queueName` is `null` if and only if `messageCompute` is set. We should also check that the `RMQConnectionConfig` and `SerializationSchema` arguments are not `null`. You may want to use `org.apache.flink.util.Preconditions` for that.
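
The validation suggested above could look roughly like the following. This is a minimal sketch, not the PR's code: `RmqSinkArgs` and `validate` are hypothetical names, the argument types are reduced to `Object` so that it compiles without Flink or the RabbitMQ client, and `java.util.Objects.requireNonNull` plus an explicit `IllegalArgumentException` stand in for `Preconditions.checkNotNull` and `Preconditions.checkArgument`.

```java
import java.util.Objects;

// Hypothetical helper, not part of the PR: mirrors the checks the private
// RMQSink constructor could run before assigning its fields.
final class RmqSinkArgs {

    // Argument types are reduced to Object so the sketch compiles without
    // Flink or the RabbitMQ client on the classpath.
    static void validate(Object rmqConnectionConfig, String queueName,
                         Object schema, Object messageCompute) {
        // Stand-in for Preconditions.checkNotNull(...): throws NullPointerException.
        Objects.requireNonNull(rmqConnectionConfig, "rmqConnectionConfig must not be null");
        Objects.requireNonNull(schema, "schema must not be null");

        // queueName is null iff messageCompute is set, so exactly one of the two is non-null.
        // Stand-in for Preconditions.checkArgument(...): throws IllegalArgumentException.
        if ((queueName == null) == (messageCompute == null)) {
            throw new IllegalArgumentException(
                "exactly one of queueName and messageCompute must be set");
        }
    }

    public static void main(String[] args) {
        validate(new Object(), "my-queue", new Object(), null);   // queue only: accepted
        validate(new Object(), null, new Object(), new Object()); // publish options only: accepted
        try {
            validate(new Object(), "my-queue", new Object(), new Object()); // both set
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In the real constructor the same three checks would sit ahead of the field assignments, so an invalid combination fails when the sink is constructed rather than when the first message is published.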
---