This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c78d91e5259 branch-3.0: [fix](broker) BE crashed because of empty broker #51274 (#51783)
c78d91e5259 is described below

commit c78d91e5259c6833eda3255d27bc99a4494cd811
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 24 10:26:09 2025 +0800

    branch-3.0: [fix](broker) BE crashed because of empty broker #51274 (#51783)
    
    Cherry-picked from #51274
    
    Co-authored-by: camby <[email protected]>
---
 be/src/io/file_factory.cpp                         | 44 ++++++++++++++++++++--
 .../apache/doris/datasource/FileQueryScanNode.java | 24 +++++++-----
 2 files changed, 54 insertions(+), 14 deletions(-)

diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index db0d1c2109b..899287dbef1 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -72,14 +72,39 @@ io::FileReaderOptions FileFactory::get_reader_options(RuntimeState* state,
     return opts;
 }
 
+int32_t get_broker_index(const std::vector<TNetworkAddress>& brokers, const 
std::string& path) {
+    if (brokers.empty()) {
+        return -1;
+    }
+
+    // firstly find local broker
+    const auto local_host = BackendOptions::get_localhost();
+    for (int32_t i = 0; i < brokers.size(); ++i) {
+        if (brokers[i].hostname == local_host) {
+            return i;
+        }
+    }
+
+    // secondly select broker by hash of file path
+    auto key = HashUtil::hash(path.data(), path.size(), 0);
+    return key % brokers.size();
+}
+
 Result<io::FileSystemSPtr> FileFactory::create_fs(const io::FSPropertiesRef& 
fs_properties,
                                                   const io::FileDescription& 
file_description) {
     switch (fs_properties.type) {
     case TFileType::FILE_LOCAL:
         return io::global_local_filesystem();
-    case TFileType::FILE_BROKER:
-        return 
io::BrokerFileSystem::create((*fs_properties.broker_addresses)[0],
+    case TFileType::FILE_BROKER: {
+        auto index = get_broker_index(*fs_properties.broker_addresses, 
file_description.path);
+        if (index < 0) {
+            return ResultError(Status::InternalError("empty 
broker_addresses"));
+        }
+        LOG_INFO("select broker: {} for file {}", 
(*fs_properties.broker_addresses)[index].hostname,
+                 file_description.path);
+        return 
io::BrokerFileSystem::create((*fs_properties.broker_addresses)[index],
                                             *fs_properties.properties, 
io::FileSystem::TMP_FS_ID);
+    }
     case TFileType::FILE_S3: {
         S3URI s3_uri(file_description.path);
         RETURN_IF_ERROR_RESULT(s3_uri.parse());
@@ -129,7 +154,12 @@ Result<io::FileWriterPtr> FileFactory::create_file_writer(
         return file_writer;
     }
     case TFileType::FILE_BROKER: {
-        return io::BrokerFileWriter::create(env, broker_addresses[0], 
properties, path);
+        auto index = get_broker_index(broker_addresses, path);
+        if (index < 0) {
+            return ResultError(Status::InternalError("empty 
broker_addresses"));
+        }
+        LOG_INFO("select broker: {} for file {}", 
broker_addresses[index].hostname, path);
+        return io::BrokerFileWriter::create(env, broker_addresses[index], 
properties, path);
     }
     case TFileType::FILE_S3: {
         S3URI s3_uri(path);
@@ -198,8 +228,14 @@ Result<io::FileReaderSPtr> FileFactory::create_file_reader(
                 });
     }
     case TFileType::FILE_BROKER: {
+        auto index = get_broker_index(system_properties.broker_addresses, 
file_description.path);
+        if (index < 0) {
+            return ResultError(Status::InternalError("empty 
broker_addresses"));
+        }
+        LOG_INFO("select broker: {} for file {}",
+                 system_properties.broker_addresses[index].hostname, 
file_description.path);
         // TODO(plat1ko): Create `FileReader` without FS
-        return 
io::BrokerFileSystem::create(system_properties.broker_addresses[0],
+        return 
io::BrokerFileSystem::create(system_properties.broker_addresses[index],
                                             system_properties.properties, 
io::FileSystem::TMP_FS_ID)
                 .and_then([&](auto&& fs) -> Result<io::FileReaderSPtr> {
                     io::FileReaderSPtr file_reader;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index dedbedeafa3..04adce89bb7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -75,6 +75,7 @@ import org.apache.logging.log4j.Logger;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -467,21 +468,24 @@ public abstract class FileQueryScanNode extends FileScanNode {
                 params.setProperties(locationProperties);
 
                 if (!params.isSetBrokerAddresses()) {
-                    FsBroker broker;
+                    List<FsBroker> brokers;
                     if (brokerName != null) {
-                        broker = 
Env.getCurrentEnv().getBrokerMgr().getBroker(brokerName, 
selectedBackend.getHost());
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug(String.format(
-                                    "Set location for broker [%s], selected BE 
host: [%s] selected broker host: [%s]",
-                                    brokerName, selectedBackend.getHost(), 
broker.host));
-                        }
+                        brokers = 
Env.getCurrentEnv().getBrokerMgr().getBrokers(brokerName);
                     } else {
-                        broker = 
Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
+                        brokers = 
Env.getCurrentEnv().getBrokerMgr().getAllBrokers();
+                    }
+                    if (brokers == null || brokers.isEmpty()) {
+                        throw new UserException("No alive broker.");
+                    }
+                    Collections.shuffle(brokers);
+                    for (FsBroker broker : brokers) {
+                        if (broker.isAlive) {
+                            params.addToBrokerAddresses(new 
TNetworkAddress(broker.host, broker.port));
+                        }
                     }
-                    if (broker == null) {
+                    if (params.getBrokerAddresses().isEmpty()) {
                         throw new UserException("No alive broker.");
                     }
-                    params.addToBrokerAddresses(new 
TNetworkAddress(broker.host, broker.port));
                 }
             }
         } else if ((locationType == TFileType.FILE_S3 || locationType == 
TFileType.FILE_LOCAL)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to