We are using STOMP over sockjs, spring integration and ActiveMQ as the
message broker.  The two consumers we have work just fine when no selectors
are used, but fail when selectors are used.  At this point we are scratching
our heads and are looking for ideas.  The selector is based on a header we
are adding in the presend of the outbound channel.  Selector in the test
java class is set up as so:
        String selector = "hello = 'world'";
        MessageConsumer consumer = session.createConsumer(destination,
selector);

client side selector is set up as so:
        var headers = {'selector':'hello=world'};
        var connectCallback = function(frame) {
            stompClient.subscribe("/topic/receipt",
function(frame){console.log(frame);}, headers);

The setup without selectors is as follows:

Our client side:
        var socket = new SockJS(this.getUrl());
        var stompClient = Stomp.over(socket);
        stompClient.connect('', '', 
        connectCallback,
        errorCallback
        );

        var connectCallback = function(frame) {
            stompClient.subscribe("/topic/receipt",
function(frame){console.log(frame);})
            stompClient.send("/app/" + url.join('/'), {"content-type":
"text/plain"}, "<message>test messaage</message>");
        };

On the Spring side configuration of the message broker
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
       
registry.addEndpoint("{product}/{security}/{system}/{tenant}/v1/pub").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
       //registry.enableSimpleBroker("/topic", "/queue/");
       registry.enableStompBrokerRelay("/topic", "/queue/");
       registry.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration
registration) {
    }

    @Override
    public void configureClientOutboundChannel(ChannelRegistration
registration) {
       registration.setInterceptors(new MyInterceptor());
    }
}

When the message is published, it goes through a spring controller first
before being sent to ActiveMQ
    @MessageMapping("{product}/{security}/{system}/{tenant}/v1/pub")
    @SendTo("/topic/receipt")
    public String publish( @DestinationVariable ("product") String product, 
           @DestinationVariable("security") String security,
           @DestinationVariable("system") String system,
           @DestinationVariable("tenant") String tenant,
           String message) throws Exception 
    {
                //do some stuff

In the interceptor presend, I was trying to add tags to the
header/nativeHeaders and came up with this message being sent to ActiveMQ
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
       StompHeaderAccessor headerAccessor =
StompHeaderAccessor.wrap(message);
       headerAccessor.setHeader("priority", "5");
       headerAccessor.setHeader("tree", "alpha");
       if(StompCommand.MESSAGE.equals(headerAccessor.getCommand())) {
       
               Map<String, Object> map = headerAccessor.toMap();
               map.put("key1", "value1");
               Map nativeHeaders = new HashMap();
               nativeHeaders.put("hello",
Collections.singletonList("world"));
               map.put(NativeMessageHeaderAccessor.NATIVE_HEADERS,
nativeHeaders);
              GenericMessage msg = new GenericMessage(message.getPayload(),
map);
               System.out.println("==> " + msg);
               return msg;
       }
       else {
            return message;
       }
    }

One consumer is on the client side in the connectCallback, and the other
consumer is the java class snippet below.
        Message replyJMSMessage = consumer.receive();
        System.out.println(replyJMSMessage);
        if (replyJMSMessage != null && replyJMSMessage instanceof
BytesMessage)
        {
            javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage)
replyJMSMessage;
            byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
            bytesMessage.readBytes(bytes);
            System.out.println("Reply Message");
            // the reply message
            String replyMessage = new String(bytes, "UTF-8");
            System.out.println("   " + replyMessage);
        }







--
View this message in context: 
http://activemq.2283324.n4.nabble.com/Selectors-not-working-in-STOMP-over-sockjs-spring-integration-setup-tp4678358.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to