Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r190648628 --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java --- @@ -40,22 +40,36 @@ private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class); - protected final String queueName; + protected String queueName; private final RMQConnectionConfig rmqConnectionConfig; protected transient Connection connection; protected transient Channel channel; protected SerializationSchema<IN> schema; - private boolean logFailuresOnly = false; + protected boolean logFailuresOnly = false; + + protected RMQSinkPublishOptions<IN> messageCompute; --- End diff -- This field and `queueName` should be `final`. I think you need a third private constructor to achieve this: ``` private RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema<IN> schema, RMQSinkPublishOptions<IN> messageCompute) { this.rmqConnectionConfig = rmqConnectionConfig; this.queueName = queueName; this.schema = schema; this.messageCompute = messageCompute; } ``` Then you can write ``` public RMQSink(RMQConnectionConfig rmqConnectionConfig, SerializationSchema<IN> schema, RMQSinkPublishOptions<IN> messageCompute) { this(rmqConnectionConfig, null, schema, messageCompute); } ```
---