On Tue, Sep 25, 2007 at 12:32:31PM +0000, David Holroyd wrote:
> So, I'm starting to learn C#, and am attempting to prototype some
> generic NMS bridging code that I can use to connect ActiveMQ and MSMQ
> implementations.
So, attached are some vaguely bridge-flavoured bits. Any comments? For instance, I suspect it should be doing its own threading.

--
http://david.holroyd.me.uk/
namespace Test
{
    using NMS;
    using System;

    /// <summary>
    /// Forwards messages from a queue reached through one connection factory
    /// to a queue reached through another, converting each message with a
    /// <see cref="MessageMapper"/> on the way.
    /// </summary>
    /// <remarks>
    /// Expected call order: <see cref="Connect"/>, then <see cref="Start"/>,
    /// then <see cref="Close"/> when finished.
    /// NOTE(review): this class never issues an explicit "start" on either
    /// connection; confirm the providers in use begin delivery without one.
    /// </remarks>
    public class Bridge
    {
        private readonly IConnectionFactory srcFactory;
        private readonly IConnectionFactory dstFactory;
        private readonly MessageMapper mapper;

        private IConnection srcConn;
        private IConnection dstConn;
        private ISession dstSession;
        private IMessageProducer producer;

        /// <summary>
        /// Creates a bridge; no connections are opened until <see cref="Connect"/>.
        /// </summary>
        /// <param name="srcFactory">Factory for the connection messages are read from.</param>
        /// <param name="dstFactory">Factory for the connection messages are written to.</param>
        /// <param name="mapper">Converts each source message into a destination message.</param>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public Bridge(IConnectionFactory srcFactory, IConnectionFactory dstFactory, MessageMapper mapper)
        {
            if (srcFactory == null)
            {
                throw new ArgumentNullException("srcFactory");
            }
            if (dstFactory == null)
            {
                throw new ArgumentNullException("dstFactory");
            }
            if (mapper == null)
            {
                throw new ArgumentNullException("mapper");
            }

            this.srcFactory = srcFactory;
            this.dstFactory = dstFactory;
            this.mapper = mapper;
        }

        /// <summary>
        /// Opens the source connection, then the destination connection.
        /// </summary>
        /// <remarks>
        /// If creating the destination connection throws, the source connection
        /// stays assigned so that <see cref="Close"/> can still release it.
        /// </remarks>
        public void Connect()
        {
            srcConn = srcFactory.CreateConnection();
            dstConn = dstFactory.CreateConnection();
        }

        /// <summary>
        /// Closes whichever connections are currently open. Safe to call before
        /// <see cref="Connect"/>, after a partially failed <see cref="Connect"/>,
        /// and more than once.
        /// </summary>
        public void Close()
        {
            // Detach the fields first so a repeated Close() is a no-op and a
            // later Start() reports "not connected" instead of using a closed
            // connection.
            IConnection src = srcConn;
            IConnection dst = dstConn;
            srcConn = null;
            dstConn = null;

            try
            {
                if (src != null)
                {
                    src.Close();
                }
            }
            finally
            {
                // Runs even when closing the source throws, so the destination
                // connection is not leaked.
                if (dst != null)
                {
                    dst.Close();
                }
            }
        }

        /// <summary>
        /// Creates a session on each connection, a consumer on the source queue
        /// and a producer on the destination queue, then registers
        /// <c>OnMessage</c> as the consumer's listener.
        /// </summary>
        /// <param name="srcQueueName">Name of the queue to consume from.</param>
        /// <param name="dstQueueName">Name of the queue to produce to.</param>
        /// <exception cref="InvalidOperationException">
        /// <see cref="Connect"/> has not completed successfully.
        /// </exception>
        public void Start(string srcQueueName, string dstQueueName)
        {
            if (srcConn == null || dstConn == null)
            {
                throw new InvalidOperationException("Bridge is not connected; call Connect() before Start()");
            }

            ISession srcSession = srcConn.CreateSession();
            dstSession = dstConn.CreateSession();

            IQueue srcQueue = srcSession.GetQueue(srcQueueName);
            IQueue dstQueue = dstSession.GetQueue(dstQueueName);

            IMessageConsumer consumer = srcSession.CreateConsumer(srcQueue);
            // The producer must exist before the listener is attached, because
            // OnMessage uses it as soon as a message arrives.
            producer = dstSession.CreateProducer(dstQueue);
            consumer.Listener += new MessageListener(OnMessage);
        }

        /// <summary>
        /// Listener callback: maps the incoming message and sends the result to
        /// the destination queue.
        /// </summary>
        /// <remarks>
        /// Any exception from mapping or sending is written to the console and
        /// not rethrown, so one bad message does not propagate an error into
        /// the code that invoked the listener.
        /// </remarks>
        private void OnMessage(IMessage msg)
        {
            System.Console.WriteLine("Bridge processing one message");
            try
            {
                producer.Send(mapper.map(msg, dstSession));
            }
            catch (Exception e)
            {
                System.Console.WriteLine(e.ToString());
            }
            System.Console.WriteLine("Bridge processing complete");
        }
    }
}
// vim:sw=4:sts=4
namespace Test
{
    using NMS;

    /// <summary>
    /// Strategy for turning a message received from one provider into a
    /// message that can be sent through a session of another provider.
    /// </summary>
    public interface MessageMapper
    {
        /// <summary>
        /// Builds the destination-side counterpart of <paramref name="src"/>.
        /// </summary>
        /// <param name="src">The message that was received.</param>
        /// <param name="dstSession">
        /// A session on the destination connection, passed so implementations
        /// can use it when building the returned message.
        /// </param>
        /// <returns>The message to hand to the destination producer.</returns>
        IMessage map(IMessage src, ISession dstSession);
    }
}
namespace Test
{
    using NMS;
    using System;

    /// <summary>
    /// <see cref="MessageMapper"/> that handles text messages only: it copies
    /// the text body, a fixed set of NMS header fields and all entries of the
    /// message's property map.
    /// </summary>
    public class SimpleMessageMapper : MessageMapper
    {
        /// <summary>
        /// Creates a destination message equivalent to <paramref name="src"/>.
        /// </summary>
        /// <param name="src">Message to convert; must be an <c>ITextMessage</c>.</param>
        /// <param name="dstSession">Session used to create the new message.</param>
        /// <returns>The newly created message with body, headers and properties copied.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="src"/> or <paramref name="dstSession"/> is null.
        /// </exception>
        /// <exception cref="NotSupportedException">
        /// <paramref name="src"/> is not a text message.
        /// </exception>
        public IMessage map(IMessage src, ISession dstSession)
        {
            if (src == null)
            {
                throw new ArgumentNullException("src");
            }
            if (dstSession == null)
            {
                throw new ArgumentNullException("dstSession");
            }

            // Single cast-and-test instead of "is" followed by "as".
            ITextMessage textSrc = src as ITextMessage;
            if (textSrc == null)
            {
                throw new NotSupportedException("Unhandled message type: " + src.GetType().FullName);
            }

            IMessage dst = mapText(textSrc, dstSession);
            mapCommon(src, dst);
            return dst;
        }

        /// <summary>Creates a text message on the destination session carrying the source text.</summary>
        private ITextMessage mapText(ITextMessage src, ISession dstSession)
        {
            return dstSession.CreateTextMessage(src.Text);
        }

        /// <summary>
        /// Copies the header fields shared by all message kinds, then the
        /// property map.
        /// </summary>
        private void mapCommon(IMessage src, IMessage dst)
        {
            dst.NMSCorrelationID = src.NMSCorrelationID;
            dst.NMSExpiration = src.NMSExpiration;
            dst.NMSPersistent = src.NMSPersistent;
            dst.NMSPriority = src.NMSPriority;
            // NOTE(review): the reply-to destination object comes from the
            // source provider; confirm the destination provider accepts it.
            dst.NMSReplyTo = src.NMSReplyTo;

            if (src.NMSType == null)
            {
                // A missing type is replaced with the placeholder "Test".
                // NOTE(review): presumably a workaround for a provider that
                // rejects a null type - confirm before relying on it.
                System.Console.WriteLine("src.NMSType is null");
                dst.NMSType = "Test";
            }
            else
            {
                dst.NMSType = src.NMSType;
            }

            mapHeaders(src.Properties, dst.Properties);
        }

        /// <summary>Copies every key/value pair from one property map to the other.</summary>
        private void mapHeaders(IPrimitiveMap src, IPrimitiveMap dst)
        {
            foreach (string key in src.Keys)
            {
                dst[key] = src[key];
            }
        }
    }
}
namespace Test
{
    using System;
    using NMS;
    //using ActiveMQ.Commands;

    /// <summary>
    /// Console driver: bridges the ActiveMQ queue "FOO.BAR" to the MSMQ queue
    /// ".\blater" for ten seconds, then shuts the bridge down.
    /// </summary>
    public class Test
    {
        /// <summary>Program entry point.</summary>
        /// <param name="args">Command-line arguments (currently unused).</param>
        public static void Main(string[] args)
        {
            IConnectionFactory amqFactory =
                new ActiveMQ.ConnectionFactory(new Uri("tcp://192.168.9.162:61616"));
            Console.WriteLine("Created AMQ fact");

            IConnectionFactory msmqFactory = new MSMQ.ConnectionFactory();
            Console.WriteLine("Created MSMQ fact");

            MessageMapper mapper = new SimpleMessageMapper();
            Bridge bridge = new Bridge(amqFactory, msmqFactory, mapper);

            bridge.Connect();
            Console.WriteLine("Connected bridge");

            // From here on both connections are open, so make sure they are
            // closed again even if Start() throws.
            try
            {
                bridge.Start("FOO.BAR", @".\blater");
                Console.WriteLine("Started bridge");

                // Keep the process alive for ten seconds while the bridge runs.
                System.Threading.Thread.Sleep(10000);
            }
            finally
            {
                bridge.Close();
            }
        }
    }
}