This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new c78d91e5259 branch-3.0: [fix](broker) BE crashed because of empty
broker #51274 (#51783)
c78d91e5259 is described below
commit c78d91e5259c6833eda3255d27bc99a4494cd811
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 24 10:26:09 2025 +0800
branch-3.0: [fix](broker) BE crashed because of empty broker #51274 (#51783)
Cherry-picked from #51274
Co-authored-by: camby <[email protected]>
---
be/src/io/file_factory.cpp | 44 ++++++++++++++++++++--
.../apache/doris/datasource/FileQueryScanNode.java | 24 +++++++-----
2 files changed, 54 insertions(+), 14 deletions(-)
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index db0d1c2109b..899287dbef1 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -72,14 +72,39 @@ io::FileReaderOptions
FileFactory::get_reader_options(RuntimeState* state,
return opts;
}
+// Chooses which entry of `brokers` should serve `path`.
+// Returns -1 when `brokers` is empty; callers treat a negative index as
+// "empty broker_addresses" and fail with an InternalError instead of indexing.
+// Otherwise it prefers the first broker whose hostname equals this BE's local
+// host. Failing that, it hashes `path`, so a given path maps to the same broker
+// for a given broker list.
+// `static`: this helper is private to this translation unit.
+static int32_t get_broker_index(const std::vector<TNetworkAddress>& brokers,
+                                const std::string& path) {
+    if (brokers.empty()) {
+        return -1;
+    }
+
+    // firstly find local broker
+    const auto local_host = BackendOptions::get_localhost();
+    // size_t index: avoids the signed/unsigned comparison against size().
+    for (size_t i = 0; i < brokers.size(); ++i) {
+        if (brokers[i].hostname == local_host) {
+            return static_cast<int32_t>(i);
+        }
+    }
+
+    // secondly select broker by hash of file path
+    const auto key = HashUtil::hash(path.data(), path.size(), 0);
+    // The result is < brokers.size(), so narrowing to int32_t is explicit and safe.
+    return static_cast<int32_t>(key % brokers.size());
+}
+
Result<io::FileSystemSPtr> FileFactory::create_fs(const io::FSPropertiesRef&
fs_properties,
const io::FileDescription&
file_description) {
switch (fs_properties.type) {
case TFileType::FILE_LOCAL:
return io::global_local_filesystem();
- case TFileType::FILE_BROKER:
- return
io::BrokerFileSystem::create((*fs_properties.broker_addresses)[0],
+ case TFileType::FILE_BROKER: {
+ auto index = get_broker_index(*fs_properties.broker_addresses,
file_description.path);
+ if (index < 0) {
+ return ResultError(Status::InternalError("empty
broker_addresses"));
+ }
+ LOG_INFO("select broker: {} for file {}",
(*fs_properties.broker_addresses)[index].hostname,
+ file_description.path);
+ return
io::BrokerFileSystem::create((*fs_properties.broker_addresses)[index],
*fs_properties.properties,
io::FileSystem::TMP_FS_ID);
+ }
case TFileType::FILE_S3: {
S3URI s3_uri(file_description.path);
RETURN_IF_ERROR_RESULT(s3_uri.parse());
@@ -129,7 +154,12 @@ Result<io::FileWriterPtr> FileFactory::create_file_writer(
return file_writer;
}
case TFileType::FILE_BROKER: {
- return io::BrokerFileWriter::create(env, broker_addresses[0],
properties, path);
+ auto index = get_broker_index(broker_addresses, path);
+ if (index < 0) {
+ return ResultError(Status::InternalError("empty
broker_addresses"));
+ }
+ LOG_INFO("select broker: {} for file {}",
broker_addresses[index].hostname, path);
+ return io::BrokerFileWriter::create(env, broker_addresses[index],
properties, path);
}
case TFileType::FILE_S3: {
S3URI s3_uri(path);
@@ -198,8 +228,14 @@ Result<io::FileReaderSPtr> FileFactory::create_file_reader(
});
}
case TFileType::FILE_BROKER: {
+ auto index = get_broker_index(system_properties.broker_addresses,
file_description.path);
+ if (index < 0) {
+ return ResultError(Status::InternalError("empty
broker_addresses"));
+ }
+ LOG_INFO("select broker: {} for file {}",
+ system_properties.broker_addresses[index].hostname,
file_description.path);
// TODO(plat1ko): Create `FileReader` without FS
- return
io::BrokerFileSystem::create(system_properties.broker_addresses[0],
+ return
io::BrokerFileSystem::create(system_properties.broker_addresses[index],
system_properties.properties,
io::FileSystem::TMP_FS_ID)
.and_then([&](auto&& fs) -> Result<io::FileReaderSPtr> {
io::FileReaderSPtr file_reader;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index dedbedeafa3..04adce89bb7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -75,6 +75,7 @@ import org.apache.logging.log4j.Logger;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -467,21 +468,24 @@ public abstract class FileQueryScanNode extends
FileScanNode {
params.setProperties(locationProperties);
if (!params.isSetBrokerAddresses()) {
- FsBroker broker;
+ List<FsBroker> brokers;
if (brokerName != null) {
- broker =
Env.getCurrentEnv().getBrokerMgr().getBroker(brokerName,
selectedBackend.getHost());
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format(
- "Set location for broker [%s], selected BE
host: [%s] selected broker host: [%s]",
- brokerName, selectedBackend.getHost(),
broker.host));
- }
+ brokers =
Env.getCurrentEnv().getBrokerMgr().getBrokers(brokerName);
} else {
- broker =
Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
+ brokers =
Env.getCurrentEnv().getBrokerMgr().getAllBrokers();
+ }
+ if (brokers == null || brokers.isEmpty()) {
+ throw new UserException("No alive broker.");
+ }
+ Collections.shuffle(brokers);
+ for (FsBroker broker : brokers) {
+ if (broker.isAlive) {
+ params.addToBrokerAddresses(new
TNetworkAddress(broker.host, broker.port));
+ }
}
- if (broker == null) {
+ if (params.getBrokerAddresses().isEmpty()) {
throw new UserException("No alive broker.");
}
- params.addToBrokerAddresses(new
TNetworkAddress(broker.host, broker.port));
}
}
} else if ((locationType == TFileType.FILE_S3 || locationType ==
TFileType.FILE_LOCAL)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]