morningman commented on a change in pull request #6539: URL: https://github.com/apache/incubator-doris/pull/6539#discussion_r699833595
########## File path: fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java ########## @@ -45,6 +48,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.apache.doris.backup.S3Storage.S3_PROPERTIES_PREFIX; Review comment: remove static import ########## File path: be/src/exec/data_sink.cpp ########## @@ -78,14 +80,29 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink } sink->reset(tmp_sink); break; - case TDataSinkType::MEMORY_SCRATCH_SINK: + } + case TDataSinkType::RESULT_FILE_SINK: { + if (!thrift_sink.__isset.result_file_sink) { + return Status::InternalError("Missing result file sink."); + } + if (params.__isset.destinations && params.destinations.size() > 0) { Review comment: Add comments ########## File path: fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java ########## @@ -298,9 +285,76 @@ private PlanNode addUnassignedConjuncts(Analyzer analyzer, PlanNode root) return selectNode; } + private void pushDownResultFileSink(Analyzer analyzer) { Review comment: Add some comments? ########## File path: fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java ########## @@ -78,6 +78,13 @@ public ExchangeNode(PlanNodeId id, PlanNode inputNode, boolean copyConjuncts) { computeTupleIds(); } + public boolean isMergingExchange() { + if (planNodeName.equals(MERGING_EXCHANGE_NODE)) { Review comment: Use `mergeInfo` to check it better? ########## File path: be/src/runtime/file_result_writer.h ########## @@ -31,6 +32,7 @@ class RuntimeProfile; class TupleRow; struct ResultFileOptions { + // deprecated bool is_local_file; Review comment: Why not delete it? ########## File path: fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java ########## @@ -771,12 +774,17 @@ public boolean isExtractWideRangeExpr() { return extractWideRangeExpr; } +<<<<<<< HEAD Review comment: ?? 
########## File path: be/src/runtime/file_result_writer.cpp ########## @@ -392,7 +454,8 @@ Status FileResultWriter::close() { // so does the profile in RuntimeState. COUNTER_SET(_written_rows_counter, _written_rows); SCOPED_TIMER(_writer_close_timer); - return _close_file_writer(true); + RETURN_IF_ERROR(_close_file_writer(true, false)); Review comment: just return ########## File path: fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java ########## @@ -298,9 +285,76 @@ private PlanNode addUnassignedConjuncts(Analyzer analyzer, PlanNode root) return selectNode; } + private void pushDownResultFileSink(Analyzer analyzer) { + if (fragments.size() < 1) { + return; + } + if (!(fragments.get(0).getSink() instanceof ResultFileSink)) { + return; + } + if (!ConnectContext.get().getSessionVariable().isEnableParallelOutfile()) { + return; + } + if (!(fragments.get(0).getPlanRoot() instanceof ExchangeNode)) { + return; + } + PlanFragment topPlanFragment = fragments.get(0); + ExchangeNode topPlanNode = (ExchangeNode) topPlanFragment.getPlanRoot(); + // try to push down result file sink + if (topPlanNode.isMergingExchange()) { + return; + } + PlanFragment secondPlanFragment = fragments.get(1); + ResultFileSink resultFileSink = (ResultFileSink) topPlanFragment.getSink(); + if (resultFileSink.getStorageType() == StorageBackend.StorageType.BROKER) { + return; + } + if (secondPlanFragment.getOutputExprs() != null) { + return; + } + // create result file sink desc + TupleDescriptor fileStatusDesc = constructFileStatusTupleDesc(analyzer); + resultFileSink.resetByDataStreamSink((DataStreamSink) secondPlanFragment.getSink()); + resultFileSink.setOutputTupleId(fileStatusDesc.getId()); + secondPlanFragment.setOutputExprs(topPlanFragment.getOutputExprs()); + secondPlanFragment.resetSink(resultFileSink); + ResultSink resultSink = new ResultSink(topPlanNode.getId()); + topPlanFragment.resetSink(resultSink); + topPlanFragment.resetOutputExprs(fileStatusDesc); + 
topPlanFragment.getPlanRoot().resetTupleIds(Lists.newArrayList(fileStatusDesc.getId())); + } + + private TupleDescriptor constructFileStatusTupleDesc(Analyzer analyzer) { Review comment: Add some comments ########## File path: be/src/runtime/file_result_writer.cpp ########## @@ -39,13 +42,36 @@ namespace doris { const size_t FileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024; +// deprecated FileResultWriter::FileResultWriter(const ResultFileOptions* file_opts, const std::vector<ExprContext*>& output_expr_ctxs, RuntimeProfile* parent_profile, BufferControlBlock* sinker) : _file_opts(file_opts), _output_expr_ctxs(output_expr_ctxs), _parent_profile(parent_profile), - _sinker(sinker) {} + _sinker(sinker) { + if (_file_opts->is_local_file) { + _storage_type = TStorageBackendType::LOCAL; + } else { + _storage_type = TStorageBackendType::BROKER; + } + _fragment_instance_id.hi = 12345678987654321; Review comment: What's this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org