This is the follow-up code I wrote to help me understand JMS-Topic
(javax.jms.Topic).

Regards,
John

(ns jms.jms-topic
  (:use [jms.jms-test :only (get-initial-context
                              get-message-text)])
  (:import (javax.jms Session MessageListener)))

(defn publish-term-message-to-topic
  "Publishes one body-less message, created from tSession, through
  tPublisher. Within this file the subscriber treats any message that
  is not a javax.jms.TextMessage as the end-of-stream marker."
  [tPublisher tSession]
  (let [terminator (.createMessage tSession)]
    (.publish tPublisher terminator)))

(defn publish-message-to-topic
  "Creates a text message from tSession, fills it with the value of
  (get-message-text), logs the text to stdout and publishes the message
  through tPublisher."
  [tPublisher tSession]
  ;; get-message-text comes from jms.jms-test; its contents are not
  ;; visible here.
  (let [text-msg (doto (.createTextMessage tSession)
                   (.setText (get-message-text)))
        log-line (format "Publishing message - %s ..." (.getText text-msg))]
    (println log-line)
    (.publish tPublisher text-msg)))

(defn publish-n-messages-to-topic
  "Blocks until a control message arrives on qReceiver, then publishes
  num-messages text messages followed by one terminating message.

  tPublisher / tSession - topic publisher and the session used to create
                          the published messages.
  num-messages          - how many text messages to publish; zero or a
                          negative count publishes only the terminator.
  qReceiver             - queue receiver on which the 'subscriber is
                          ready' control message is expected.
  qConnection           - connection owning qReceiver; it is started
                          here and closed once the wait is over.

  Returns nil."
  [tPublisher tSession num-messages qReceiver qConnection]
  ;; Wait for subscribers
  (println "Waiting for subscribers...")
  (let [subscriber-available (atom false)]
    (.setMessageListener qReceiver
                         (proxy [MessageListener] []
                           (onMessage [message]
                             ;; Any message on the control queue means a
                             ;; subscriber has announced itself.
                             (reset! subscriber-available true))))
    (try
      (.start qConnection)
      ;; Poll the flag that the listener thread flips.
      (while (not @subscriber-available)
        (Thread/sleep 1000))
      (finally
        ;; The control connection is no longer needed after the
        ;; handshake; close it even if the wait is interrupted.
        (.close qConnection)))
    (println "Hoo-rah!! We have a subscriber"))

  (dotimes [_ num-messages]
    (publish-message-to-topic tPublisher tSession))
  (publish-term-message-to-topic tPublisher tSession)
  (println "*Done publishing messages to topic!*"))

(defn process-topic-messages
  "Announces itself on the control queue, then prints every text message
  delivered to tSubscriber until a non-text message arrives.

  tSubscriber - topic subscriber whose messages are consumed.
  tConnection - connection owning tSubscriber; it is started here and
                closed when processing ends.
  qSender     - queue sender used to emit the 'subscriber is ready'
                control message.
  qSession    - session used to create that control message.

  Returns nil."
  [tSubscriber tConnection qSender qSession]
  ;; Snooze a bit before we are ready to process topic messages
  (Thread/sleep (* 10 1000))
  (println "-- Subscriber is now online --")
  (.send qSender (.createMessage qSession))

  (let [done-processing (atom false)]
    (.setMessageListener tSubscriber
                         (proxy [MessageListener] []
                           (onMessage [message]
                             (if (instance? javax.jms.TextMessage message)
                               (println (format "Read message: %s"
                                                (.getText message)))
                               ;; Anything that is not a text message is
                               ;; treated as the end-of-stream marker.
                               (reset! done-processing true)))))
    (try
      (.start tConnection)
      ;; Poll the flag that the listener thread flips.
      (while (not @done-processing)
        (Thread/sleep 1000))
      (finally
        ;; Release the topic connection even if the wait is interrupted.
        (.close tConnection))))
  (println "==Read all messages off the topic!!=="))

(defn main
  "Wires up a topic publisher/subscriber pair plus a control queue and
  runs the publisher and the subscriber on two separate threads.

  Looks up \"TopicConnectionFactory\" and \"QueueConnectionFactory\" in the
  context returned by (get-initial-context) (defined in jms.jms-test;
  presumably a JNDI context - confirm there), and uses the destinations
  \"TestTopic\" and \"controlQueue\".

  Returns nil right after starting both threads; it does not wait for
  them to finish."
  []
  (let [ctx (get-initial-context)
        tConFactory (.lookup ctx "TopicConnectionFactory")
        tConnection (.createTopicConnection tConFactory)
        ;; A JMS Session is a single-threaded context, and a session with
        ;; an asynchronous MessageListener belongs to its delivery thread.
        ;; The publisher and the subscriber run on different threads, so
        ;; each side gets its own session.
        tPubSession (.createTopicSession tConnection false
                                         Session/AUTO_ACKNOWLEDGE)
        tSubSession (.createTopicSession tConnection false
                                         Session/AUTO_ACKNOWLEDGE)
        topic (.createTopic tPubSession "TestTopic")
        tPublisher (.createPublisher tPubSession topic)
        tSubscriber (.createSubscriber tSubSession topic)
        qConFactory (.lookup ctx "QueueConnectionFactory")
        qConnection (.createQueueConnection qConFactory)
        ;; Same reasoning for the control queue: the sender is used by the
        ;; subscriber thread, the receiver's listener by the publisher side.
        qSendSession (.createQueueSession qConnection false
                                          Session/AUTO_ACKNOWLEDGE)
        qRecvSession (.createQueueSession qConnection false
                                          Session/AUTO_ACKNOWLEDGE)
        queue (.createQueue qSendSession "controlQueue")
        qSender (.createSender qSendSession queue)
        qReceiver (.createReceiver qRecvSession queue)]
    (.start (Thread. (fn []
                       (publish-n-messages-to-topic tPublisher tPubSession 10
                                                    qReceiver qConnection))))
    (.start (Thread. (fn []
                       (process-topic-messages tSubscriber tConnection
                                               qSender qSendSession))))))

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en

To unsubscribe from this group, send email to 
clojure+unsubscribe@googlegroups.com or reply to this email with the words 
"REMOVE ME" as the subject.

Reply via email to