Hello, Thank you for you answers.
I am trying to experiment to see if i can do it with the info you gave me. I took a simple example to start: qdmanage -b <dispatch-machine>:5672 create --type=connector role=route-container addr=broker-machine port=5673 name=rabih.connector My goal is to imitate the behavior using JMS and Proton-j. First, I tried with JMS (attached the code): I used an ObjectMessage with JMS_AMQP_TYPED_ENCODING set to true. And I used the proton-j encoder to encode the map before sending it. But, I am getting a MessageFormatException("Failed to serialize object") Do you have any idea how can i pass the byte array without failing? Second, I tried with Proton-j (attached code): The code i wrote is based on send example in proton-j. But the problem is that i am not able to find how to set the destination queue of the message. (i used Message.setAddress() but it is wrong) How can i set a destination queue for the sender or message? PS. I used wireshark to see the amqp messages that are passing on the network and copy the message parameters. Best Regards, Rabih On Wed, Jan 18, 2017 at 2:35 PM, Rob Godfrey <rob.j.godf...@gmail.com> wrote: > So I think what we said - and I can't find it written down anywhere in the > draft, though we reference the JSON spec in the pre-amble, is that any > value in the headers or body should be able to be sent in the native AMQP > type (and we might need some words there about converting between various > numeric types), or as a JSON serialized string. We didn't (to my > recollection) talk about whether there should be a way for the requester to > be able to influence the form of the reply. > > Currently the implementation of AMQP Management in the Qpid Broker for Java > follows the above conventions (any inbound value can be in the native type, > or as a JSON string which can convert to the desired type, however there is > no mechanism for controlling the nature of responses). > > Perhaps this is something we should talk about soon ;-) ? > > -- Rob > > On 18 January 2017 at 14:29, Ted Ross <tr...@redhat.com> wrote: > > > > > > > On 01/18/2017 07:45 AM, Gordon Sim wrote: > > > >> On 18/01/17 10:45, Rob Godfrey wrote: > >> > >>> In terms of sending maps/lists I think we said (at OASIS), though it is > >>> possibly not yet in the draft spec, that Json formatted equivalents > >>> should > >>> be able to be used for all values... however I have no idea if the > >>> Dispatch > >>> Router supports that. > >>> > >> > >> I think that would be very sensible. > >> > > > > Dispatch Router does not support Json formatted bodies at present, but > > this is a feature that I would be in favor of putting on the roadmap. > > > > > > > >> > >> --------------------------------------------------------------------- > >> To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org > >> For additional commands, e-mail: users-h...@qpid.apache.org > >> > >> > > --------------------------------------------------------------------- > > To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org > > For additional commands, e-mail: users-h...@qpid.apache.org > > > > >
import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import javax.jms.*; import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.proton.codec.*; public class JMSTest { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://<dispatchMachine>:<port>"); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("$management"); MessageProducer producer = session.createProducer(queue); ObjectMessage objectMessage = session.createObjectMessage(); objectMessage.setBooleanProperty("JMS_AMQP_TYPED_ENCODING", true); objectMessage.setStringProperty("operation", "CREATE"); objectMessage.setStringProperty("type", "org.apache.qpid.dispatch.connector"); objectMessage.setStringProperty("name", "rabih.connector"); Map<String, String> map = new HashMap(); map.put("name", "rabih.connector"); map.put("role", "route-container"); map.put("addr", "brokerMachine"); map.put("port", "port"); EncoderImpl encoder = new EncoderImpl(new DecoderImpl()); ByteBuffer byteBuffer = ByteBuffer.allocate(9999); WritableBuffer byteBufferWrapper = new WritableBuffer.ByteBufferWrapper(byteBuffer); encoder.setByteBuffer(byteBufferWrapper); encoder.writeMap(map); System.out.println(new String(byteBuffer.array())); objectMessage.setObject(byteBuffer.array()); producer.send(objectMessage); connection.close(); } }
import java.io.IOException; import java.nio.BufferOverflowException; import java.util.HashMap; import java.util.Map; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.BaseHandler; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.reactor.Handshaker; import org.apache.qpid.proton.reactor.Reactor; public class ProtonJTest extends BaseHandler { //~ ---------------------------------------------------------------------------------------------------------------- //~ Instance fields //~ ---------------------------------------------------------------------------------------------------------------- private final String host = "dispatch-machine"; private final int port = 5672; private final Message message; //~ ---------------------------------------------------------------------------------------------------------------- //~ Constructors //~ ---------------------------------------------------------------------------------------------------------------- private ProtonJTest() { message = Proton.message(); Map<String, String> annotations = new HashMap(); annotations.put("operation", "CREATE"); annotations.put("type", "org.apache.qpid.dispatch.connector"); annotations.put("name", "rabih.connector"); ApplicationProperties applicationProperties = new ApplicationProperties(annotations); message.setApplicationProperties(applicationProperties); message.setAddress("$management"); Map<String, String> map = new HashMap(); map.put("name", "rabih.connector"); map.put("role", "route-container"); map.put("addr", "broker-machine"); map.put("port", "5673"); message.setBody(new AmqpValue(map)); } //~ ---------------------------------------------------------------------------------------------------------------- //~ Methods //~ ---------------------------------------------------------------------------------------------------------------- public static void main(String[] args) throws IOException { Reactor r = Proton.reactor(new ProtonJTest()); r.run(); } @Override public void onReactorInit(Event event) { event.getReactor().connectionToHost(host, port, new SendHandler(message)); } //~ ---------------------------------------------------------------------------------------------------------------- //~ Nested Classes //~ ---------------------------------------------------------------------------------------------------------------- private class SendHandler extends BaseHandler { private final Message message; private int nextTag = 0; private SendHandler(Message message) { this.message = message; add(new Handshaker()); } @Override public void onConnectionInit(Event event) { Connection conn = event.getConnection(); Session ssn = conn.session(); Sender snd = ssn.sender("s1"); conn.open(); ssn.open(); snd.open(); } @Override public void onLinkFlow(Event event) { Sender snd = (Sender) event.getLink(); if (snd.getCredit() > 0) { byte[] msgData = new byte[1024]; int length; while (true) { try { length = message.encode(msgData, 0, msgData.length); break; } catch (BufferOverflowException e) { msgData = new byte[msgData.length * 2]; } } byte[] tag = String.valueOf(nextTag++).getBytes(); Delivery dlv = snd.delivery(tag); snd.send(msgData, 0, length); dlv.settle(); snd.advance(); snd.close(); snd.getSession().close(); snd.getSession().getConnection().close(); } } @Override public void onTransportError(Event event) { ErrorCondition condition = event.getTransport().getCondition(); if (condition != null) { System.err.println("Error: " + condition.getDescription()); } else { System.err.println("Error (no description returned)."); } } } }
--------------------------------------------------------------------- To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org For additional commands, e-mail: users-h...@qpid.apache.org