[ https://issues.apache.org/jira/browse/FLINK-8468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503067#comment-16503067 ]
ASF GitHub Bot commented on FLINK-8468: --------------------------------------- Github user pduveau commented on a diff in the pull request: https://github.com/apache/flink/pull/5410#discussion_r193356675 --- Diff: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java --- @@ -124,7 +159,82 @@ public void closeAllResources() throws Exception { verify(connection).close(); } + @Test + public void invokeFeaturedPublishBytesToQueue() throws Exception { + RMQSink<String> rmqSink = createRMQSinkFeatured(); + + rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0)); + verify(serializationSchema).serialize(MESSAGE_STR); + verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false, + publishOptions.computeProperties(""), MESSAGE); + } + + @Test + public void invokeFeaturedReturnHandlerPublishBytesToQueue() throws Exception { + RMQSink<String> rmqSink = createRMQSinkFeaturedReturnHandler(); + + rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0)); + verify(serializationSchema).serialize(MESSAGE_STR); + verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, true, true, + publishOptions.computeProperties(""), MESSAGE); + } + + @Test(expected = RuntimeException.class) + public void exceptionDuringFeaturedPublishingIsNotIgnored() throws Exception { + RMQSink<String> rmqSink = createRMQSinkFeatured(); + + doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false, + publishOptions.computeProperties(""), MESSAGE); + rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0)); + } + + @Test + public void exceptionDuringFeaturedPublishingIsIgnoredIfLogFailuresOnly() throws Exception { + RMQSink<String> rmqSink = createRMQSinkFeatured(); + rmqSink.setLogFailuresOnly(true); + + doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false, + publishOptions.computeProperties(""), MESSAGE); + rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0)); + } + + private class DummyPublishOptions implements RMQSinkPublishOptions<String> { --- End diff -- Done > Make the connector to take advantage of AMQP features (routing key, exchange > and message properties) > ---------------------------------------------------------------------------------------------------- > > Key: FLINK-8468 > URL: https://issues.apache.org/jira/browse/FLINK-8468 > Project: Flink > Issue Type: Improvement > Components: RabbitMQ Connector > Affects Versions: 1.4.0 > Reporter: Ph.Duveau > Priority: Major > > Make the connector to take advantage of AMQP features by adding a constructor > and an interface to implement -- This message was sent by Atlassian JIRA (v7.6.3#76005)