Hello,

Thank you for you answers.

I am trying to experiment to see if i can do it with the info you gave me.

I took a simple example to start: qdmanage -b <dispatch-machine>:5672
create --type=connector role=route-container addr=broker-machine port=5673
name=rabih.connector
My goal is to imitate the behavior using JMS and Proton-j.

First, I tried with JMS (attached the code):
I used an ObjectMessage with JMS_AMQP_TYPED_ENCODING set to true. And I
used the proton-j encoder to encode the map before sending it.
But, I am getting a MessageFormatException("Failed to serialize object")

Do you have any idea how can i pass the byte array without failing?


Second, I tried with Proton-j (attached code):
The code i wrote is based on send example in proton-j.
But the problem is that i am not able to find how to set the destination
queue of the message. (i used Message.setAddress() but it is wrong)

How can i set a destination queue for the sender or message?

PS. I used wireshark to see the amqp messages that are passing on the
network and copy the message parameters.


Best Regards,
Rabih

On Wed, Jan 18, 2017 at 2:35 PM, Rob Godfrey <rob.j.godf...@gmail.com>
wrote:

> So I think what we said - and I can't find it written down anywhere in the
> draft, though we reference the JSON spec in the pre-amble, is that any
> value in the headers or body should be able to be sent in the native AMQP
> type (and we might need some words there about converting between various
> numeric types), or as a JSON serialized string.  We didn't (to my
> recollection) talk about whether there should be a way for the requester to
> be able to influence the form of the reply.
>
> Currently the implementation of AMQP Management in the Qpid Broker for Java
> follows the above conventions (any inbound value can be in the native type,
> or as a JSON string which can convert to the desired type, however there is
> no mechanism for controlling the nature of responses).
>
> Perhaps this is something we should talk about soon ;-) ?
>
> -- Rob
>
> On 18 January 2017 at 14:29, Ted Ross <tr...@redhat.com> wrote:
>
> >
> >
> > On 01/18/2017 07:45 AM, Gordon Sim wrote:
> >
> >> On 18/01/17 10:45, Rob Godfrey wrote:
> >>
> >>> In terms of sending maps/lists I think we said (at OASIS), though it is
> >>> possibly not yet in the draft spec, that Json formatted equivalents
> >>> should
> >>> be able to be used for all values... however I have no idea if the
> >>> Dispatch
> >>> Router supports that.
> >>>
> >>
> >> I think that would be very sensible.
> >>
> >
> > Dispatch Router does not support Json formatted bodies at present, but
> > this is a feature that I would be in favor of putting on the roadmap.
> >
> >
> >
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
> >> For additional commands, e-mail: users-h...@qpid.apache.org
> >>
> >>
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
> > For additional commands, e-mail: users-h...@qpid.apache.org
> >
> >
>
import java.nio.ByteBuffer;

import java.util.HashMap;
import java.util.Map;

import javax.jms.*;

import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.codec.*;


public class JMSTest {
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new 
JmsConnectionFactory("amqp://<dispatchMachine>:<port>");
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("$management");
        MessageProducer producer = session.createProducer(queue);
        ObjectMessage objectMessage = session.createObjectMessage();
        objectMessage.setBooleanProperty("JMS_AMQP_TYPED_ENCODING", true);
        objectMessage.setStringProperty("operation", "CREATE");
        objectMessage.setStringProperty("type", 
"org.apache.qpid.dispatch.connector");
        objectMessage.setStringProperty("name", "rabih.connector");
        Map<String, String> map = new HashMap();
        map.put("name", "rabih.connector");
        map.put("role", "route-container");
        map.put("addr", "brokerMachine");
        map.put("port", "port");
        EncoderImpl encoder = new EncoderImpl(new DecoderImpl());
        ByteBuffer byteBuffer = ByteBuffer.allocate(9999);
        WritableBuffer byteBufferWrapper = new 
WritableBuffer.ByteBufferWrapper(byteBuffer);
        encoder.setByteBuffer(byteBufferWrapper);
        encoder.writeMap(map);
        System.out.println(new String(byteBuffer.array()));
        objectMessage.setObject(byteBuffer.array());
        producer.send(objectMessage);
        connection.close();
    }
}
import java.io.IOException;

import java.nio.BufferOverflowException;

import java.util.HashMap;
import java.util.Map;

import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.reactor.Handshaker;
import org.apache.qpid.proton.reactor.Reactor;


public class ProtonJTest extends BaseHandler {

    //~ 
----------------------------------------------------------------------------------------------------------------
    //~ Instance fields 
    //~ 
----------------------------------------------------------------------------------------------------------------

    private final String host = "dispatch-machine";
    private final int port = 5672;
    private final Message message;

    //~ 
----------------------------------------------------------------------------------------------------------------
    //~ Constructors 
    //~ 
----------------------------------------------------------------------------------------------------------------

    private ProtonJTest() {
        message = Proton.message();

        Map<String, String> annotations = new HashMap();
        annotations.put("operation", "CREATE");
        annotations.put("type", "org.apache.qpid.dispatch.connector");
        annotations.put("name", "rabih.connector");
        ApplicationProperties applicationProperties = new 
ApplicationProperties(annotations);
        message.setApplicationProperties(applicationProperties);
        message.setAddress("$management");

        Map<String, String> map = new HashMap();
        map.put("name", "rabih.connector");
        map.put("role", "route-container");
        map.put("addr", "broker-machine");
        map.put("port", "5673");

        message.setBody(new AmqpValue(map));
    }

    //~ 
----------------------------------------------------------------------------------------------------------------
    //~ Methods 
    //~ 
----------------------------------------------------------------------------------------------------------------

    public static void main(String[] args) throws IOException {
        Reactor r = Proton.reactor(new ProtonJTest());
        r.run();
    }

    @Override
    public void onReactorInit(Event event) {
        event.getReactor().connectionToHost(host, port, new 
SendHandler(message));
    }

    //~ 
----------------------------------------------------------------------------------------------------------------
    //~ Nested Classes 
    //~ 
----------------------------------------------------------------------------------------------------------------

    private class SendHandler extends BaseHandler {

        private final Message message;
        private int nextTag = 0;

        private SendHandler(Message message) {
            this.message = message;
            add(new Handshaker());
        }

        @Override
        public void onConnectionInit(Event event) {
            Connection conn = event.getConnection();
            Session ssn = conn.session();
            Sender snd = ssn.sender("s1");
            conn.open();
            ssn.open();
            snd.open();
        }

        @Override
        public void onLinkFlow(Event event) {
            Sender snd = (Sender) event.getLink();
            if (snd.getCredit() > 0) {
                byte[] msgData = new byte[1024];
                int length;
                while (true) {
                    try {
                        length = message.encode(msgData, 0, msgData.length);
                        break;
                    } catch (BufferOverflowException e) {
                        msgData = new byte[msgData.length * 2];
                    }
                }
                byte[] tag = String.valueOf(nextTag++).getBytes();
                Delivery dlv = snd.delivery(tag);
                snd.send(msgData, 0, length);
                dlv.settle();
                snd.advance();
                snd.close();
                snd.getSession().close();
                snd.getSession().getConnection().close();
            }
        }

        @Override
        public void onTransportError(Event event) {
            ErrorCondition condition = event.getTransport().getCondition();
            if (condition != null) {
                System.err.println("Error: " + condition.getDescription());
            } else {
                System.err.println("Error (no description returned).");
            }
        }
    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
For additional commands, e-mail: users-h...@qpid.apache.org

Reply via email to