This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 62940bcd51b branch-3.1: [Fix](multicatelog) Fix insert iceberg/hive table when use broker #51187 (#51961)
62940bcd51b is described below
commit 62940bcd51bdffa3ab34580ae212a1a5b6cade0e
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jun 20 17:12:19 2025 +0800
branch-3.1: [Fix](multicatelog) Fix insert iceberg/hive table when use broker #51187 (#51961)
Cherry-picked from #51187
Co-authored-by: Lijia Liu <[email protected]>
Co-authored-by: liutang123 <[email protected]>
---
.../writer/iceberg/viceberg_partition_writer.cpp | 3 ++
.../writer/iceberg/viceberg_partition_writer.h | 1 +
.../sink/writer/iceberg/viceberg_table_writer.cpp | 12 ++++--
be/src/vec/sink/writer/vhive_partition_writer.cpp | 3 ++
be/src/vec/sink/writer/vhive_partition_writer.h | 1 +
be/src/vec/sink/writer/vhive_table_writer.cpp | 46 ++++++++++++++++------
.../java/org/apache/doris/catalog/BrokerMgr.java | 15 +++++++
.../doris/planner/BaseExternalTableDataSink.java | 21 ++++++++++
.../org/apache/doris/planner/HiveTableSink.java | 3 ++
.../org/apache/doris/planner/IcebergTableSink.java | 7 +++-
gensrc/thrift/DataSinks.thrift | 2 +
11 files changed, 98 insertions(+), 16 deletions(-)
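In short, this patch threads broker addresses from the FE planner down to the BE
table writers: when a Hive or Iceberg sink resolves to TFileType.FILE_BROKER, the
FE attaches the broker address list to the sink, and the BE partition writers
forward it to the file system layer when opening output files. A minimal sketch of
the FE-side guard (variable names are taken from the sink diffs below):

    // Only attach broker addresses when the location must be accessed
    // through a broker; otherwise the optional field stays unset.
    if (fileType.equals(TFileType.FILE_BROKER)) {
        tSink.setBrokerAddresses(
                getBrokerAddresses(targetTable.getCatalog().bindBrokerName()));
    }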
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
index 8963a129eee..aeaa81d9995 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
@@ -52,6 +52,9 @@ Status VIcebergPartitionWriter::open(RuntimeState* state, RuntimeProfile* profil
io::FSPropertiesRef fs_properties(_write_info.file_type);
fs_properties.properties = &_hadoop_conf;
+ if (!_write_info.broker_addresses.empty()) {
+ fs_properties.broker_addresses = &(_write_info.broker_addresses);
+ }
io::FileDescription file_description = {
            .path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name()),
.fs_name {}};
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.h b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.h
index b66dea1d6d2..28605f80426 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.h
@@ -50,6 +50,7 @@ public:
std::string original_write_path;
std::string target_path;
TFileType::type file_type;
+ std::vector<TNetworkAddress> broker_addresses;
};
    VIcebergPartitionWriter(const TDataSink& t_sink, std::vector<std::string> partition_values,
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 280cf8b8107..26e3ab2858f 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -366,9 +366,15 @@ std::shared_ptr<VIcebergPartitionWriter> VIcebergTableWriter::_create_partition_
write_path = output_path;
}
-    VIcebergPartitionWriter::WriteInfo write_info = {
-            std::move(write_path), std::move(original_write_path), std::move(target_path),
-            iceberg_table_sink.file_type};
+    VIcebergPartitionWriter::WriteInfo write_info = {std::move(write_path),
+                                                     std::move(original_write_path),
+                                                     std::move(target_path),
+                                                     iceberg_table_sink.file_type,
+                                                     {}};
+    if (iceberg_table_sink.__isset.broker_addresses) {
+        write_info.broker_addresses.assign(iceberg_table_sink.broker_addresses.begin(),
+                                           iceberg_table_sink.broker_addresses.end());
+    }
_write_file_count++;
std::vector<std::string> column_names;
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index 8a5e2a9777e..b93303dff03 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -58,6 +58,9 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
io::FSPropertiesRef fs_properties(_write_info.file_type);
fs_properties.properties = &_hadoop_conf;
+ if (!_write_info.broker_addresses.empty()) {
+ fs_properties.broker_addresses = &(_write_info.broker_addresses);
+ }
io::FileDescription file_description = {
            .path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name()),
.fs_name {}};
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.h b/be/src/vec/sink/writer/vhive_partition_writer.h
index 71e1e141d9c..20ca6506704 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.h
+++ b/be/src/vec/sink/writer/vhive_partition_writer.h
@@ -46,6 +46,7 @@ public:
std::string original_write_path;
std::string target_path;
TFileType::type file_type;
+ std::vector<TNetworkAddress> broker_addresses;
};
VHivePartitionWriter(const TDataSink& t_sink, std::string partition_name,
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp b/be/src/vec/sink/writer/vhive_table_writer.cpp
index 6eb478c01b7..9c2726fa652 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -301,30 +301,42 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
if (existing_table == false) { // new table
update_mode = TUpdateMode::NEW;
        if (_partition_columns_input_index.empty()) { // new unpartitioned table
-            write_info = {write_location.write_path, write_location.original_write_path,
-                          write_location.target_path, write_location.file_type};
+            write_info = {write_location.write_path,
+                          write_location.original_write_path,
+                          write_location.target_path,
+                          write_location.file_type,
+                          {}};
} else { // a new partition in a new partitioned table
            auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
            auto original_write_path =
                    fmt::format("{}/{}", write_location.original_write_path, partition_name);
            auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
-            write_info = {std::move(write_path), std::move(original_write_path),
-                          std::move(target_path), write_location.file_type};
+            write_info = {std::move(write_path),
+                          std::move(original_write_path),
+                          std::move(target_path),
+                          write_location.file_type,
+                          {}};
}
    } else { // a new partition in an existing partitioned table, or an existing unpartitioned table
        if (_partition_columns_input_index.empty()) { // an existing unpartitioned table
            update_mode =
                    !hive_table_sink.overwrite ? TUpdateMode::APPEND : TUpdateMode::OVERWRITE;
-            write_info = {write_location.write_path, write_location.original_write_path,
-                          write_location.target_path, write_location.file_type};
+            write_info = {write_location.write_path,
+                          write_location.original_write_path,
+                          write_location.target_path,
+                          write_location.file_type,
+                          {}};
} else { // a new partition in an existing partitioned table
update_mode = TUpdateMode::NEW;
            auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
            auto original_write_path =
                    fmt::format("{}/{}", write_location.original_write_path, partition_name);
            auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
-            write_info = {std::move(write_path), std::move(original_write_path),
-                          std::move(target_path), write_location.file_type};
+            write_info = {std::move(write_path),
+                          std::move(original_write_path),
+                          std::move(target_path),
+                          write_location.file_type,
+                          {}};
}
// need to get schema from existing table ?
}
@@ -337,8 +349,11 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
        auto original_write_path =
                fmt::format("{}/{}", write_location.original_write_path, partition_name);
        auto target_path = fmt::format("{}", existing_partition->location.target_path);
-        write_info = {std::move(write_path), std::move(original_write_path),
-                      std::move(target_path), existing_partition->location.file_type};
+        write_info = {std::move(write_path),
+                      std::move(original_write_path),
+                      std::move(target_path),
+                      existing_partition->location.file_type,
+                      {}};
file_format_type = existing_partition->file_format;
write_compress_type = hive_table_sink.compression_type;
} else {
@@ -347,13 +362,20 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
        auto original_write_path =
                fmt::format("{}/{}", write_location.original_write_path, partition_name);
        auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
-        write_info = {std::move(write_path), std::move(original_write_path),
-                      std::move(target_path), write_location.file_type};
+        write_info = {std::move(write_path),
+                      std::move(original_write_path),
+                      std::move(target_path),
+                      write_location.file_type,
+                      {}};
file_format_type = hive_table_sink.file_format;
write_compress_type = hive_table_sink.compression_type;
// need to get schema from existing table ?
}
}
+    if (hive_table_sink.__isset.broker_addresses) {
+        write_info.broker_addresses.assign(hive_table_sink.broker_addresses.begin(),
+                                           hive_table_sink.broker_addresses.end());
+    }
_write_file_count++;
std::vector<std::string> column_names;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java
index 5e4e7c7cbc8..6b5a519eeda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java
@@ -112,6 +112,21 @@ public class BrokerMgr {
}
}
+ public List<FsBroker> getBrokers(String brokerName) {
+ List<FsBroker> result = null;
+ lock.lock();
+ try {
+ List<FsBroker> brokerList = brokerListMap.get(brokerName);
+ if (brokerList == null || brokerList.isEmpty()) {
+ return null;
+ }
+ result = new ArrayList<>(brokerList);
+ } finally {
+ lock.unlock();
+ }
+ return result;
+ }
+
public FsBroker getAnyBroker(String brokerName) {
lock.lock();
try {
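The new getBrokers() copies the broker list while holding the lock and returns the
copy, so callers can reorder or filter the result without touching shared state.
A self-contained sketch of this copy-on-read pattern follows; Registry and its
members are hypothetical stand-ins, not Doris classes:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantLock;

    class Registry {
        private final ReentrantLock lock = new ReentrantLock();
        private final Map<String, List<String>> listMap = new HashMap<>();

        // Copy under the lock, then return the copy: callers may shuffle
        // the result freely without mutating the shared list.
        List<String> getEntries(String name) {
            lock.lock();
            try {
                List<String> entries = listMap.get(name);
                if (entries == null || entries.isEmpty()) {
                    return null;
                }
                return new ArrayList<>(entries);
            } finally {
                lock.unlock();
            }
        }
    }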
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BaseExternalTableDataSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BaseExternalTableDataSink.java
index 96b1c7e7d39..433162f93fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BaseExternalTableDataSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BaseExternalTableDataSink.java
@@ -20,14 +20,20 @@
package org.apache.doris.planner;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TNetworkAddress;
+import java.util.Collections;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
public abstract class BaseExternalTableDataSink extends DataSink {
@@ -53,6 +59,21 @@ public abstract class BaseExternalTableDataSink extends DataSink {
*/
protected abstract Set<TFileFormatType> supportedFileFormatTypes();
+    protected List<TNetworkAddress> getBrokerAddresses(String bindBroker) throws AnalysisException {
+ List<FsBroker> brokers;
+ if (bindBroker != null) {
+            brokers = Env.getCurrentEnv().getBrokerMgr().getBrokers(bindBroker);
+ } else {
+ brokers = Env.getCurrentEnv().getBrokerMgr().getAllBrokers();
+ }
+ if (brokers == null || brokers.isEmpty()) {
+ throw new AnalysisException("No alive broker.");
+ }
+ Collections.shuffle(brokers);
+        return brokers.stream().map(broker -> new TNetworkAddress(broker.host, broker.port))
+                .collect(Collectors.toList());
+ }
+
    protected TFileFormatType getTFileFormatType(String format) throws AnalysisException {
TFileFormatType fileFormatType = TFileFormatType.FORMAT_UNKNOWN;
String lowerCase = format.toLowerCase();
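getBrokerAddresses() prefers the catalog-bound broker, falls back to all
registered brokers, fails fast when none are alive, and shuffles the result so
that concurrent plans spread their writes across brokers instead of always
picking the first entry. A runnable sketch of the shuffle step (HostPort is a
hypothetical stand-in for TNetworkAddress):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class BrokerShuffleSketch {
        record HostPort(String host, int port) {}

        public static void main(String[] args) {
            List<HostPort> brokers = new ArrayList<>(List.of(
                    new HostPort("broker-1", 8000),
                    new HostPort("broker-2", 8000),
                    new HostPort("broker-3", 8000)));
            // A different order on each invocation, so writers do not all
            // connect to the same broker first.
            Collections.shuffle(brokers);
            System.out.println(brokers);
        }
    }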
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index a4012d357e5..bb4786f226a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -150,6 +150,9 @@ public class HiveTableSink extends BaseExternalTableDataSink {
}
locationParams.setFileType(fileType);
tSink.setLocation(locationParams);
+ if (fileType.equals(TFileType.FILE_BROKER)) {
+            tSink.setBrokerAddresses(getBrokerAddresses(targetTable.getCatalog().bindBrokerName()));
+ }
tSink.setHadoopConfig(targetTable.getHadoopProperties());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
index bfacb572305..5bc0c803cb9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
@@ -27,6 +27,7 @@ import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TIcebergTableSink;
import org.apache.doris.thrift.TSortField;
@@ -134,7 +135,11 @@ public class IcebergTableSink extends BaseExternalTableDataSink {
        LocationPath locationPath = new LocationPath(IcebergUtils.dataLocation(icebergTable), catalogProps);
tSink.setOutputPath(locationPath.toStorageLocation().toString());
tSink.setOriginalOutputPath(locationPath.getPath().toString());
- tSink.setFileType(locationPath.getTFileTypeForBE());
+ TFileType fileType = locationPath.getTFileTypeForBE();
+ tSink.setFileType(fileType);
+ if (fileType.equals(TFileType.FILE_BROKER)) {
+            tSink.setBrokerAddresses(getBrokerAddresses(targetTable.getCatalog().bindBrokerName()));
+ }
if (insertCtx.isPresent()) {
            BaseExternalTableInsertCommandContext context = (BaseExternalTableInsertCommandContext) insertCtx.get();
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index ed7ccee69cd..30348c091ee 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -353,6 +353,7 @@ struct THiveTableSink {
9: optional map<string, string> hadoop_config
10: optional bool overwrite
11: optional THiveSerDeProperties serde_properties
+ 12: optional list<Types.TNetworkAddress> broker_addresses;
}
enum TUpdateMode {
@@ -413,6 +414,7 @@ struct TIcebergTableSink {
11: optional Types.TFileType file_type
12: optional string original_output_path
13: optional PlanNodes.TFileCompressType compression_type
+ 14: optional list<Types.TNetworkAddress> broker_addresses;
}
struct TDataSink {
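Both broker_addresses fields are optional, so they are serialized only when the
FE sets them, and consumers must check presence before use; the BE writers do
this through the generated __isset flags above. The equivalent contract in
Thrift-generated Java would look roughly like this (connectToAnyOf is a
hypothetical helper, not a Doris API):

    // Producer: setBrokerAddresses(...) also marks the optional field as set.
    if (fileType.equals(TFileType.FILE_BROKER)) {
        tSink.setBrokerAddresses(brokerAddresses);
    }

    // Consumer: mirrors the C++ __isset checks in the BE partition writers.
    if (tSink.isSetBrokerAddresses()) {
        connectToAnyOf(tSink.getBrokerAddresses());
    }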
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]