Hello,
Thank you for your answers.
I am experimenting to see whether I can do it with the information you gave me.
I took a simple example to start:
    qdmanage -b <dispatch-machine>:5672 create --type=connector \
        role=route-container addr=broker-machine port=5673 \
        name=rabih.connector
My goal is to imitate the behavior using JMS and Proton-j.
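To make the target concrete, here is a minimal sketch of the management request I understand this command to correspond to, written with plain Java maps only (no JMS or proton-j calls). The keys and values are taken from the qdmanage command and from the attached code; the class and method names are only illustrative, and the split between application properties and body is my assumption based on what I copied from Wireshark.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: models the two parts of the CREATE request as plain maps.
public class ManagementRequestSketch {

    // Application properties: the operation, the entity type, and the entity name.
    public static Map<String, Object> applicationProperties() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("operation", "CREATE");
        props.put("type", "org.apache.qpid.dispatch.connector");
        props.put("name", "rabih.connector");
        return props;
    }

    // Body: the attributes passed on the qdmanage command line.
    public static Map<String, Object> body() {
        Map<String, Object> body = new LinkedHashMap<>();
        body.put("name", "rabih.connector");
        body.put("role", "route-container");
        body.put("addr", "broker-machine");
        body.put("port", "5673");
        return body;
    }

    public static void main(String[] args) {
        System.out.println("application properties: " + applicationProperties());
        System.out.println("body: " + body());
    }
}
```

Both attempts below try to send these two maps to the $management address.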
First, I tried with JMS (attached the code):
I used an ObjectMessage with JMS_AMQP_TYPED_ENCODING set to true, and I
used the proton-j encoder to encode the map before sending it.
However, I am getting a MessageFormatException("Failed to serialize object").
Do you have any idea how I can pass the byte array without it failing?
Second, I tried with Proton-j (attached code):
The code I wrote is based on the Send example in proton-j.
The problem is that I cannot find how to set the destination
queue of the message. (I used Message.setAddress(), but it is wrong.)
How can I set a destination queue for the sender or the message?
PS. I used Wireshark to inspect the AMQP messages passing over the
network and copied the message parameters from there.
Best Regards,
Rabih
On Wed, Jan 18, 2017 at 2:35 PM, Rob Godfrey <[email protected]>
wrote:
> So I think what we said - and I can't find it written down anywhere in the
> draft, though we reference the JSON spec in the pre-amble, is that any
> value in the headers or body should be able to be sent in the native AMQP
> type (and we might need some words there about converting between various
> numeric types), or as a JSON serialized string. We didn't (to my
> recollection) talk about whether there should be a way for the requester to
> be able to influence the form of the reply.
>
> Currently the implementation of AMQP Management in the Qpid Broker for Java
> follows the above conventions (any inbound value can be in the native type,
> or as a JSON string which can convert to the desired type, however there is
> no mechanism for controlling the nature of responses).
>
> Perhaps this is something we should talk about soon ;-) ?
>
> -- Rob
>
> On 18 January 2017 at 14:29, Ted Ross <[email protected]> wrote:
>
> >
> >
> > On 01/18/2017 07:45 AM, Gordon Sim wrote:
> >
> >> On 18/01/17 10:45, Rob Godfrey wrote:
> >>
> >>> In terms of sending maps/lists I think we said (at OASIS), though it is
> >>> possibly not yet in the draft spec, that Json formatted equivalents should
> >>> be able to be used for all values... however I have no idea if the Dispatch
> >>> Router supports that.
> >>>
> >>
> >> I think that would be very sensible.
> >>
> >
> > Dispatch Router does not support Json formatted bodies at present, but
> > this is a feature that I would be in favor of putting on the roadmap.
> >
> >
> >
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe, e-mail: [email protected]
> >> For additional commands, e-mail: [email protected]
> >>
> >>
>
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import javax.jms.*;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.codec.*;

public class JMSTest {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://<dispatchMachine>:<port>");
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("$management");
        MessageProducer producer = session.createProducer(queue);

        // Management request: operation, entity type, and name as message properties.
        ObjectMessage objectMessage = session.createObjectMessage();
        objectMessage.setBooleanProperty("JMS_AMQP_TYPED_ENCODING", true);
        objectMessage.setStringProperty("operation", "CREATE");
        objectMessage.setStringProperty("type", "org.apache.qpid.dispatch.connector");
        objectMessage.setStringProperty("name", "rabih.connector");

        // Connector attributes ("brokerMachine" and "port" are placeholders).
        Map<String, String> map = new HashMap<>();
        map.put("name", "rabih.connector");
        map.put("role", "route-container");
        map.put("addr", "brokerMachine");
        map.put("port", "port");

        // Encode the map with the proton-j encoder before putting it in the message.
        EncoderImpl encoder = new EncoderImpl(new DecoderImpl());
        ByteBuffer byteBuffer = ByteBuffer.allocate(9999);
        WritableBuffer byteBufferWrapper = new WritableBuffer.ByteBufferWrapper(byteBuffer);
        encoder.setByteBuffer(byteBufferWrapper);
        encoder.writeMap(map);
        System.out.println(new String(byteBuffer.array()));

        // This is the call that throws MessageFormatException("Failed to serialize object").
        objectMessage.setObject(byteBuffer.array());
        producer.send(objectMessage);
        connection.close();
    }
}

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.reactor.Handshaker;
import org.apache.qpid.proton.reactor.Reactor;

public class ProtonJTest extends BaseHandler {

    //~ ------------------------------------------------------------
    //~ Instance fields
    //~ ------------------------------------------------------------
    private final String host = "dispatch-machine";
    private final int port = 5672;
    private final Message message;

    //~ ------------------------------------------------------------
    //~ Constructors
    //~ ------------------------------------------------------------
    private ProtonJTest() {
        message = Proton.message();

        // Management request: operation, entity type, and name as application properties.
        Map<String, String> properties = new HashMap<>();
        properties.put("operation", "CREATE");
        properties.put("type", "org.apache.qpid.dispatch.connector");
        properties.put("name", "rabih.connector");
        ApplicationProperties applicationProperties = new ApplicationProperties(properties);
        message.setApplicationProperties(applicationProperties);

        // Sets the "to" field of the message; this is the part that does not route as expected.
        message.setAddress("$management");

        // Connector attributes go in the message body.
        Map<String, String> map = new HashMap<>();
        map.put("name", "rabih.connector");
        map.put("role", "route-container");
        map.put("addr", "broker-machine");
        map.put("port", "5673");
        message.setBody(new AmqpValue(map));
    }

    //~ ------------------------------------------------------------
    //~ Methods
    //~ ------------------------------------------------------------
    public static void main(String[] args) throws IOException {
        Reactor r = Proton.reactor(new ProtonJTest());
        r.run();
    }

    @Override
    public void onReactorInit(Event event) {
        event.getReactor().connectionToHost(host, port, new SendHandler(message));
    }

    //~ ------------------------------------------------------------
    //~ Nested Classes
    //~ ------------------------------------------------------------
    private class SendHandler extends BaseHandler {
        private final Message message;
        private int nextTag = 0;

        private SendHandler(Message message) {
            this.message = message;
            add(new Handshaker());
        }

        @Override
        public void onConnectionInit(Event event) {
            Connection conn = event.getConnection();
            Session ssn = conn.session();
            // Link named "s1"; no target address is configured on it here.
            Sender snd = ssn.sender("s1");
            conn.open();
            ssn.open();
            snd.open();
        }

        @Override
        public void onLinkFlow(Event event) {
            Sender snd = (Sender) event.getLink();
            if (snd.getCredit() > 0) {
                // Encode the message, doubling the buffer until it fits.
                byte[] msgData = new byte[1024];
                int length;
                while (true) {
                    try {
                        length = message.encode(msgData, 0, msgData.length);
                        break;
                    } catch (BufferOverflowException e) {
                        msgData = new byte[msgData.length * 2];
                    }
                }
                byte[] tag = String.valueOf(nextTag++).getBytes();
                Delivery dlv = snd.delivery(tag);
                snd.send(msgData, 0, length);
                dlv.settle();
                snd.advance();

                // One message only: close the link, session, and connection.
                snd.close();
                snd.getSession().close();
                snd.getSession().getConnection().close();
            }
        }

        @Override
        public void onTransportError(Event event) {
            ErrorCondition condition = event.getTransport().getCondition();
            if (condition != null) {
                System.err.println("Error: " + condition.getDescription());
            } else {
                System.err.println("Error (no description returned).");
            }
        }
    }
}