On Tue, Sep 25, 2007 at 12:32:31PM +0000, David Holroyd wrote:
> So, I'm starting to learn C#, and am attempting to prototype some
> generic NMS bridging code that I can use to connect ActiveMQ and MSMQ
> implementations.

So, attached are some vaguely bridge-flavoured bits.

Any comments?

I suspect it should be doing its own threading, for instance?


-- 
http://david.holroyd.me.uk/
namespace Test
{
    using NMS;
    using System;

    /// <summary>
    /// Forwards messages from a queue on one NMS provider to a queue on
    /// another, converting each message with a <see cref="MessageMapper"/>.
    /// </summary>
    /// <remarks>
    /// Expected call order: construct, <see cref="Connect"/>,
    /// <see cref="Start"/>, and finally <see cref="Close"/>.
    /// This class creates no threads of its own; messages are forwarded from
    /// inside the source consumer's listener callback.
    /// </remarks>
    public class Bridge
    {
        private readonly IConnectionFactory srcFactory;
        private readonly IConnectionFactory dstFactory;
        private readonly MessageMapper mapper;

        private IConnection srcConn;
        private IConnection dstConn;

        // The source session and consumer are held in fields so they remain
        // reachable for as long as the bridge is running, instead of relying
        // on the provider to keep them alive.
        private ISession srcSession;
        private IMessageConsumer consumer;

        private ISession dstSession;
        private IMessageProducer producer;

        /// <summary>
        /// Creates a bridge that is not yet connected.
        /// </summary>
        /// <param name="srcFactory">Factory for the connection messages are read from.</param>
        /// <param name="dstFactory">Factory for the connection messages are written to.</param>
        /// <param name="mapper">Converts each source message into a destination message.</param>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public Bridge(IConnectionFactory srcFactory,
                      IConnectionFactory dstFactory,
                      MessageMapper mapper)
        {
            if (srcFactory == null)
            {
                throw new ArgumentNullException("srcFactory");
            }
            if (dstFactory == null)
            {
                throw new ArgumentNullException("dstFactory");
            }
            if (mapper == null)
            {
                throw new ArgumentNullException("mapper");
            }

            this.srcFactory = srcFactory;
            this.dstFactory = dstFactory;
            this.mapper = mapper;
        }

        /// <summary>
        /// Opens the source and destination connections.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// The bridge is already connected.
        /// </exception>
        public void Connect()
        {
            if (srcConn != null || dstConn != null)
            {
                // Overwriting the fields would leak the open connections.
                throw new InvalidOperationException("Bridge is already connected");
            }

            IConnection src = srcFactory.CreateConnection();
            IConnection dst;
            try
            {
                dst = dstFactory.CreateConnection();
            }
            catch
            {
                // Do not leave the source connection open when the
                // destination cannot be reached.
                try
                {
                    src.Close();
                }
                catch (Exception closeError)
                {
                    // Report but do not mask the original failure.
                    System.Console.WriteLine(closeError.ToString());
                }
                throw;
            }

            srcConn = src;
            dstConn = dst;
        }

        /// <summary>
        /// Closes both connections. Safe to call when the bridge was never
        /// connected, and safe to call more than once.
        /// </summary>
        public void Close()
        {
            try
            {
                try
                {
                    // Source first, so no new messages arrive while the
                    // destination is being shut down.
                    if (srcConn != null)
                    {
                        srcConn.Close();
                    }
                }
                finally
                {
                    // Runs even if closing the source connection threw.
                    if (dstConn != null)
                    {
                        dstConn.Close();
                    }
                }
            }
            finally
            {
                consumer = null;
                producer = null;
                srcSession = null;
                dstSession = null;
                srcConn = null;
                dstConn = null;
            }
        }

        /// <summary>
        /// Begins forwarding messages from the source queue to the
        /// destination queue.
        /// </summary>
        /// <param name="srcQueueName">Queue name on the source connection.</param>
        /// <param name="dstQueueName">Queue name on the destination connection.</param>
        /// <exception cref="InvalidOperationException">
        /// <see cref="Connect"/> has not been called, or the bridge has
        /// already been started.
        /// </exception>
        public void Start(string srcQueueName, string dstQueueName)
        {
            if (srcConn == null || dstConn == null)
            {
                throw new InvalidOperationException("Connect() must be called before Start()");
            }
            if (consumer != null)
            {
                // A second consumer would share the single producer field,
                // so every message would end up on the newest destination.
                throw new InvalidOperationException("Bridge is already started");
            }

            ISession newSrcSession = srcConn.CreateSession();
            ISession newDstSession = dstConn.CreateSession();

            IQueue srcQueue = newSrcSession.GetQueue(srcQueueName);
            IQueue dstQueue = newDstSession.GetQueue(dstQueueName);

            IMessageConsumer newConsumer = newSrcSession.CreateConsumer(srcQueue);
            IMessageProducer newProducer = newDstSession.CreateProducer(dstQueue);

            // Publish the fields only once everything was created, and
            // before the listener is attached, because OnMessage reads
            // producer and dstSession.
            srcSession = newSrcSession;
            dstSession = newDstSession;
            producer = newProducer;
            consumer = newConsumer;

            consumer.Listener += new MessageListener(OnMessage);
        }

        /// <summary>
        /// Listener callback: maps one source message and sends the result
        /// to the destination queue.
        /// </summary>
        /// <param name="msg">The message received from the source queue.</param>
        private void OnMessage(IMessage msg)
        {
            System.Console.WriteLine("Bridge processing one message");
            try
            {
                producer.Send(mapper.map(msg, dstSession));
            }
            catch (Exception e)
            {
                // Best effort: a message that cannot be mapped or sent is
                // reported and dropped so the listener keeps running.
                System.Console.WriteLine(e.ToString());
            }
            System.Console.WriteLine("Bridge processing complete");
        }
    }
}
// vim:sw=4:sts=4
namespace Test
{
    using NMS;
    /// <summary>
    /// Strategy for converting a message received from one NMS provider
    /// into a message that can be sent through another provider's session.
    /// </summary>
    public interface MessageMapper
    {
        /// <summary>
        /// Produces a destination message corresponding to <paramref name="src"/>.
        /// </summary>
        /// <param name="src">The message to convert.</param>
        /// <param name="dstSession">
        /// Session on the destination side, available to implementations
        /// for creating the new message.
        /// </param>
        /// <returns>The message to send on the destination side.</returns>
        IMessage map(IMessage src, ISession dstSession);
    }
}
namespace Test
{
    using NMS;
    using System;

    /// <summary>
    /// A <see cref="MessageMapper"/> that copies text messages, together
    /// with their standard NMS header fields and custom properties, into a
    /// new message created on the destination session.
    /// </summary>
    /// <remarks>
    /// Only <c>ITextMessage</c> is supported; any other message type is
    /// rejected with a <see cref="NotSupportedException"/>.
    /// </remarks>
    public class SimpleMessageMapper : MessageMapper
    {
        // NMSType written to the destination when the source message has none.
        private const string DefaultNmsType = "Test";

        /// <summary>
        /// Creates a destination message equivalent to <paramref name="src"/>.
        /// </summary>
        /// <param name="src">The message to copy; must be an <c>ITextMessage</c>.</param>
        /// <param name="dstSession">Session used to create the new message.</param>
        /// <returns>The newly created destination message.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="src"/> or <paramref name="dstSession"/> is null.
        /// </exception>
        /// <exception cref="NotSupportedException">
        /// <paramref name="src"/> is not a text message.
        /// </exception>
        public IMessage map(IMessage src, ISession dstSession)
        {
            if (src == null)
            {
                throw new ArgumentNullException("src");
            }
            if (dstSession == null)
            {
                throw new ArgumentNullException("dstSession");
            }

            // A single 'as' cast replaces the former 'is' test plus cast.
            ITextMessage text = src as ITextMessage;
            if (text == null)
            {
                throw new NotSupportedException(
                    "Unhandled message type: " + src.GetType().FullName);
            }

            IMessage dst = mapText(text, dstSession);
            mapCommon(src, dst);
            return dst;
        }

        /// <summary>
        /// Creates a text message on the destination session carrying the
        /// same text as <paramref name="src"/>.
        /// </summary>
        private ITextMessage mapText(ITextMessage src, ISession dstSession)
        {
            return dstSession.CreateTextMessage(src.Text);
        }

        /// <summary>
        /// Copies the header fields shared by all message types, then the
        /// custom properties, from <paramref name="src"/> to <paramref name="dst"/>.
        /// </summary>
        private void mapCommon(IMessage src, IMessage dst)
        {
            dst.NMSCorrelationID = src.NMSCorrelationID;
            dst.NMSExpiration = src.NMSExpiration;
            dst.NMSPersistent = src.NMSPersistent;
            dst.NMSPriority = src.NMSPriority;
            dst.NMSReplyTo = src.NMSReplyTo;

            if (src.NMSType == null)
            {
                // The destination is always given a non-null type; the
                // substitution is traced to the console.
                System.Console.WriteLine("src.NMSType is null");
                dst.NMSType = DefaultNmsType;
            }
            else
            {
                dst.NMSType = src.NMSType;
            }

            mapHeaders(src.Properties, dst.Properties);
        }

        /// <summary>
        /// Copies every entry of <paramref name="src"/> into
        /// <paramref name="dst"/>, overwriting entries with the same key.
        /// </summary>
        private void mapHeaders(IPrimitiveMap src, IPrimitiveMap dst)
        {
            foreach (string key in src.Keys)
            {
                dst[key] = src[key];
            }
        }
    }
}
namespace Test {

    using System;
    using NMS;
    //using ActiveMQ.Commands;

    /// <summary>
    /// Command-line driver that bridges an ActiveMQ queue (source) to an
    /// MSMQ queue (destination) for a fixed period and then shuts down.
    /// </summary>
    public class Test {

        private const string BrokerUri = "tcp://192.168.9.162:61616";
        private const string SourceQueue = "FOO.BAR";
        private const string DestinationQueue = @".\blater";

        // How long the bridge is left running before the program exits.
        private const int RunMilliseconds = 10000;

        /// <summary>
        /// Connects the bridge, lets it forward messages for
        /// <see cref="RunMilliseconds"/>, and closes it again.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        public static void Main(string[] args) {

            IConnectionFactory amqFactory = new ActiveMQ.ConnectionFactory(new Uri(BrokerUri));
            Console.WriteLine("Created AMQ fact");
            IConnectionFactory msmqFactory = new MSMQ.ConnectionFactory();
            Console.WriteLine("Created MSMQ fact");

            MessageMapper mapper = new SimpleMessageMapper();
            Bridge bridge = new Bridge(amqFactory, msmqFactory, mapper);
            bridge.Connect();
            Console.WriteLine("Connected bridge");
            try {
                bridge.Start(SourceQueue, DestinationQueue);
                Console.WriteLine("Started bridge");
                System.Threading.Thread.Sleep(RunMilliseconds);
            } finally {
                // Release both connections on normal exit and on failure.
                bridge.Close();
            }
        }
    }
}
  • NMS bridge David Holroyd
    • Re: NMS bridge David Holroyd

Reply via email to