This is the follow-up code I wrote to help me understand JMS-Topic (javax.jms.Topic).
Regards, John

(ns jms.jms-topic
  (:use [jms.jms-test :only (get-initial-context get-message-text)])
  (:import (javax.jms Session MessageListener)))

(defn publish-term-message-to-topic
  "Publishes an empty, non-text message on tPublisher.
  process-topic-messages treats any message that is not a TextMessage as the
  end-of-stream marker."
  [tPublisher tSession]
  (.publish tPublisher (.createMessage tSession)))

(defn publish-message-to-topic
  "Creates a TextMessage whose body is (get-message-text), logs it, and
  publishes it on tPublisher."
  [tPublisher tSession]
  (let [message (.createTextMessage tSession)]
    (.setText message (get-message-text))
    (println (format "Publishing message - %s ..." (.getText message)))
    (.publish tPublisher message)))

(defn publish-n-messages-to-topic
  "Blocks until a message arrives on qReceiver (the subscriber's \"I am
  online\" signal), then publishes num-messages text messages followed by one
  terminating message.

  tPublisher / tSession - topic publisher and the session it was created from
  num-messages          - number of text messages to publish; zero or a
                          negative value publishes only the terminator
  qReceiver             - control-queue receiver used for the online signal
  qConnection           - connection owning qReceiver; started here and always
                          closed before publishing begins"
  [tPublisher tSession num-messages qReceiver qConnection]
  ;; Wait for subscribers
  (println "Waiting for subscribers...")
  ;; An atom is enough here: a single uncoordinated flag set by the listener
  ;; thread and polled by this one.
  (let [subscriber-available (atom false)]
    (.setMessageListener qReceiver
                         (proxy [MessageListener] []
                           (onMessage [message]
                             (reset! subscriber-available true))))
    (try
      (.start qConnection)
      (while (not @subscriber-available)
        (Thread/sleep 1000))
      (finally
        ;; Release the control connection even if the wait is interrupted.
        (.close qConnection)))
    (println "Hoo-rah!! We have a subscriber"))
  (dotimes [_ num-messages]
    (publish-message-to-topic tPublisher tSession))
  (publish-term-message-to-topic tPublisher tSession)
  (println "*Done publishing messages to topic!*"))

(defn process-topic-messages
  "Simulates a late subscriber: sleeps ten seconds, signals readiness by
  sending an empty message through qSender, then prints every TextMessage
  delivered to tSubscriber until a non-text message arrives.

  tSubscriber / tConnection - topic subscriber and its connection; the
                              connection is started here and always closed
                              before this function returns or throws
  qSender / qSession        - control-queue sender and the session it was
                              created from"
  [tSubscriber tConnection qSender qSession]
  (try
    ;; Snooze a bit before we are ready to process topic messages
    (Thread/sleep (* 10 1000))
    (println "-- Subscriber is now online --")
    (.send qSender (.createMessage qSession))
    (let [done-processing (atom false)]
      (.setMessageListener tSubscriber
                           (proxy [MessageListener] []
                             (onMessage [message]
                               (if (instance? javax.jms.TextMessage message)
                                 (println (format "Read message: %s" (.getText message)))
                                 ;; Anything that is not a TextMessage ends the run.
                                 (reset! done-processing true)))))
      (.start tConnection)
      (while (not @done-processing)
        (Thread/sleep 1000)))
    (finally
      (.close tConnection)))
  (println "==Read all messages off the topic!!=="))

(defn main
  "Looks up the topic and queue connection factories, wires up a publisher and
  a subscriber on \"TestTopic\" plus a sender and a receiver on
  \"controlQueue\", and runs the publisher and the subscriber on two threads."
  []
  (let [ctx          (get-initial-context)
        tConFactory  (.lookup ctx "TopicConnectionFactory")
        tConnection  (.createTopicConnection tConFactory)
        ;; A JMS Session must not be used from more than one thread, and a
        ;; session with a registered MessageListener belongs to its delivery
        ;; thread. The publisher and the subscriber therefore each get their
        ;; own session on the shared connection.
        tPubSession  (.createTopicSession tConnection false Session/AUTO_ACKNOWLEDGE)
        tSubSession  (.createTopicSession tConnection false Session/AUTO_ACKNOWLEDGE)
        topic        (.createTopic tPubSession "TestTopic")
        tPublisher   (.createPublisher tPubSession topic)
        tSubscriber  (.createSubscriber tSubSession topic)
        qConFactory  (.lookup ctx "QueueConnectionFactory")
        qConnection  (.createQueueConnection qConFactory)
        ;; Same reasoning for the control queue: the sender is used by the
        ;; subscriber thread, the receiver (with its listener) by the
        ;; publisher thread.
        qSendSession (.createQueueSession qConnection false Session/AUTO_ACKNOWLEDGE)
        qRecvSession (.createQueueSession qConnection false Session/AUTO_ACKNOWLEDGE)
        queue        (.createQueue qSendSession "controlQueue")
        qSender      (.createSender qSendSession queue)
        qReceiver    (.createReceiver qRecvSession queue)]
    (.start (Thread. (fn []
                       (publish-n-messages-to-topic
                         tPublisher tPubSession 10 qReceiver qConnection))))
    (.start (Thread. (fn []
                       (process-topic-messages
                         tSubscriber tConnection qSender qSendSession))))))

-- You received this message because you are subscribed to the Google Groups "Clojure" group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en To unsubscribe from this group, send email to clojure+unsubscribegooglegroups.com or reply to this email with the words "REMOVE ME" as the subject.