GitHub user pduveau commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5410#discussion_r193356675

    --- Diff: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java ---
    @@ -124,7 +159,82 @@ public void closeAllResources() throws Exception {
             verify(connection).close();
         }

    +    @Test
    +    public void invokeFeaturedPublishBytesToQueue() throws Exception {
    +        RMQSink<String> rmqSink = createRMQSinkFeatured();
    +
    +        rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
    +        verify(serializationSchema).serialize(MESSAGE_STR);
    +        verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false,
    +                publishOptions.computeProperties(""), MESSAGE);
    +    }
    +
    +    @Test
    +    public void invokeFeaturedReturnHandlerPublishBytesToQueue() throws Exception {
    +        RMQSink<String> rmqSink = createRMQSinkFeaturedReturnHandler();
    +
    +        rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
    +        verify(serializationSchema).serialize(MESSAGE_STR);
    +        verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, true, true,
    +                publishOptions.computeProperties(""), MESSAGE);
    +    }
    +
    +    @Test(expected = RuntimeException.class)
    +    public void exceptionDuringFeaturedPublishingIsNotIgnored() throws Exception {
    +        RMQSink<String> rmqSink = createRMQSinkFeatured();
    +
    +        doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false,
    +                publishOptions.computeProperties(""), MESSAGE);
    +        rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
    +    }
    +
    +    @Test
    +    public void exceptionDuringFeaturedPublishingIsIgnoredIfLogFailuresOnly() throws Exception {
    +        RMQSink<String> rmqSink = createRMQSinkFeatured();
    +        rmqSink.setLogFailuresOnly(true);
    +
    +        doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false,
    +                publishOptions.computeProperties(""), MESSAGE);
    +        rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
    +    }
    +
    +    private class DummyPublishOptions implements RMQSinkPublishOptions<String> {
    --- End diff --

    Done

---