This problem has become much stranger. I thought I was developing some intuition about how to write Clojure code, but everything about this behavior seems counter-intuitive.
When the app starts, it seems to be broken, doing one write per second to ElasticSearch (on AWS). Then I get an error, and the error seems to fix the app, which then starts making about 15,000 writes a minute to ElasticSearch. The error is a retry attempt reported by either Apache Commons HTTP or by Elastisch, the library I'm using to connect to ElasticSearch. I'm not sure anyone can help me with this, but I'll share some details in case anyone has run into it before.

I'm moving 1.2 million records from MySQL to ElasticSearch (it's actually 4 million documents that aggregate down to 1.2 million). I'm using DurableQueue to move the records from one transformation to the next, denormalizing the data into the format the frontend needs. I use 3 queues to move through our 3 transformations. I also have a function that runs in the background and every 30 seconds prints out some facts about my app, to give me a rough idea of what is going on. I use the (stats) function provided by DurableQueue to see how the 3 queues are doing.
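For anyone who hasn't used DurableQueue, the queue plumbing is roughly like this (a simplified sketch; the queue directory and message shape here are placeholders, not my actual code):

```clojure
(ns pipeline.sketch
  (:require [durable-queue :as q]))

;; One queue manager backs all the named queues, persisted to disk.
(def queues (q/queues "/tmp/pipeline-sketch" {}))

(defn report-stats
  "Print the per-queue counters, the way my 30-second reporter does.
   (q/stats queues) returns a map of queue name -> counters such as
   :enqueued, :completed, :retried, :in-progress."
  []
  (doseq [[qname s] (q/stats queues)]
    (println "Stats about" qname ":" s)))

(comment
  ;; Producer side: enqueue a record for the next transformation.
  (q/put! queues :from-mysql-to-tables {:id 1 :row "..."})

  ;; Consumer side: take a task, process it, then mark it complete
  ;; so DurableQueue does not redeliver it.
  (let [task (q/take! queues :from-mysql-to-tables 60000 :timed-out!)]
    (when (not= task :timed-out!)
      (println "processing" @task)
      (q/complete! task)))

  (report-stats))
```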
So after 4 or 5 minutes, I see this block of information:

Resource usage:
  Memory in use (percentage/used/max-heap): ("87%" "3131M" "3568M")
  CPU usage (how-many-cpus/load-average): [4 3.88]
  Free memory in JVM: [444474424]

Stats about from-mysql-to-tables-queue:
  {message {:num-slabs 1, :num-active-slabs 1, :enqueued 236358, :retried 0, :completed 236358, :in-progress -1}}

Stats about from-tables-to-topics-queue:
  {message {:num-slabs 1, :num-active-slabs 1, :enqueued 45731, :retried 0, :completed 45729, :in-progress 1}}

Stats about the from-topics-to-persistence-queue:
  {message {:num-slabs 1, :num-active-slabs 1, :enqueued 766, :retried 0, :completed 737, :in-progress 20}}

This final queue moves at a glacial speed until I get these errors:

Oct 04, 2017 10:27:35 PM org.apache.http.impl.client.DefaultHttpClient tryConnect
INFO: Retrying connect to {s}->https://search-samedayes01-ntsdht7rqwesdu.us-east-1.es.amazonaws.com:443

Oct 04, 2017 10:27:35 PM org.apache.http.impl.client.DefaultHttpClient tryConnect
INFO: I/O exception (java.net.SocketException) caught when connecting to {s}->https://search-samedayes01-ntsdht7rqwesdu.us-east-1.es.amazonaws.com:443: Broken pipe (Write failed)

Oct 04, 2017 10:27:35 PM org.apache.http.impl.client.DefaultHttpClient tryConnect
INFO: Retrying connect to {s}->https://search-samedayes01-ntsdht7rqwesdu.us-east-1.es.amazonaws.com:443

Strangely, these errors seem to fix my app. After I get them, the app starts writing to ElasticSearch at about 15,000 records per minute.
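One thing I may try: Elastisch's REST client hands its options map down to clj-http, so in principle I can tighten the connect/socket timeouts at connect time instead of waiting a full minute for the stuck connections to give up. This is just a sketch — the option names are clj-http's, and I haven't confirmed this actually changes the retry behavior against AWS:

```clojure
(ns es.connect-sketch
  (:require [clojurewerkz.elastisch.rest :as esr]))

(defn connect-with-timeouts
  "Connect to ElasticSearch with explicit timeouts. The options map is
   merged into the clj-http requests Elastisch makes (assumption: your
   Elastisch version passes these through unchanged)."
  [url]
  (esr/connect url
               {:conn-timeout   5000      ;; ms to establish a TCP connection
                :socket-timeout 10000}))  ;; ms to wait for data on the socket
```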
A minute or two later I see:

Resource usage:
  Memory in use (percentage/used/max-heap): ("62%" "2245M" "3568M")
  CPU usage (how-many-cpus/load-average): [4 1.32]
  Free memory in JVM: [611550400]

Stats about from-mysql-to-tables-queue:
  {message {:num-slabs 1, :num-active-slabs 1, :enqueued 1156526, :retried 0, :completed 1156526, :in-progress 0}}

Stats about from-tables-to-topics-queue:
  {message {:num-slabs 1, :num-active-slabs 1, :enqueued 59928, :retried 0, :completed 59928, :in-progress 0}}

Stats about the from-topics-to-persistence-queue:
  {message {:num-slabs 1, :num-active-slabs 1, :enqueued 53222, :retried 0, :completed 53192, :in-progress 20}}

So memory use declines and the speed increases. It is possible that the java.net.SocketException is a red herring: maybe it shows up just when the app is done with most of the transformations, in which case of course memory use would go down and speed would go up. But I've run this several times, and the java.net.SocketException always shows up at the same time as the big increase in speed.

I did some research about this error and ElasticSearch, and I discovered this:

https://stackoverflow.com/questions/28908835/ssl-peer-shut-down-incorrectly-in-java

So I added this to the -main function that starts the app:

(System/setProperty "https.protocols" "TLSv1.1")

This didn't seem to have any effect.

Going out on a limb, it does seem that all the threads that write to ElasticSearch initially end up blocked, then they time out, and when they retry things go well; so the app doesn't perform well until those writer threads have timed out and retried. But why the timeout should be necessary, and why the retry works so well (repeatedly, every time I do this), is beyond me.

The code that spins up the worker threads that write to ElasticSearch looks like this:

(defn advance [message db]
  {:pre [(= (type message) durable_queue.Task)]}
  (persistence/push-item-to-persistence @message db)
  (durable/complete! message))

(defn worker [from-topics-to-persistence-queue db]
  (slingshot/try+
    (loop [message (durable/take! from-topics-to-persistence-queue :message 60000 :timed-out!)]
      (slingshot/try+
        (if (= (type message) durable_queue.Task)
          (advance message db))
        (catch Object o
          (slingshot/throw+ {:type ::worker
                             :error o
                             :from-topics-to-persistence-queue from-topics-to-persistence-queue
                             :message @message
                             :db db})))
      (recur (durable/take! from-topics-to-persistence-queue :message 60000 :timed-out!)))
    (catch Object o
      (errors/error o)
      (slingshot/throw+ {:type ::worker
                         :error o
                         :from-topics-to-persistence-queue from-topics-to-persistence-queue
                         :db db}))))

(defn start [config from-topics-to-persistence-queue]
  {:pre [(not (nil? from-topics-to-persistence-queue))]}
  ;; 2017-09-22 -- please note, it is common to get :timed-out
  (dotimes [_ 20]
    (future
      (slingshot/try+
        (errors/log "push/start create workers for from-topics-to-persistence-queue")
        (let [db (persistence/multi-thread-start config)]
          (worker from-topics-to-persistence-queue db))
        (catch Object o
          (errors/error o)
          (slingshot/throw+ {:type ::start
                             :error o
                             :config config
                             :from-topics-to-persistence-queue from-topics-to-persistence-queue}))))))

(In my original code I had :type worker in the worker's throw+ maps, which evaluates the worker function itself; I've written it as ::worker here.)

Any thoughts about why I would get this retry error? Has anyone else gotten it from AWS ElasticSearch?

I am feeling fairly stupid at the moment. The app has only about 950 lines of code, yet I've managed to get a lot wrong. The last time I ran it, it managed to write 499,000 records to ElasticSearch before it started repeatedly writing the same documents, instead of the remainder of the 1.2 million records. I thought the FIFO nature of DurableQueue would ensure that the initial write of the 1.2 million records finished before the app started writing any of the same documents over again, but apparently I need to go back and look at that again. Still, I assume that error is separate from the SocketException error.
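Since (start ...) spins up 20 futures that all write concurrently, one thing I'm considering is giving clj-http a pooled connection manager sized to the worker count, rather than relying on whatever default the client uses. A small or half-dead pool could explain writers stalling until connections are dropped and retried. A sketch, assuming Elastisch passes :connection-manager through to clj-http on each request:

```clojure
(ns es.pool-sketch
  (:require [clj-http.conn-mgr :as conn-mgr]
            [clojurewerkz.elastisch.rest :as esr]))

(defn pooled-connection
  "Connect to ElasticSearch with a reusable connection pool sized for
   the 20 worker threads, so writers don't queue behind a tiny default
   pool. :threads is the pool total; :default-per-route matters because
   every request here goes to the same host."
  [url]
  (let [cm (conn-mgr/make-reusable-conn-manager
             {:threads 20 :default-per-route 20})]
    (esr/connect url {:connection-manager cm})))
```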
On Wednesday, October 4, 2017 at 12:49:12 AM UTC-4, lawrence...@gmail.com wrote:
>
> This is probably a stupid question, but is there an obvious way to get an
> error message out of Elastisch? I had an app that was working with MongoDB,
> and I was then told I had to use ElasticSearch instead (something about
> only using AWS for everything), so now I'm trying to get an error message,
> because my code doesn't seem to work. I read through here without seeing
> anything obvious:
>
> http://clojureelasticsearch.info/articles/getting_started.htm
>
> I rewrote my MongoDB function, so it should work with Elastisch:
>
> (defn push-item-to-persistence
>   [item db]
>   (let [denormalized-id (get-in item [:denormalized-id] :no-id)
>         item (assoc item :updated-at (temporal/current-time-as-datetime))
>         item (assoc item :permanent-holding-id-for-item-instances (java.util.UUID/randomUUID))
>         item (assoc item :instance-id-for-this-one-item (java.util.UUID/randomUUID))
>         item (assoc item :item-type :deduplication)]
>     (if (= denormalized-id :no-id)
>       (slingshot/throw+ {:type ::no-denormalized-id-in-push-item-into-database
>                          :item item})
>       (slingshot/try+
>         ;; signature from the docs: (put conn index mapping-type id document)
>         (esd/put db "facts-over-time" "deduplication" (str denormalized-id) item)
>         (println " done with put in push-item-to-persistence ")
>         (catch Object o
>           (slingshot/throw+ {:type ::push-item-to-persistence
>                              :error o
>                              :item item
>                              :db db}))))))
>
> It doesn't seem that any documents are getting into ElasticSearch. I was
> hoping that the (throw) would reveal some useful debugging information,
> but that doesn't seem to happen.
>
> The connection appears to be valid, as I can do this:
>
> (let [conn (persistence/multi-thread-start config)
>       res (esd/search conn "facts-over-time" "deduplication"
>                       :query {:match_all {}})
>       n (esrsp/total-hits res)
>       hits (esrsp/hits-from res)]
>   (clojure.pprint/pprint res))
>
> and I get:
>
> {:took 1,
>  :timed_out false,
>  :_shards {:total 5, :successful 5, :failed 0},
>  :hits {:total 0, :max_score nil, :hits []}}
>
> So the connection is there. But no records are.

-- 
You received this message because you are subscribed to the Google Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com
For more options, visit this group at http://groups.google.com/group/clojure?hl=en