This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 62940bcd51b branch-3.1: [Fix](multicatalog) Fix insert into iceberg/hive table when using broker #51187 (#51961)
62940bcd51b is described below

commit 62940bcd51bdffa3ab34580ae212a1a5b6cade0e
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jun 20 17:12:19 2025 +0800

    branch-3.1: [Fix](multicatalog) Fix insert into iceberg/hive table when using broker #51187 (#51961)
    
    Cherry-picked from #51187
    
    Co-authored-by: Lijia Liu <[email protected]>
    Co-authored-by: liutang123 <[email protected]>
---
 .../writer/iceberg/viceberg_partition_writer.cpp   |  3 ++
 .../writer/iceberg/viceberg_partition_writer.h     |  1 +
 .../sink/writer/iceberg/viceberg_table_writer.cpp  | 12 ++++--
 be/src/vec/sink/writer/vhive_partition_writer.cpp  |  3 ++
 be/src/vec/sink/writer/vhive_partition_writer.h    |  1 +
 be/src/vec/sink/writer/vhive_table_writer.cpp      | 46 ++++++++++++++++------
 .../java/org/apache/doris/catalog/BrokerMgr.java   | 15 +++++++
 .../doris/planner/BaseExternalTableDataSink.java   | 21 ++++++++++
 .../org/apache/doris/planner/HiveTableSink.java    |  3 ++
 .../org/apache/doris/planner/IcebergTableSink.java |  7 +++-
 gensrc/thrift/DataSinks.thrift                     |  2 +
 11 files changed, 98 insertions(+), 16 deletions(-)
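
In short, the fix threads broker addresses from the FE planners down to the BE writers: when a sink writes through a broker (TFileType.FILE_BROKER), the FE resolves the catalog's bound broker name (or falls back to all registered brokers) into a shuffled address list, attaches it to the Hive/Iceberg table sink through a new optional thrift field, and the BE partition writers pass that list into the file-system properties used to open the output file. A minimal standalone sketch of the FE-side selection logic, mirroring the new BaseExternalTableDataSink#getBrokerAddresses in this commit (the class name BrokerAddressPicker is hypothetical; the Doris FE classes Env, FsBroker, and TNetworkAddress are assumed on the classpath):

    import org.apache.doris.catalog.Env;
    import org.apache.doris.catalog.FsBroker;
    import org.apache.doris.common.AnalysisException;
    import org.apache.doris.thrift.TNetworkAddress;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    final class BrokerAddressPicker {
        // Resolve brokers by bound name, or all registered brokers when no name
        // is bound; fail when none exist, then shuffle for simple load spreading.
        static List<TNetworkAddress> pick(String bindBrokerName) throws AnalysisException {
            List<FsBroker> brokers = (bindBrokerName != null)
                    ? Env.getCurrentEnv().getBrokerMgr().getBrokers(bindBrokerName)
                    : Env.getCurrentEnv().getBrokerMgr().getAllBrokers();
            if (brokers == null || brokers.isEmpty()) {
                throw new AnalysisException("No alive broker.");
            }
            List<TNetworkAddress> addresses = new ArrayList<>(brokers.size());
            for (FsBroker broker : brokers) {
                addresses.add(new TNetworkAddress(broker.host, broker.port));
            }
            Collections.shuffle(addresses); // shuffle the copy, not the manager's list
            return addresses;
        }
    }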

diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
index 8963a129eee..aeaa81d9995 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
@@ -52,6 +52,9 @@ Status VIcebergPartitionWriter::open(RuntimeState* state, RuntimeProfile* profil
 
     io::FSPropertiesRef fs_properties(_write_info.file_type);
     fs_properties.properties = &_hadoop_conf;
+    if (!_write_info.broker_addresses.empty()) {
+        fs_properties.broker_addresses = &(_write_info.broker_addresses);
+    }
     io::FileDescription file_description = {
             .path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name()),
             .fs_name {}};
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.h b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.h
index b66dea1d6d2..28605f80426 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.h
@@ -50,6 +50,7 @@ public:
         std::string original_write_path;
         std::string target_path;
         TFileType::type file_type;
+        std::vector<TNetworkAddress> broker_addresses;
     };
 
     VIcebergPartitionWriter(const TDataSink& t_sink, std::vector<std::string> partition_values,
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 280cf8b8107..26e3ab2858f 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -366,9 +366,15 @@ std::shared_ptr<VIcebergPartitionWriter> VIcebergTableWriter::_create_partition_
         write_path = output_path;
     }
 
-    VIcebergPartitionWriter::WriteInfo write_info = {
-            std::move(write_path), std::move(original_write_path), std::move(target_path),
-            iceberg_table_sink.file_type};
+    VIcebergPartitionWriter::WriteInfo write_info = {std::move(write_path),
+                                                     std::move(original_write_path),
+                                                     std::move(target_path),
+                                                     iceberg_table_sink.file_type,
+                                                     {}};
+    if (iceberg_table_sink.__isset.broker_addresses) {
+        write_info.broker_addresses.assign(iceberg_table_sink.broker_addresses.begin(),
+                                           iceberg_table_sink.broker_addresses.end());
+    }
 
     _write_file_count++;
     std::vector<std::string> column_names;
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index 8a5e2a9777e..b93303dff03 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -58,6 +58,9 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
 
     io::FSPropertiesRef fs_properties(_write_info.file_type);
     fs_properties.properties = &_hadoop_conf;
+    if (!_write_info.broker_addresses.empty()) {
+        fs_properties.broker_addresses = &(_write_info.broker_addresses);
+    }
     io::FileDescription file_description = {
             .path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name()),
             .fs_name {}};
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.h b/be/src/vec/sink/writer/vhive_partition_writer.h
index 71e1e141d9c..20ca6506704 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.h
+++ b/be/src/vec/sink/writer/vhive_partition_writer.h
@@ -46,6 +46,7 @@ public:
         std::string original_write_path;
         std::string target_path;
         TFileType::type file_type;
+        std::vector<TNetworkAddress> broker_addresses;
     };
 
     VHivePartitionWriter(const TDataSink& t_sink, std::string partition_name,
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp b/be/src/vec/sink/writer/vhive_table_writer.cpp
index 6eb478c01b7..9c2726fa652 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -301,30 +301,42 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
         if (existing_table == false) {   // new table
             update_mode = TUpdateMode::NEW;
             if (_partition_columns_input_index.empty()) { // new unpartitioned table
-                write_info = {write_location.write_path, write_location.original_write_path,
-                              write_location.target_path, write_location.file_type};
+                write_info = {write_location.write_path,
+                              write_location.original_write_path,
+                              write_location.target_path,
+                              write_location.file_type,
+                              {}};
             } else { // a new partition in a new partitioned table
                 auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
                 auto original_write_path =
                         fmt::format("{}/{}", write_location.original_write_path, partition_name);
                 auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
-                write_info = {std::move(write_path), std::move(original_write_path),
-                              std::move(target_path), write_location.file_type};
+                write_info = {std::move(write_path),
+                              std::move(original_write_path),
+                              std::move(target_path),
+                              write_location.file_type,
+                              {}};
             }
         } else { // a new partition in an existing partitioned table, or an existing unpartitioned table
             if (_partition_columns_input_index.empty()) { // an existing unpartitioned table
                 update_mode =
                         !hive_table_sink.overwrite ? TUpdateMode::APPEND : TUpdateMode::OVERWRITE;
-                write_info = {write_location.write_path, write_location.original_write_path,
-                              write_location.target_path, write_location.file_type};
+                write_info = {write_location.write_path,
+                              write_location.original_write_path,
+                              write_location.target_path,
+                              write_location.file_type,
+                              {}};
             } else { // a new partition in an existing partitioned table
                 update_mode = TUpdateMode::NEW;
                 auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
                 auto original_write_path =
                         fmt::format("{}/{}", write_location.original_write_path, partition_name);
                 auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
-                write_info = {std::move(write_path), std::move(original_write_path),
-                              std::move(target_path), write_location.file_type};
+                write_info = {std::move(write_path),
+                              std::move(original_write_path),
+                              std::move(target_path),
+                              write_location.file_type,
+                              {}};
             }
             // need to get schema from existing table ?
         }
@@ -337,8 +349,11 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
             auto original_write_path =
                     fmt::format("{}/{}", write_location.original_write_path, partition_name);
             auto target_path = fmt::format("{}", existing_partition->location.target_path);
-            write_info = {std::move(write_path), std::move(original_write_path),
-                          std::move(target_path), existing_partition->location.file_type};
+            write_info = {std::move(write_path),
+                          std::move(original_write_path),
+                          std::move(target_path),
+                          existing_partition->location.file_type,
+                          {}};
             file_format_type = existing_partition->file_format;
             write_compress_type = hive_table_sink.compression_type;
         } else {
@@ -347,13 +362,20 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
             auto original_write_path =
                     fmt::format("{}/{}", write_location.original_write_path, partition_name);
             auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
-            write_info = {std::move(write_path), std::move(original_write_path),
-                          std::move(target_path), write_location.file_type};
+            write_info = {std::move(write_path),
+                          std::move(original_write_path),
+                          std::move(target_path),
+                          write_location.file_type,
+                          {}};
             file_format_type = hive_table_sink.file_format;
             write_compress_type = hive_table_sink.compression_type;
             // need to get schema from existing table ?
         }
     }
+    if (hive_table_sink.__isset.broker_addresses) {
+        write_info.broker_addresses.assign(hive_table_sink.broker_addresses.begin(),
+                                           hive_table_sink.broker_addresses.end());
+    }
 
     _write_file_count++;
     std::vector<std::string> column_names;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java
index 5e4e7c7cbc8..6b5a519eeda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java
@@ -112,6 +112,21 @@ public class BrokerMgr {
         }
     }
 
+    public List<FsBroker> getBrokers(String brokerName) {
+        List<FsBroker> result = null;
+        lock.lock();
+        try {
+            List<FsBroker> brokerList = brokerListMap.get(brokerName);
+            if (brokerList == null || brokerList.isEmpty()) {
+                return null;
+            }
+            result = new ArrayList<>(brokerList);
+        } finally {
+            lock.unlock();
+        }
+        return result;
+    }
+
     public FsBroker getAnyBroker(String brokerName) {
         lock.lock();
         try {
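
The new getBrokers accessor takes the manager's lock and hands back a copy of the broker list, so callers can shuffle or filter the result without racing concurrent broker add/drop operations; it returns null when the name is unknown or its list is empty. A hedged usage fragment (the broker name "my_broker" is illustrative; Env and FsBroker are the Doris FE classes from the diff):

    List<FsBroker> brokers = Env.getCurrentEnv().getBrokerMgr().getBrokers("my_broker");
    if (brokers != null) {
        Collections.shuffle(brokers); // safe: getBrokers returned a copy, not the live list
    }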
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BaseExternalTableDataSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BaseExternalTableDataSink.java
index 96b1c7e7d39..433162f93fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BaseExternalTableDataSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BaseExternalTableDataSink.java
@@ -20,14 +20,20 @@
 
 package org.apache.doris.planner;
 
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TNetworkAddress;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public abstract class BaseExternalTableDataSink extends DataSink {
 
@@ -53,6 +59,21 @@ public abstract class BaseExternalTableDataSink extends DataSink {
      */
     protected abstract Set<TFileFormatType> supportedFileFormatTypes();
 
+    protected List<TNetworkAddress> getBrokerAddresses(String bindBroker) throws AnalysisException {
+        List<FsBroker> brokers;
+        if (bindBroker != null) {
+            brokers = Env.getCurrentEnv().getBrokerMgr().getBrokers(bindBroker);
+        } else {
+            brokers = Env.getCurrentEnv().getBrokerMgr().getAllBrokers();
+        }
+        if (brokers == null || brokers.isEmpty()) {
+            throw new AnalysisException("No alive broker.");
+        }
+        Collections.shuffle(brokers);
+        return brokers.stream().map(broker -> new TNetworkAddress(broker.host, broker.port))
+                .collect(Collectors.toList());
+    }
+
     protected TFileFormatType getTFileFormatType(String format) throws AnalysisException {
         TFileFormatType fileFormatType = TFileFormatType.FORMAT_UNKNOWN;
         String lowerCase = format.toLowerCase();
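
Both concrete sinks below wire this helper in the same way: only when the resolved location is broker-backed does the sink carry broker addresses. A sketch of the shared call-site pattern, taken from the HiveTableSink and IcebergTableSink hunks that follow (tSink and fileType are the sink-local variables):

    if (fileType.equals(TFileType.FILE_BROKER)) {
        // bindBrokerName() is the broker bound to the external catalog; it may be
        // null, in which case getBrokerAddresses falls back to all registered brokers.
        tSink.setBrokerAddresses(getBrokerAddresses(targetTable.getCatalog().bindBrokerName()));
    }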
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index a4012d357e5..bb4786f226a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -150,6 +150,9 @@ public class HiveTableSink extends BaseExternalTableDataSink {
         }
         locationParams.setFileType(fileType);
         tSink.setLocation(locationParams);
+        if (fileType.equals(TFileType.FILE_BROKER)) {
+            tSink.setBrokerAddresses(getBrokerAddresses(targetTable.getCatalog().bindBrokerName()));
+        }
 
         tSink.setHadoopConfig(targetTable.getHadoopProperties());
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
index bfacb572305..5bc0c803cb9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
@@ -27,6 +27,7 @@ import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TIcebergTableSink;
 import org.apache.doris.thrift.TSortField;
 
@@ -134,7 +135,11 @@ public class IcebergTableSink extends BaseExternalTableDataSink {
         LocationPath locationPath = new LocationPath(IcebergUtils.dataLocation(icebergTable), catalogProps);
         tSink.setOutputPath(locationPath.toStorageLocation().toString());
         tSink.setOriginalOutputPath(locationPath.getPath().toString());
-        tSink.setFileType(locationPath.getTFileTypeForBE());
+        TFileType fileType = locationPath.getTFileTypeForBE();
+        tSink.setFileType(fileType);
+        if (fileType.equals(TFileType.FILE_BROKER)) {
+            tSink.setBrokerAddresses(getBrokerAddresses(targetTable.getCatalog().bindBrokerName()));
+        }
 
         if (insertCtx.isPresent()) {
             BaseExternalTableInsertCommandContext context = (BaseExternalTableInsertCommandContext) insertCtx.get();
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index ed7ccee69cd..30348c091ee 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -353,6 +353,7 @@ struct THiveTableSink {
     9: optional map<string, string> hadoop_config
     10: optional bool overwrite
     11: optional THiveSerDeProperties serde_properties
+    12: optional list<Types.TNetworkAddress> broker_addresses;
 }
 
 enum TUpdateMode {
@@ -413,6 +414,7 @@ struct TIcebergTableSink {
     11: optional Types.TFileType file_type
     12: optional string original_output_path
     13: optional PlanNodes.TFileCompressType compression_type
+    14: optional list<Types.TNetworkAddress> broker_addresses;
 }
 
 struct TDataSink {
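
Because both new thrift fields are optional, the change stays wire-compatible: a planner that never sets broker_addresses produces the same bytes as before, and the BE only reads the list when __isset.broker_addresses is true (see the viceberg_table_writer.cpp and vhive_table_writer.cpp hunks above). A small fragment against the generated Java bindings, assuming the same camel-cased accessors the diff already uses (the host/port values are illustrative):

    THiveTableSink tSink = new THiveTableSink();
    // Optional field: old planners leave it unset and remain compatible.
    assert !tSink.isSetBrokerAddresses();
    tSink.setBrokerAddresses(
            java.util.Collections.singletonList(new TNetworkAddress("127.0.0.1", 8000)));
    assert tSink.isSetBrokerAddresses();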


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
