[ https://issues.apache.org/jira/browse/FLINK-17204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100184#comment-17100184 ]
Austin Cawley-Edwards commented on FLINK-17204:
-----------------------------------------------

Thanks for assigning, [~aljoscha]. The PR is open. Should I go back to the mailing list thread to look for reviewers, or are you able to take it whenever you have time?

> The RMQSource and RMQSink of the RabbitMQ connector have inconsistent default value of durable when declaring the queue.
> --------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17204
>                 URL: https://issues.apache.org/jira/browse/FLINK-17204
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors/ RabbitMQ
>    Affects Versions: 1.10.0
>            Reporter: ChaojianZhang
>            Assignee: Austin Cawley-Edwards
>            Priority: Major
>              Labels: pull-request-available
>
> The RabbitMQ queue was created with durable set to true.
> When I use the data in a RabbitMQ queue as the source and, after Flink processing, sink it into
> another RabbitMQ queue, the program reports an exception. The main exception information is as follows:
>
> {code:java}
> ...
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'rabbitmq_connectors_sink' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10)
>     at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
>     at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
>     at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:443)
>     at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:263)
>     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:136)
>     ... 15 more
> ...{code}
>
> I checked the source code of RMQSource and RMQSink and found that their setupQueue() methods
> pass inconsistent values for durable to queueDeclare(). I think they should be consistent.
> If possible, I would like to fix it.
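
For readers unfamiliar with the error above, here is a minimal sketch of why a mismatched durable flag fails. It does not use the RabbitMQ client or Flink at all: FakeBroker and redeclareSucceeds are hypothetical names invented for illustration, and the model tracks only the durable flag, on the assumption (taken from the reply-text in the trace) that redeclaring an existing queue with a different value is rejected.

```java
import java.util.HashMap;
import java.util.Map;

public class DurableMismatchSketch {

    /** Hypothetical in-memory stand-in for a broker; not a RabbitMQ API. */
    static final class FakeBroker {
        // Remembers the durable flag each queue was first declared with.
        private final Map<String, Boolean> durableByQueue = new HashMap<>();

        /** Declares a queue; throws if it already exists with a different durable flag. */
        void queueDeclare(String queue, boolean durable) {
            Boolean current = durableByQueue.putIfAbsent(queue, durable);
            if (current != null && current != durable) {
                throw new IllegalStateException(
                        "PRECONDITION_FAILED - inequivalent arg 'durable' for queue '"
                                + queue + "': received '" + durable
                                + "' but current is '" + current + "'");
            }
        }
    }

    /** Returns true if redeclaring an existing queue with requestedDurable is accepted. */
    public static boolean redeclareSucceeds(boolean existingDurable, boolean requestedDurable) {
        FakeBroker broker = new FakeBroker();
        broker.queueDeclare("rabbitmq_connectors_sink", existingDurable);
        try {
            broker.queueDeclare("rabbitmq_connectors_sink", requestedDurable);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Source side and sink side agree on durable=true: accepted.
        System.out.println("true then true:  " + redeclareSucceeds(true, true));
        // Sink side declares durable=false against a durable queue: rejected.
        System.out.println("true then false: " + redeclareSucceeds(true, false));
    }
}
```

In this simplified model, the failure in the report corresponds to the second call: the queue already exists with durable set to true and one side declares it with false. Making both setupQueue() methods pass the same value, as the issue proposes, avoids the mismatch.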