This problem has become much stranger. I thought I was developing some 
intuition about how to write Clojure code, but everything about this seems 
counter-intuitive. 

When the app starts, it seems to be broken, doing about one write per second 
to ElasticSearch (on AWS). Then I get an error, and the error seems to fix 
the app, which then starts making about 15,000 writes a minute to 
ElasticSearch. The error is a retry attempt reported by Apache HttpClient, 
which underlies Elastisch, the library I'm using to connect to 
ElasticSearch.

I am not sure that anyone can help me with this, but I'll share some 
details in case anyone has run into this before. 

I'm moving 1.2 million records from MySQL to ElasticSearch (it's actually 4 
million documents that aggregate down to 1.2 million). I'm using 
DurableQueue to move the records from one transformation to the next, 
denormalizing the data into the format that the frontend needs. I use 3 
queues to move through our 3 transformations. I also have a function that 
runs in the background and prints some facts about the app every 30 
seconds, to give me a rough idea of what is going on; it uses the (stats) 
function provided by DurableQueue to show how the 3 queues are doing. A 
rough sketch of the wiring is just below.
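
The sketch (simplified; the directory paths and the transformation function 
are stand-ins, not my real code):

(require '[durable-queue :as durable])

;; Each stage has its own on-disk queue, and every task goes onto a single
;; :message stream within it. Directory names here are made up.
(def from-mysql-to-tables-queue       (durable/queues "/tmp/q1" {}))
(def from-tables-to-topics-queue      (durable/queues "/tmp/q2" {}))
(def from-topics-to-persistence-queue (durable/queues "/tmp/q3" {}))

;; One transformation stage: take a task, transform the item it carries,
;; put the result on the next queue, then mark the task complete.
(defn transform-stage
  [from-queue to-queue transform-fn]
  (loop [task (durable/take! from-queue :message 60000 :timed-out!)]
    (when-not (= task :timed-out!)
      (durable/put! to-queue :message (transform-fn @task))
      (durable/complete! task))
    (recur (durable/take! from-queue :message 60000 :timed-out!))))

;; The background reporter just prints (durable/stats <queue>) for each of
;; the three queues every 30 seconds.

After 4 or 5 minutes of running, the reporter prints this block of 
information: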

Resource usage:  Memory in use (percentage/used/max-heap): ("87%" "3131M" 
"3568M")

CPU usage (how-many-cpu's/load-average):  [4 3.88]

Free memory in jvm: [444474424]

Stats about from-mysql-to-tables-queue:  {message {:num-slabs 1, 
:num-active-slabs 1, :enqueued 236358, :retried 0, :completed 236358, 
:in-progress -1}}

Stats about from-tables-to-topics-queue: {message {:num-slabs 1, 
:num-active-slabs 1, :enqueued 45731, :retried 0, :completed 45729, 
:in-progress 1}}
 
Stats about the from-topics-to-persistence-queue:  {message {:num-slabs 1, 
:num-active-slabs 1, :enqueued 766, :retried 0, :completed 737, 
:in-progress 20}}

This final queue moves at a glacial pace until I get these errors: 


Oct 04, 2017 10:27:35 PM org.apache.http.impl.client.DefaultHttpClient 
tryConnect
INFO: Retrying connect to 
{s}->https://search-samedayes01-ntsdht7rqwesdu.us-east-1.es.amazonaws.com:443

Oct 04, 2017 10:27:35 PM org.apache.http.impl.client.DefaultHttpClient 
tryConnect
INFO: I/O exception (java.net.SocketException) caught when connecting to 
{s}->https://search-samedayes01-ntsdht7rqwesdu.us-east-1.es.amazonaws.com:443: 
Broken pipe (Write failed)

Oct 04, 2017 10:27:35 PM org.apache.http.impl.client.DefaultHttpClient 
tryConnect
INFO: Retrying connect to 
{s}->https://search-samedayes01-ntsdht7rqwesdu.us-east-1.es.amazonaws.com:443

Strangely, these errors seem to fix the app: once they appear, it starts 
writing to ElasticSearch at about 15,000 records per minute. 

A minute or two later I see:

Resource usage:  Memory in use (percentage/used/max-heap): ("62%" "2245M" 
"3568M")

CPU usage (how-many-cpu's/load-average):  [4 1.32]

Free memory in jvm: [611550400]

Stats about from-mysql-to-tables-queue:  {message {:num-slabs 1, 
:num-active-slabs 1, :enqueued 1156526, :retried 0, :completed 1156526, 
:in-progress 0}}

Stats about from-tables-to-topics-queue:  {message {:num-slabs 1, 
:num-active-slabs 1, :enqueued 59928, :retried 0, :completed 59928, 
:in-progress 0}}

Stats about the from-topics-to-persistence-queue:  {message {:num-slabs 1, 
:num-active-slabs 1, :enqueued 53222, :retried 0, :completed 53192, 
:in-progress 20}}


So memory use declines and the speed increases. 

The java.net.SocketException could be a red herring: maybe it shows up just 
when the app is done with most of the transformations, so of course memory 
use would go down and speed would go up. But I've run this several times, 
and the exception always appears at the same moment as the big jump in 
speed. 

I did some research about this error and ElasticSearch and I discovered 
this:

https://stackoverflow.com/questions/28908835/ssl-peer-shut-down-incorrectly-in-java

So I added this to the -main function that starts the app:

   (System/setProperty "https.protocols" "TLSv1.1")
   
This didn't seem to have any effect. (I suspect that property may only apply 
to HttpsURLConnection and may be ignored by Apache HttpClient, which would 
explain the lack of effect, but I'm not sure.) 

Going out on a limb: it does seem that all the threads that write to 
ElasticSearch initially end up blocked, then they time out, and when they 
retry things go well, so the app doesn't perform well until those threads 
have timed out and retried. But why the timeout should be necessary, and 
why the retry works so well (repeatedly, every time I run this), is beyond 
me. 
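
If the connects are what's blocking, explicit HTTP timeouts and a pooled 
connection manager might at least make the failures show up fast. This is 
only a sketch, and it assumes Elastisch's REST connect passes options 
through to clj-http (the option names below are clj-http's; I haven't 
verified them against the Elastisch version I'm on):

(require '[clojurewerkz.elastisch.rest :as esr]
         '[clj-http.conn-mgr :as conn-mgr])

;; Hypothetical connection setup: a reusable connection manager sized for
;; the 20 workers, plus explicit connect/read timeouts so a blocked
;; connection fails quickly instead of stalling a worker.
(defn connect-to-es
  [es-url]
  (let [cm (conn-mgr/make-reusable-conn-manager {:threads 20
                                                 :default-per-route 20})]
    (esr/connect es-url
                 {:connection-manager cm
                  :conn-timeout       5000      ; ms to establish a connection
                  :socket-timeout     10000}))) ; ms to wait for a response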

The code that spins up the worker threads that write to ElasticSearch looks 
like this: 

(defn advance
  [message db]
  ;; message is a durable-queue task: deref it to get the item, push the
  ;; item to ElasticSearch, then mark the task complete.
  {:pre [(= (type message) durable_queue.Task)]}
  (persistence/push-item-to-persistence @message db)
  (durable/complete! message))


(defn worker
  [from-topics-to-persistence-queue db]
  (slingshot/try+
   ;; Block for up to 60 seconds waiting for a task; on timeout take!
   ;; returns :timed-out! instead of a task, and we simply loop again.
   (loop [message (durable/take! from-topics-to-persistence-queue
                                 :message 60000 :timed-out!)]
     (slingshot/try+
      (when (= (type message) durable_queue.Task)
        (advance message db))
      (catch Object o
        (slingshot/throw+
         {:type ::worker
          :error o
          :from-topics-to-persistence-queue from-topics-to-persistence-queue
          :message @message
          :db db})))
     (recur (durable/take! from-topics-to-persistence-queue
                           :message 60000 :timed-out!)))
   (catch Object o
     (errors/error o)
     (slingshot/throw+
      {:type ::worker
       :error o
       :from-topics-to-persistence-queue from-topics-to-persistence-queue
       :db db}))))

(defn start
  [config from-topics-to-persistence-queue]
  {:pre [(not (nil? from-topics-to-persistence-queue))]}
  ;; 2017-09-22 -- please note, it is common to get :timed-out!
  ;; Spin up 20 worker futures, each with its own connection to ElasticSearch.
  (dotimes [_ 20]
    (future
      (slingshot/try+
       (errors/log " push/start create workers for from-topics-to-persistence-queue")
       (let [db (persistence/multi-thread-start config)]
         (worker from-topics-to-persistence-queue db))
       (catch Object o
         (errors/error o)
         (slingshot/throw+
          {:type ::start
           :error o
           :config config
           :from-topics-to-persistence-queue from-topics-to-persistence-queue}))))))
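
For reference, a minimal sketch of the kind of explicit retry I could wrap 
around the ES write, so the app wouldn't have to depend on HttpClient's 
internal retry (untested; the attempt count and backoff are arbitrary):

(defn push-with-retry
  "Sketch only: retry the ElasticSearch write a few times, with a growing
  pause between attempts, before giving up."
  [item db max-attempts]
  (loop [attempt 1]
    (let [outcome (slingshot/try+
                   (persistence/push-item-to-persistence item db)
                   ::ok
                   (catch Object o
                     (if (< attempt max-attempts)
                       ::retry
                       (slingshot/throw+ {:type     ::push-failed
                                          :attempts attempt
                                          :error    o
                                          :item     item}))))]
      (when (= outcome ::retry)
        (Thread/sleep (* attempt 1000)) ; back off a bit more each time
        (recur (inc attempt))))))

advance would then call (push-with-retry @message db 3) instead of calling 
push-item-to-persistence directly.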

Any thoughts about why I would get this retry error? Has anyone else gotten 
that from AWS ElasticSearch? 

I am feeling fairly stupid at the moment. The app has only about 950 lines 
of code, yet I've managed to get a lot wrong. The last time I ran it, it 
wrote 499,000 records to ElasticSearch and then started repeatedly writing 
the same documents, instead of the remainder of the 1.2 million records. I 
thought the FIFO nature of DurableQueue would ensure that the initial write 
of the 1.2 million records finished before the app started writing any of 
the same documents over again, but apparently I need to go back and look at 
that. Still, I assume that problem is separate from the SocketException. 
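
One small consolation: since the ES document id is (str denormalized-id), a 
repeated put should overwrite the existing document rather than add a 
duplicate, so the index's total count should still converge on roughly 1.2 
million. A quick way to check, reusing the same calls as in my earlier post 
(I believe :size 0 just suppresses the hits themselves):

(let [res (esd/search conn "facts-over-time" "deduplication"
                      :query {:match_all {}} :size 0)]
  (println "documents in facts-over-time:" (esrsp/total-hits res)))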








On Wednesday, October 4, 2017 at 12:49:12 AM UTC-4, lawrence...@gmail.com 
wrote:
>
> This is probably a stupid question, but is there an obvious way to get an 
> error message out of Elastisch? I had an app that was working with MongoDB 
> and I was then told I had to use ElasticSearch instead (something about 
> only using AWS for everything) so now I'm trying to get an error message, 
> because my code doesn't seem to work. I read through here without seeing 
> anything obvious: 
>
> http://clojureelasticsearch.info/articles/getting_started.htm
>
> I rewrote my MongoDB function, so it should work with Elastisch: 
>
> (defn push-item-to-persistence
>   [item db]
>   (let [
>         denormalized-id (get-in item [:denormalized-id] :no-id)
>         item (assoc item :updated-at (temporal/current-time-as-datetime))
>         item (assoc item :permanent-holding-id-for-item-instances 
> (java.util.UUID/randomUUID))
>         item (assoc item :instance-id-for-this-one-item 
> (java.util.UUID/randomUUID))
>         item (assoc item :item-type :deduplication)
>         ]
>     (if (= denormalized-id :no-id)
>       (slingshot/throw+ {
>                          :type 
> ::no-denormalized-id-in-push-item-into-database
>                          :item item
>                          })
>       (slingshot/try+
>        ;; (put conn index mapping-type id document) ; generic arg order, not the real call
>        (esd/put db "facts-over-time" "deduplicaton" (str denormalized-id) item)
>        (println " done with put in push-item-to-persistence ")
>        (catch Object o
>          (slingshot/throw+ {
>                             :type ::push-item-to-persistence
>                             :error o
>                             :item item
>                             :db db
>                             }
>                            ))))))
>
>
> It doesn't seem that any documents are getting into ElasticSearch. I was 
> hoping the (throw) would reveal some useful debugging information, but 
> that doesn't seem to happen. 
>
> The connection appears to be valid, as I can do this:
>
>                  (let [conn (persistence/multi-thread-start config)
>                        res  (esd/search conn "facts-over-time" 
> "deduplication" :query {:match_all {}})
>                        n    (esrsp/total-hits res)
>                        hits (esrsp/hits-from res)]
>                    (clojure.pprint/pprint res))
>
> and I get: 
>
> {:took 1,
>  :timed_out false,
>  :_shards {:total 5, :successful 5, :failed 0},
>  :hits {:total 0, :max_score nil, :hits []}}
>
> So the connection is there. But no records are. 
>
>
>
