rnb-tron commented on code in PR #99:
URL:
https://github.com/apache/doris-kafka-connector/pull/99#discussion_r3231955881
##########
src/main/java/org/apache/doris/kafka/connector/utils/BackendUtils.java:
##########
@@ -56,17 +111,25 @@ public String getAvailableBackend() {
}
public static boolean tryHttpConnection(String backend) {
+ HttpURLConnection co = null;
try {
- backend = "http://" + backend;
- URL url = new URL(backend);
- HttpURLConnection co = (HttpURLConnection) url.openConnection();
- co.setConnectTimeout(60000);
+ URL url = new URL("http://" + backend);
+ co = (HttpURLConnection) url.openConnection();
+ co.setConnectTimeout(PROBE_TIMEOUT_MS);
+ co.setReadTimeout(PROBE_TIMEOUT_MS);
co.connect();
- co.disconnect();
return true;
} catch (Exception ex) {
LOG.warn("Failed to connect to backend:{}", backend, ex);
return false;
+ } finally {
Review Comment:
The original implementation was clearly simpler, wasn't it?
##########
src/main/java/org/apache/doris/kafka/connector/utils/BackendUtils.java:
##########
@@ -43,7 +54,51 @@ public static BackendUtils getInstance(DorisOptions dorisOptions, Logger logger)
return new BackendUtils(RestService.getBackendsV2(dorisOptions, logger));
}
+ /**
+ * Pick a usable backend. The previously chosen backend is reused while it is still within the
+ * cache TTL, so the hot write path does not pay for an HTTP probe on every call. When the cache
+ * is empty/expired we fall back to the round-robin probe behaviour.
+ */
public String getAvailableBackend() {
+ String cached = cachedBackend;
+ if (cached != null && !isCacheExpired()) {
+ return cached;
+ }
+
+ synchronized (lock) {
+ cached = cachedBackend;
+ if (cached != null && !isCacheExpired()) {
+ return cached;
+ }
+
+ String picked = pickBackendLocked();
+ cachedBackend = picked;
+ cachedAtNanos = System.nanoTime();
+ return picked;
+ }
+ }
+
+ /**
+ * Invalidate the cached backend. Callers should invoke this after a stream load / commit
+ * failure so that the next {@link #getAvailableBackend()} probes a fresh node instead of
+ * returning the failing one again.
+ */
+ public void invalidateCache() {
+ synchronized (lock) {
Review Comment:
There is no need to add a lock here, as this will only ever be executed in one thread.
##########
src/main/java/org/apache/doris/kafka/connector/utils/BackendUtils.java:
##########
@@ -56,17 +111,25 @@ public String getAvailableBackend() {
}
public static boolean tryHttpConnection(String backend) {
+ HttpURLConnection co = null;
try {
- backend = "http://" + backend;
- URL url = new URL(backend);
- HttpURLConnection co = (HttpURLConnection) url.openConnection();
- co.setConnectTimeout(60000);
+ URL url = new URL("http://" + backend);
+ co = (HttpURLConnection) url.openConnection();
+ co.setConnectTimeout(PROBE_TIMEOUT_MS);
Review Comment:
It would be better to retain the existing timeout settings (the 60000 ms connect timeout).
##########
src/main/java/org/apache/doris/kafka/connector/utils/BackendUtils.java:
##########
@@ -43,7 +54,51 @@ public static BackendUtils getInstance(DorisOptions dorisOptions, Logger logger)
return new BackendUtils(RestService.getBackendsV2(dorisOptions, logger));
}
+ /**
+ * Pick a usable backend. The previously chosen backend is reused while it is still within the
+ * cache TTL, so the hot write path does not pay for an HTTP probe on every call. When the cache
+ * is empty/expired we fall back to the round-robin probe behaviour.
+ */
public String getAvailableBackend() {
+ String cached = cachedBackend;
+ if (cached != null && !isCacheExpired()) {
+ return cached;
+ }
+
+ synchronized (lock) {
Review Comment:
There is no need to add a lock here, as this will only ever be executed in one thread.
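For illustration, here is a minimal sketch of what a lock-free version could look like if this path really is confined to a single thread. The class name `SingleThreadBackendCache`, the injected `probe` predicate (standing in for `tryHttpConnection`), the constructor, and the exception thrown when no backend responds are all assumptions made for the example, not code from this PR:

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for BackendUtils; every name here is illustrative.
class SingleThreadBackendCache {
    private final List<String> backends;
    private final Predicate<String> probe; // assumed replacement for tryHttpConnection
    private final long ttlNanos;           // assumed cache TTL, in nanoseconds

    // Plain fields: safe only because a single thread is assumed to touch them.
    private String cachedBackend;
    private long cachedAtNanos;
    private int pos;

    SingleThreadBackendCache(List<String> backends, Predicate<String> probe, long ttlNanos) {
        this.backends = backends;
        this.probe = probe;
        this.ttlNanos = ttlNanos;
    }

    String getAvailableBackend() {
        // Reuse the cached backend while it is still within the TTL.
        if (cachedBackend != null && System.nanoTime() - cachedAtNanos < ttlNanos) {
            return cachedBackend;
        }
        cachedBackend = pickBackend();
        cachedAtNanos = System.nanoTime();
        return cachedBackend;
    }

    // Call after a stream load / commit failure so the next lookup probes again.
    void invalidateCache() {
        cachedBackend = null;
    }

    // Round-robin over the backends, returning the first one that passes the probe.
    private String pickBackend() {
        for (int i = 0; i < backends.size(); i++) {
            String candidate = backends.get(pos);
            pos = (pos + 1) % backends.size();
            if (probe.test(candidate)) {
                return candidate;
            }
        }
        // Assumed failure mode; the real class may report this differently.
        throw new IllegalStateException("no available backend");
    }
}
```

If this method is ever reached from more than one thread, the plain fields above would no longer be safe and the synchronization would have to come back.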
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]