This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cefccaf0e56 [enhancement](plugin) logstash: add retry queue without
blocking tasks (#44999)
cefccaf0e56 is described below
commit cefccaf0e56a68bbc5f5c3a6672fe7db50085c5d
Author: Mingxi <[email protected]>
AuthorDate: Mon Feb 17 08:14:45 2025 +0800
[enhancement](plugin) logstash: add retry queue without blocking tasks
(#44999)
---
extension/logstash/lib/logstash/outputs/doris.rb | 140 +++++++++++++--------
.../logstash/lib/logstash/util/delay_event.rb | 54 ++++++++
extension/logstash/logstash-output-doris.gemspec | 2 +-
3 files changed, 143 insertions(+), 53 deletions(-)
diff --git a/extension/logstash/lib/logstash/outputs/doris.rb b/extension/logstash/lib/logstash/outputs/doris.rb
index 21d3ee6e752..971d28889c4 100644
--- a/extension/logstash/lib/logstash/outputs/doris.rb
+++ b/extension/logstash/lib/logstash/outputs/doris.rb
@@ -22,6 +22,7 @@ require "logstash/outputs/base"
require "logstash/namespace"
require "logstash/json"
require 'logstash/util/formater'
+require 'logstash/util/delay_event'
require "uri"
require "securerandom"
require "json"
@@ -43,7 +44,7 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
config :db, :validate => :string, :required => true
# the table which data is loaded to
config :table, :validate => :string, :required => true
- # label prefix of a stream load requst.
+ # label prefix of a stream load request.
config :label_prefix, :validate => :string, :default => "logstash"
# user name
config :user, :validate => :string, :required => true
@@ -72,6 +73,8 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
config :log_progress_interval, :validate => :number, :default => 10
+ # max retry queue size in MB, default is 20% max memory of JVM
+ config :max_retry_queue_mb, :validate => :number, :default => java.lang.Runtime.get_runtime.max_memory / 1024 / 1024 / 5
def print_plugin_info()
@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-doris/ }
@@ -131,9 +134,36 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
end
end
+ if @max_retry_queue_mb <= 0
+ @max_retry_queue_mb = java.lang.Runtime.get_runtime.max_memory / 1024 / 1024 / 5
+ end
+ @logger.info("max retry queue size: #{@max_retry_queue_mb}MB")
+
+ @retry_queue = java.util.concurrent.DelayQueue.new
+ # retry queue size in bytes
+ @retry_queue_bytes = java.util.concurrent.atomic.AtomicLong.new(0)
+ retry_thread = Thread.new do
+ while popped = @retry_queue.take
+ documents, http_headers, event_num, req_count = popped.event
+ handle_request(documents, http_headers, event_num, req_count)
+ end
+ end
+
print_plugin_info()
end # def register
+ private
+ def add_event_to_retry_queue(delay_event)
+ event_size = delay_event.documents.size
+ if delay_event.first_retry
+ while @retry_queue_bytes.get + event_size > @max_retry_queue_mb * 1024 * 1024
+ sleep(1)
+ end
+ @retry_queue_bytes.addAndGet(event_size)
+ end
+ @retry_queue.add(delay_event)
+ end
+
def multi_receive(events)
return if events.empty?
send_events(events)
@@ -141,12 +171,8 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
private
def send_events(events)
- documents = ""
- event_num = 0
- events.each do |event|
- documents << event_body(event) << "\n"
- event_num += 1
- end
+ documents = events.map { |event| event_body(event) }.join("\n")
+ event_num = events.size
# @logger.info("get event num: #{event_num}")
@logger.debug("get documents: #{documents}")
@@ -157,50 +183,64 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
http_headers["label"] = @label_prefix + "_" + @db + "_" + @table + "_" + Time.now.strftime('%Y%m%d_%H%M%S_%L_' + SecureRandom.uuid)
end
- req_count = 0
- sleep_for = 1
- while true
- response = make_request(documents, http_headers, @http_query, @http_hosts.sample)
-
- req_count += 1
- response_json = {}
- begin
- response_json = JSON.parse(response.body)
- rescue => e
- @logger.warn("doris stream load response: #{response} is not a valid JSON")
- end
+ handle_request(documents, http_headers, event_num, 1)
+ end
+
+ def sleep_for_attempt(attempt)
+ sleep_for = attempt**2
+ sleep_for = sleep_for <= 60 ? sleep_for : 60
+ (sleep_for/2) + (rand(0..sleep_for)/2)
+ end
+
+ private
+ def handle_request(documents, http_headers, event_num, req_count)
+ response = make_request(documents, http_headers, @http_query, @http_hosts.sample)
+ response_json = {}
+ begin
+ response_json = JSON.parse(response.body)
+ rescue => _
+ @logger.warn("doris stream load response is not a valid JSON:\n#{response}")
+ end
+
+ status = response_json["Status"]
+
+ need_retry = true
- status = response_json["Status"]
+ if status == 'Label Already Exists'
+ @logger.warn("Label already exists: #{response_json['Label']}, skip #{event_num} records:\n#{response}")
+ need_retry = false
- if status == 'Label Already Exists'
- @logger.warn("Label already exists: #{response_json['Label']}, skip #{event_num} records.")
- break
+ elsif status == "Success" || status == "Publish Timeout"
+ @total_bytes.addAndGet(documents.size)
+ @total_rows.addAndGet(event_num)
+ if @log_request or @logger.debug?
+ @logger.info("doris stream load response:\n#{response}")
end
+ need_retry = false
+
+ elsif @max_retries >= 0 && req_count - 1 > @max_retries
+ @logger.warn("FAILED doris stream load response:\n#{response}")
+ @logger.warn("DROP this batch after failed #{req_count} times.")
+ if @save_on_failure
+ @logger.warn("Try save to disk.Disk file path : #{@save_dir}/#{@table}_#{@save_file}")
+ save_to_disk(documents)
+ end
+ need_retry = false
+ end
- if status == "Success" || status == "Publish Timeout"
- @total_bytes.addAndGet(documents.size)
- @total_rows.addAndGet(event_num)
- break
- else
- @logger.warn("FAILED doris stream load response:\n#{response}")
-
- if @max_retries >= 0 && req_count > @max_retries
- @logger.warn("DROP this batch after failed #{req_count} times.")
- if @save_on_failure
- @logger.warn("Try save to disk.Disk file path : #{@save_dir}/#{@table}_#{@save_file}")
- save_to_disk(documents)
- end
- break
- end
-
- # sleep and then retry
- sleep_for = sleep_for * 2
- sleep_for = sleep_for <= 60 ? sleep_for : 60
- sleep_rand = (sleep_for / 2) + (rand(0..sleep_for) / 2)
- @logger.warn("Will do retry #{req_count} after sleep #{sleep_rand} secs.")
- sleep(sleep_rand)
+ if !need_retry
+ if req_count > 1
+ @retry_queue_bytes.addAndGet(-documents.size)
end
+ return
end
+
+ # add to retry_queue
+ sleep_for = sleep_for_attempt(req_count)
+ @logger.warn("FAILED doris stream load response:\n#{response}")
+ @logger.warn("Will do the #{req_count}th retry after #{sleep_for} secs.")
+ delay_event = DelayEvent.new(sleep_for, [documents, http_headers, event_num, req_count+1])
+ add_event_to_retry_queue(delay_event)
end
private
@@ -227,11 +267,7 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
log_failure("doris stream load request error: #{e}")
end
- if @log_request or @logger.debug?
- @logger.info("doris stream load response:\n#{response}")
- end
-
- return response
+ response
end # def make_request
# Format the HTTP body
@@ -284,8 +320,8 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
end
# This is split into a separate method mostly to help testing
- def log_failure(message)
- @logger.warn("[Doris Output Failure] #{message}")
+ def log_failure(message, data = {})
+ @logger.warn("[Doris Output Failure] #{message}", data)
end
def make_request_headers()
diff --git a/extension/logstash/lib/logstash/util/delay_event.rb b/extension/logstash/lib/logstash/util/delay_event.rb
new file mode 100644
index 00000000000..86f59ef457f
--- /dev/null
+++ b/extension/logstash/lib/logstash/util/delay_event.rb
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+require 'java'
+
+class DelayEvent
+ include java.util.concurrent.Delayed
+
+ def initialize(delay, event)
+ @start_time = Time.now.to_i + delay
+ @event = event # event style: [documents, http_headers, event_num, req_count]
+ end
+
+ def get_delay(unit)
+ delay = @start_time - Time.now.to_i
+ unit.convert(delay, java.util.concurrent.TimeUnit::SECONDS)
+ end
+
+ def compare_to(other)
+ d = self.start_time - other.start_time
+ return 0 if d == 0
+ d < 0 ? -1 : 1
+ end
+
+ def start_time
+ @start_time
+ end
+
+ def event
+ @event
+ end
+
+ def documents
+ @event[0]
+ end
+
+ def first_retry
+ @event[3] == 2
+ end
+end
diff --git a/extension/logstash/logstash-output-doris.gemspec b/extension/logstash/logstash-output-doris.gemspec
index 30341b83156..f44d57d0511 100644
--- a/extension/logstash/logstash-output-doris.gemspec
+++ b/extension/logstash/logstash-output-doris.gemspec
@@ -18,7 +18,7 @@ under the License.
=end
Gem::Specification.new do |s|
s.name = 'logstash-output-doris'
- s.version = '1.0.1'
+ s.version = '1.1.0'
s.author = 'Apache Doris'
s.email = '[email protected]'
s.homepage = 'http://doris.apache.org'
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]