This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/streaming-job-dev by this push:
     new 4c6f13cb646 [Feature](WIP) add S3 Stream job split offset (#55927)
4c6f13cb646 is described below

commit 4c6f13cb646bceb15b4093e62ffeb47ff6793f85
Author: wudi <w...@selectdb.com>
AuthorDate: Fri Sep 12 10:38:03 2025 +0800

    [Feature](WIP) add S3 Stream job split offset (#55927)
    
    ### What problem does this PR solve?
    
    1. Add S3 streaming job split offset
    2. Fix a streaming job creation bug
---
 .../main/java/org/apache/doris/fs/FileSystem.java  |  14 +++
 .../java/org/apache/doris/fs/obj/S3ObjStorage.java | 139 +++++++++++++++++++++
 .../org/apache/doris/fs/remote/S3FileSystem.java   |   7 ++
 .../doris/job/base/JobExecutionConfiguration.java  |   2 +-
 .../doris/job/extensions/insert/InsertJob.java     |   3 +-
 .../insert/streaming/StreamingInsertJob.java       |  16 ++-
 .../streaming/StreamingJobSchedulerTask.java       |   5 +-
 .../doris/job/offset/SourceOffsetProvider.java     |  12 +-
 .../job/offset/SourceOffsetProviderFactory.java    |   2 +-
 .../org/apache/doris/job/offset/s3/S3Offset.java   |   4 +-
 .../job/offset/s3/S3SourceOffsetProvider.java      |  60 +++++++--
 .../trees/plans/commands/info/CreateJobInfo.java   |  25 ++--
 .../commands/insert/InsertIntoTableCommand.java    |  33 +++--
 .../org/apache/doris/persist/gson/GsonUtils.java   |   4 +-
 14 files changed, 283 insertions(+), 43 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
index 5ea7a3b67c1..cfbb3e560f3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
@@ -115,6 +115,20 @@ public interface FileSystem {
      */
     Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly);
 
+    /**
+     * List files under remotePath, starting after startFile and stopping once the
+     * accumulated size or number of matched files exceeds the given limits.
+     * @param remotePath remote path
+     * @param result all eligible files under the path
+     * @param startFile start file name; listing begins after this file, null means from the beginning
+     * @param fileSizeLimit limit on the total size of files to be listed
+     * @param fileNumLimit limit on the total number of files to be listed
+     * @return the largest file name seen in this listing round
+     */
+    default String globListWithLimit(String remotePath, List<String> result,
+            String startFile, long fileSizeLimit, long fileNumLimit) {
+        throw new UnsupportedOperationException("Unsupported operation glob list with limit on current file system.");
+    }
+
     default Status listDirectories(String remotePath, Set<String> result) {
         throw new UnsupportedOperationException("Unsupported operation list 
directories on current file system.");
     }
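
For orientation, here is a hedged caller-side sketch of how the new default method is intended to be used; the bucket name, start file, and limit values below are made up for illustration and are not part of this commit:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.doris.fs.FileSystem;

    public class GlobListWithLimitUsageSketch {
        // Lists the next batch of matching files, resuming after the last file of the previous batch.
        public static List<String> nextBatch(FileSystem fs, String lastConsumedFile) {
            List<String> files = new ArrayList<>();
            long fileSizeLimit = 1024L * 1024L * 1024L; // stop once ~1 GB of files has been matched
            long fileNumLimit = 100;                    // ... or once 100 files have been matched
            String lastSeen = fs.globListWithLimit("s3://my-bucket/load/*.csv",
                    files, lastConsumedFile, fileSizeLimit, fileNumLimit);
            System.out.println("matched " + files.size() + " files, listing advanced to " + lastSeen);
            return files;
        }
    }
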
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
index 238072df3c0..0a4c9159881 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
@@ -54,6 +54,7 @@ import software.amazon.awssdk.services.s3.model.GetObjectResponse;
 import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request.Builder;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
 import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
@@ -630,6 +631,144 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
         }
     }
 
+
+    /**
+     * List all files under the given path with glob pattern.
+     * For example, if the path is "s3://bucket/path/to/*.csv",
+     * it will list all files under "s3://bucket/path/to/" with ".csv" suffix.
+     * <p>
+     * Limit: Starting from startFile, until the total file size is greater than fileSizeLimit,
+     * or the number of files is greater than fileNumLimit.
+     *
+     * @return The largest file name after listObject this time
+     */
+    public String globListWithLimit(String remotePath, List<String> result, String startFile,
+            long fileSizeLimit, long fileNumLimit) {
+        long roundCnt = 0;
+        long elementCnt = 0;
+        long matchCnt = 0;
+        long matchFileSize = 0L;
+        long startTime = System.nanoTime();
+        try {
+            S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
+            if (uri.useS3DirectoryBucket()) {
+                throw new RuntimeException("Not support glob with limit for directory bucket");
+            }
+
+            String bucket = uri.getBucket();
+            String globPath = uri.getKey(); // eg: path/to/*.csv
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("globList globPath:{}, remotePath:{}", globPath, remotePath);
+            }
+            java.nio.file.Path pathPattern = Paths.get(globPath);
+            PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pathPattern);
+            HashSet<String> directorySet = new HashSet<>();
+
+            String listPrefix = S3Util.getLongestPrefix(globPath); // similar to Azure
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("globList listPrefix: '{}' (from globPath: '{}')", listPrefix, globPath);
+            }
+
+            Builder builder = ListObjectsV2Request.builder();
+            builder.bucket(bucket)
+                    .prefix(listPrefix);
+
+            if (startFile != null) {
+                builder.startAfter(startFile);
+            }
+
+            ListObjectsV2Request request = builder.build();
+
+            String currentMaxFile = "";
+            boolean isTruncated = false;
+            do {
+                roundCnt++;
+                ListObjectsV2Response response = listObjectsV2(request);
+                for (S3Object obj : response.contents()) {
+                    elementCnt++;
+                    java.nio.file.Path objPath = Paths.get(obj.key());
+
+                    boolean isPrefix = false;
+                    while (objPath != null && objPath.normalize().toString().startsWith(listPrefix)) {
+                        if (!matcher.matches(objPath)) {
+                            isPrefix = true;
+                            objPath = objPath.getParent();
+                            continue;
+                        }
+                        if (directorySet.contains(objPath.normalize().toString())) {
+                            break;
+                        }
+                        if (isPrefix) {
+                            directorySet.add(objPath.normalize().toString());
+                        }
+
+                        matchCnt++;
+                        matchFileSize += obj.size();
+                        String remoteFileName = "s3://" + bucket + "/" + objPath;
+                        result.add(remoteFileName);
+
+                        if (reachLimit(result.size(), matchFileSize, fileSizeLimit, fileNumLimit)) {
+                            break;
+                        }
+
+                        objPath = objPath.getParent();
+                        isPrefix = true;
+                    }
+                }
+                // record current last object file name
+                S3Object lastS3Object = response.contents().get(response.contents().size() - 1);
+                java.nio.file.Path lastObjPath = Paths.get(lastS3Object.key());
+                currentMaxFile = "s3://" + bucket + "/" + lastObjPath;
+
+                isTruncated = response.isTruncated();
+                if (isTruncated) {
+                    request = request.toBuilder()
+                            .continuationToken(response.nextContinuationToken())
+                            .build();
+                }
+            } while (isTruncated);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("remotePath:{}, result:{}", remotePath, result);
+            }
+            return currentMaxFile;
+        } catch (Exception e) {
+            LOG.warn("Errors while getting file status", e);
+            throw new RuntimeException(e);
+        } finally {
+            long endTime = System.nanoTime();
+            long duration = endTime - startTime;
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("process {} elements under prefix {} for {} round, match {} elements, take {} ms",
+                        elementCnt, remotePath, roundCnt, matchCnt,
+                        duration / 1000 / 1000);
+            }
+        }
+    }
+
+    private static boolean reachLimit(int matchFileCnt, long matchFileSize, long sizeLimit, long fileNum) {
+        if (matchFileCnt < 0 || sizeLimit < 0 || fileNum < 0) {
+            return false;
+        }
+        if (fileNum > 0 && matchFileCnt >= fileNum) {
+            LOG.info(
+                    "reach file num limit fileNum:{} objectFiles count:{}",
+                    fileNum,
+                    matchFileCnt);
+            return true;
+        }
+
+        if (sizeLimit > 0 && matchFileSize >= sizeLimit) {
+            LOG.info(
+                    "reach size limit sizeLimit:{}, objectFilesSize:{}",
+                    sizeLimit,
+                    matchFileSize);
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public synchronized void close() throws Exception {
         if (client != null) {
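
For readers who want the listing logic in isolation, below is a self-contained sketch of the same limit-bounded ListObjectsV2 pagination pattern using the plain AWS SDK v2, without Doris types or the glob matching; the bucket, prefix, and limits are illustrative:

    import java.util.ArrayList;
    import java.util.List;

    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
    import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
    import software.amazon.awssdk.services.s3.model.S3Object;

    public class LimitedListingSketch {
        /** Collects keys strictly after startAfter until either limit is hit or the listing ends. */
        public static List<String> listWithLimit(S3Client s3, String bucket, String prefix,
                String startAfter, long sizeLimit, long numLimit) {
            List<String> keys = new ArrayList<>();
            long totalSize = 0;
            ListObjectsV2Request.Builder builder = ListObjectsV2Request.builder()
                    .bucket(bucket)
                    .prefix(prefix);
            if (startAfter != null) {
                builder.startAfter(startAfter);
            }
            ListObjectsV2Request request = builder.build();
            boolean truncated;
            do {
                ListObjectsV2Response response = s3.listObjectsV2(request);
                for (S3Object obj : response.contents()) {
                    keys.add(obj.key());
                    totalSize += obj.size();
                    if (keys.size() >= numLimit || totalSize >= sizeLimit) {
                        return keys; // stop early once either limit is reached
                    }
                }
                truncated = response.isTruncated();
                if (truncated) {
                    // S3 returns at most 1000 keys per call; continue from the continuation token.
                    request = request.toBuilder()
                            .continuationToken(response.nextContinuationToken())
                            .build();
                }
            } while (truncated);
            return keys;
        }
    }
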
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index c3608b7b35f..9c409a66a29 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -68,6 +68,13 @@ public class S3FileSystem extends ObjFileSystem {
         return objStorage.globList(remotePath, result, fileNameOnly);
     }
 
+    @Override
+    public String globListWithLimit(String remotePath, List<String> result, String startFile,
+            long fileSizeLimit, long fileNumLimit) {
+        S3ObjStorage objStorage = (S3ObjStorage) this.objStorage;
+        return objStorage.globListWithLimit(remotePath, result, startFile, fileSizeLimit, fileNumLimit);
+    }
+
     @Override
     public Status listDirectories(String remotePath, Set<String> result) {
         S3ObjStorage objStorage = (S3ObjStorage) this.objStorage;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
index f26828c5a45..3d8c9afa36b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
@@ -100,7 +100,7 @@ public class JobExecutionConfiguration {
         if (timerDefinition.getStartTimeMs() == null) {
             throw new IllegalArgumentException("startTimeMs cannot be null");
         }
-        if (isImmediate()) {
+        if (isImmediate() || JobExecuteType.STREAMING.equals(executeType)) {
             return;
         }
         if (timerDefinition.getStartTimeMs() < System.currentTimeMillis()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index e855aa4f836..67b56dbc13d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -549,7 +549,8 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
         trow.addToColumnValue(new TCell().setStringVal(getComment()));
         trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
         trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
-        trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson()));
+        trow.addToColumnValue(new TCell().setStringVal(
+                loadStatistic == null ? FeConstants.null_string : loadStatistic.toJson()));
         trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? FeConstants.null_string : failMsg.getMsg()));
         return trow;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 1b4cac14bc8..84833604f84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -32,12 +32,14 @@ import org.apache.doris.job.common.PauseReason;
 import org.apache.doris.job.common.TaskType;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.offset.Offset;
 import org.apache.doris.job.offset.SourceOffsetProvider;
 import org.apache.doris.job.offset.SourceOffsetProviderFactory;
 import org.apache.doris.job.task.AbstractTask;
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.load.loadv2.LoadStatistic;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
 import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.persist.gson.GsonUtils;
@@ -64,8 +66,6 @@ import java.util.Map;
 
 public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, Map<Object, Object>> implements
         TxnStateChangeCallback {
-
-    @SerializedName("did")
     private final long dbId;
     private StreamingJobStatistic jobStatistic = new StreamingJobStatistic();
     @SerializedName("fm")
@@ -79,7 +79,6 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
     @Setter
     protected long autoResumeCount;
     @Getter
-    @SerializedName("jp")
     private StreamingJobProperties jobProperties;
     @Getter
     StreamingInsertTask runningStreamTask;
@@ -101,12 +100,15 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         this.jobProperties = jobProperties;
         String tvfType = parseTvfType();
         this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
+        this.offsetProvider.init(getExecuteSql(), jobProperties);
     }
 
     private String parseTvfType() {
         NereidsParser parser = new NereidsParser();
         InsertIntoTableCommand command = (InsertIntoTableCommand) parser.parseSingle(getExecuteSql());
-        return command.getFirstTvfName();
+        UnboundTVFRelation firstTVF = command.getFirstTVF();
+        Preconditions.checkNotNull(firstTVF, "Only support insert sql with tvf");
+        return firstTVF.getFunctionName();
     }
 
     @Override
@@ -138,7 +140,8 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
     }
 
     protected StreamingInsertTask createStreamingInsertTask() {
-        InsertIntoTableCommand command = offsetProvider.rewriteTvfParams(getExecuteSql());
+        Offset nextOffset = offsetProvider.getNextOffset();
+        InsertIntoTableCommand command = offsetProvider.rewriteTvfParams(nextOffset);
+        this.runningStreamTask = new StreamingInsertTask(getJobId(), AbstractTask.getNextTaskId(), command,
                 getCurrentDbName(), offsetProvider.getCurrentOffset(), jobProperties);
         return this.runningStreamTask;
@@ -221,7 +224,8 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         trow.addToColumnValue(new TCell().setStringVal(getComment()));
         trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
         trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
-        trow.addToColumnValue(new TCell().setStringVal(jobStatistic.toJson()));
+        trow.addToColumnValue(new TCell().setStringVal(
+                jobStatistic == null ? FeConstants.null_string : jobStatistic.toJson()));
         trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? FeConstants.null_string : failMsg.getMsg()));
         return trow;
     }
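
The task-creation change above splits what used to be a single rewriteTvfParams(sql) call into two steps. A simplified sketch of that flow, using made-up stand-in interfaces rather than the real Doris classes:

    // Hedged sketch: the next offset is computed once, and both the rewritten command and the
    // recorded offset are derived from it, so planning and bookkeeping cannot drift apart.
    interface OffsetLike {
        String describe();
    }

    interface ProviderLike {
        OffsetLike getNextOffset();                  // e.g. list the next batch of S3 files
        String rewriteTvfParams(OffsetLike next);    // rewrite the job's INSERT SQL for exactly that batch
    }

    class TaskCreationFlowSketch {
        String createTask(ProviderLike provider) {
            OffsetLike next = provider.getNextOffset();
            return provider.rewriteTvfParams(next);  // previously the provider re-derived the offset internally
        }
    }
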
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index d724b09fbf0..34ccefeb360 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -95,6 +95,9 @@ public class StreamingJobSchedulerTask extends AbstractTask {
     @Override
     public TRow getTvfInfo(String jobName) {
         StreamingInsertTask runningTask = streamingInsertJob.getRunningStreamTask();
+        if (runningTask == null) {
+            return null;
+        }
         TRow trow = new TRow();
         trow.addToColumnValue(new TCell().setStringVal(String.valueOf(runningTask.getTaskId())));
         trow.addToColumnValue(new TCell().setStringVal(String.valueOf(runningTask.getJobId())));
@@ -117,6 +120,6 @@ public class StreamingJobSchedulerTask extends AbstractTask {
             trow.addToColumnValue(new TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser()));
         }
         trow.addToColumnValue(new TCell().setStringVal(runningTask.getOffset().toJson()));
-        return null;
+        return trow;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index 564327ed0cc..49a272c664f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -17,12 +17,19 @@
 
 package org.apache.doris.job.offset;
 
+import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 
 /**
  * Interface for managing offsets and metadata of a data source.
  */
 public interface SourceOffsetProvider {
+
+    /**
+     * Initialize the provider with the job's execute SQL and streaming job properties.
+     */
+    void init(String executeSql, StreamingJobProperties jobProperties);
+
     /**
      * Get source type, e.g. s3, kafka
      * @return
@@ -43,10 +50,10 @@ public interface SourceOffsetProvider {
 
     /**
      * Rewrite the TVF parameters in the SQL based on the current offset.
-     * @param sql
+     * @param nextOffset
      * @return rewritten InsertIntoTableCommand
      */
-    InsertIntoTableCommand rewriteTvfParams(String sql);
+    InsertIntoTableCommand rewriteTvfParams(Offset nextOffset);
 
     /**
      * Update the offset of the source.
@@ -64,5 +71,6 @@ public interface SourceOffsetProvider {
      * @return
      */
     boolean hasMoreDataToConsume();
+
 }
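
To make the contract concrete, here is a toy provider that mirrors the shape of the updated interface (it does not implement the real SourceOffsetProvider, since only part of that interface is visible in this hunk); the file names and SQL template are made up:

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class ToyOffsetProviderSketch {
        private String executeSql;
        private final Deque<String> pendingFiles = new ArrayDeque<>();

        // Mirrors init(executeSql, jobProperties): parse and validate once, cache what later calls need.
        void init(String executeSql) {
            this.executeSql = executeSql;
            pendingFiles.add("s3://bucket/load/a.csv");
            pendingFiles.add("s3://bucket/load/b.csv");
        }

        boolean hasMoreDataToConsume() {
            return !pendingFiles.isEmpty();
        }

        String getNextOffset() {
            return pendingFiles.poll();
        }

        // Mirrors rewriteTvfParams(Offset): rewrite the cached SQL for exactly one batch.
        String rewriteTvfParams(String offset) {
            return executeSql.replace("${uri}", offset);
        }

        public static void main(String[] args) {
            ToyOffsetProviderSketch provider = new ToyOffsetProviderSketch();
            provider.init("INSERT INTO t SELECT * FROM s3('uri' = '${uri}')");
            while (provider.hasMoreDataToConsume()) {
                String next = provider.getNextOffset();
                System.out.println(provider.rewriteTvfParams(next));
            }
        }
    }
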
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
index 9cefa4e9d42..adc49249eb1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
@@ -35,7 +35,7 @@ public class SourceOffsetProviderFactory {
 
     public static SourceOffsetProvider createSourceOffsetProvider(String sourceType) {
         try {
-            Class<? extends SourceOffsetProvider> cla = map.get(sourceType.toUpperCase());
+            Class<? extends SourceOffsetProvider> cla = map.get(sourceType.toLowerCase());
             if (cla == null) {
                 throw new JobException("Unsupported source type: " + sourceType);
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index 57f89b99505..95271ba60ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -21,13 +21,15 @@ import org.apache.doris.job.offset.Offset;
 import org.apache.doris.persist.gson.GsonUtils;
 
 import lombok.Getter;
+import lombok.Setter;
 
 import java.util.List;
 
+@Getter
+@Setter
 public class S3Offset implements Offset {
     String startFile;
     String endFile;
-    @Getter
     List<String> fileLists;
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index e52d9995051..1c7b3839e9d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -17,17 +17,50 @@
 
 package org.apache.doris.job.offset.s3;
 
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.fs.FileSystemFactory;
+import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
 import org.apache.doris.job.offset.Offset;
 import org.apache.doris.job.offset.SourceOffsetProvider;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
 import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 
+import lombok.extern.log4j.Log4j2;
+
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+@Log4j2
 public class S3SourceOffsetProvider implements SourceOffsetProvider {
+    String executeSql;
     S3Offset currentOffset;
     String maxRemoteEndFile;
+    StreamingJobProperties jobProperties;
+    NereidsParser parser;
+    String filePath;
+    StorageProperties storageProperties;
+
+    @Override
+    public void init(String executeSql, StreamingJobProperties jobProperties) {
+        this.executeSql = executeSql;
+        this.jobProperties = jobProperties;
+        this.parser = new NereidsParser();
+        InsertIntoTableCommand command = (InsertIntoTableCommand) parser.parseSingle(executeSql);
+        UnboundTVFRelation firstTVF = command.getFirstTVF();
+        Map<String, String> properties = firstTVF.getProperties().getMap();
+        try {
+            this.storageProperties = StorageProperties.createPrimary(properties);
+            String uri = storageProperties.validateAndGetUri(properties);
+            this.filePath = storageProperties.validateAndNormalizeUri(uri);
+        } catch (UserException e) {
+            throw new RuntimeException("Failed check storage props, " + e.getMessage(), e);
+        }
+    }
 
     @Override
     public String getSourceType() {
@@ -36,8 +69,19 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider {
 
     @Override
     public S3Offset getNextOffset() {
-        //todo: listObjects from end file
-        return null;
+        S3Offset offset = new S3Offset();
+        List<String> rfiles = new ArrayList<>();
+        try (RemoteFileSystem fileSystem = FileSystemFactory.get(storageProperties)) {
+            maxRemoteEndFile = fileSystem.globListWithLimit(filePath, rfiles, currentOffset.endFile,
+                    jobProperties.getS3BatchFiles(), jobProperties.getS3BatchSize());
+            offset.setStartFile(currentOffset.endFile);
+            offset.setEndFile(rfiles.get(rfiles.size() - 1));
+            offset.setFileLists(rfiles);
+        } catch (Exception e) {
+            log.warn("list path exception, path={}", filePath, e);
+            throw new RuntimeException(e);
+        }
+        return offset;
     }
 
     @Override
@@ -46,15 +90,15 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider {
     }
 
     @Override
-    public InsertIntoTableCommand rewriteTvfParams(String sql) {
-        S3Offset nextOffset = getNextOffset();
+    public InsertIntoTableCommand rewriteTvfParams(Offset nextOffset) {
+        S3Offset offset = (S3Offset) nextOffset;
         Map<String, String> props = new HashMap<>();
         //todo: need to change file list to glob string
-        props.put("uri", nextOffset.getFileLists().toString());
+        props.put("uri", offset.getFileLists().toString());
+
+        InsertIntoTableCommand command = (InsertIntoTableCommand) parser.parseSingle(executeSql);
 
-        NereidsParser parser = new NereidsParser();
-        InsertIntoTableCommand command = (InsertIntoTableCommand) parser.parseSingle(sql);
-        command.rewriteTvfProperties(getSourceType(), props);
+        command.rewriteFirstTvfProperties(getSourceType(), props);
         return command;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
index 4334526630e..61a8eb0a5c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
@@ -129,6 +129,7 @@ public class CreateJobInfo {
         if (streamingJob) {
             executeType = JobExecuteType.STREAMING;
             properties = new StreamingJobProperties(jobProperties);
+            properties.validate();
         }
         jobExecutionConfiguration.setExecuteType(executeType);
 
@@ -273,15 +274,21 @@ public class CreateJobInfo {
         NereidsParser parser = new NereidsParser();
         LogicalPlan logicalPlan = parser.parseSingle(sql);
         if (logicalPlan instanceof InsertIntoTableCommand) {
-            return new StreamingInsertJob(labelNameOptional.get(),
-                    JobStatus.PENDING,
-                    currentDbName,
-                    comment,
-                    ConnectContext.get().getCurrentUserIdentity(),
-                    jobExecutionConfiguration,
-                    System.currentTimeMillis(),
-                    sql,
-                    (StreamingJobProperties) properties);
+            // InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) logicalPlan;
+            try {
+                // insertIntoTableCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false);
+                return new StreamingInsertJob(labelNameOptional.get(),
+                        JobStatus.PENDING,
+                        currentDbName,
+                        comment,
+                        ConnectContext.get().getCurrentUserIdentity(),
+                        jobExecutionConfiguration,
+                        System.currentTimeMillis(),
+                        sql,
+                        (StreamingJobProperties) properties);
+            } catch (Exception e) {
+                throw new AnalysisException(e.getMessage());
+            }
         } else {
             throw new AnalysisException("Not support this sql : " + sql + " 
Command class is "
                     + logicalPlan.getClass().getName() + ".");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index a5a428c7815..8bf31d941df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -83,6 +83,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -539,46 +540,54 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
     }
 
     // todo: add ut
-    public String getFirstTvfName() {
+    public UnboundTVFRelation getFirstTVF() {
         return getFirstTvfInPlan(getLogicalQuery());
     }
 
-    private String getFirstTvfInPlan(LogicalPlan plan) {
+    private UnboundTVFRelation getFirstTvfInPlan(LogicalPlan plan) {
         if (plan instanceof UnboundTVFRelation) {
             UnboundTVFRelation tvfRelation = (UnboundTVFRelation) plan;
-            return tvfRelation.getFunctionName();
+            return tvfRelation;
         }
 
         for (Plan child : plan.children()) {
             if (child instanceof LogicalPlan) {
-                String result = getFirstTvfInPlan((LogicalPlan) child);
-                if (!result.isEmpty()) {
+                UnboundTVFRelation result = getFirstTvfInPlan((LogicalPlan) child);
+                if (result != null) {
                     return result;
                 }
             }
         }
-        return "";
+        return null;
     }
 
     // todo: add ut
-    public void rewriteTvfProperties(String functionName, Map<String, String> props) {
-        rewriteTvfInPlan(originLogicalQuery, functionName, props);
-        if (logicalQuery.isPresent()) {
-            rewriteTvfInPlan(logicalQuery.get(), functionName, props);
+    public void rewriteFirstTvfProperties(String functionName, Map<String, String> props) {
+        AtomicBoolean found = new AtomicBoolean(false);
+        rewriteFirstTvfInPlan(originLogicalQuery, functionName, props, found);
+        if (logicalQuery.isPresent() && !found.get()) {
+            rewriteFirstTvfInPlan(logicalQuery.get(), functionName, props, found);
         }
     }
 
-    private void rewriteTvfInPlan(LogicalPlan plan, String functionName, Map<String, String> props) {
+    private void rewriteFirstTvfInPlan(LogicalPlan plan,
+            String functionName, Map<String, String> props, AtomicBoolean found) {
+        if (found.get()) {
+            return;
+        }
+
         if (plan instanceof UnboundTVFRelation) {
             UnboundTVFRelation tvfRelation = (UnboundTVFRelation) plan;
             if (functionName.equalsIgnoreCase(tvfRelation.getFunctionName())) {
                 tvfRelation.getProperties().getMap().putAll(props);
+                found.set(true);
+                return;
             }
         }
 
         for (Plan child : plan.children()) {
             if (child instanceof LogicalPlan) {
-                rewriteTvfInPlan((LogicalPlan) child, functionName, props);
+                rewriteFirstTvfInPlan((LogicalPlan) child, functionName, props, found);
             }
         }
     }
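
The AtomicBoolean-guarded traversal above rewrites only the first matching TVF node. The same pattern on a toy tree, stripped of Nereids types so it can be run standalone (Node and the labels are made up):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class FirstMatchRewriteSketch {
        static class Node {
            String label;
            List<Node> children;

            Node(String label, List<Node> children) {
                this.label = label;
                this.children = children;
            }
        }

        // Depth-first, pre-order; the shared flag stops all remaining branches once one node is rewritten.
        static void rewriteFirst(Node node, String target, String newLabel, AtomicBoolean found) {
            if (found.get()) {
                return;
            }
            if (node.label.equalsIgnoreCase(target)) {
                node.label = newLabel;
                found.set(true);
                return;
            }
            for (Node child : node.children) {
                rewriteFirst(child, target, newLabel, found);
            }
        }

        public static void main(String[] args) {
            Node first = new Node("s3", Collections.emptyList());
            Node second = new Node("s3", Collections.emptyList());
            Node root = new Node("root", Arrays.asList(first, second));
            rewriteFirst(root, "s3", "s3[batch-1]", new AtomicBoolean(false));
            System.out.println(first.label + " / " + second.label); // only the first "s3" node changed
        }
    }
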
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 07032432ace..af79329f681 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -182,6 +182,7 @@ import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 import org.apache.doris.fs.remote.dfs.JFSFileSystem;
 import org.apache.doris.fs.remote.dfs.OFSFileSystem;
 import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
 import org.apache.doris.job.extensions.mtmv.MTMVJob;
 import org.apache.doris.load.loadv2.BrokerLoadJob;
 import org.apache.doris.load.loadv2.BulkLoadJob;
@@ -447,7 +448,8 @@ public class GsonUtils {
             jobExecutorRuntimeTypeAdapterFactory
                     = RuntimeTypeAdapterFactory.of(org.apache.doris.job.base.AbstractJob.class, "clazz")
                             .registerSubtype(InsertJob.class, InsertJob.class.getSimpleName())
-                            .registerSubtype(MTMVJob.class, MTMVJob.class.getSimpleName());
+                            .registerSubtype(MTMVJob.class, MTMVJob.class.getSimpleName())
+                            .registerSubtype(StreamingInsertJob.class, StreamingInsertJob.class.getSimpleName());
 
     private static RuntimeTypeAdapterFactory<MTMVSnapshotIf> mtmvSnapshotTypeAdapterFactory =
             RuntimeTypeAdapterFactory.of(MTMVSnapshotIf.class, "clazz")
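
Context for the GsonUtils change: AbstractJob subclasses are serialized polymorphically through a RuntimeTypeAdapterFactory, so a subtype that is not registered cannot be round-tripped through the base type. A minimal standalone sketch of the mechanism, assuming the gson-extras flavor of RuntimeTypeAdapterFactory (Doris bundles an equivalent class); the toy job classes and field values are illustrative:

    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import com.google.gson.typeadapters.RuntimeTypeAdapterFactory;

    public class PolymorphicGsonSketch {
        abstract static class BaseJob {
            String name;
        }

        static class StreamingLikeJob extends BaseJob {
            String offset;
        }

        public static void main(String[] args) {
            RuntimeTypeAdapterFactory<BaseJob> factory = RuntimeTypeAdapterFactory.of(BaseJob.class, "clazz")
                    // Without this registration, writing a StreamingLikeJob through the BaseJob type
                    // fails with a "did you forget to register a subtype" error.
                    .registerSubtype(StreamingLikeJob.class, StreamingLikeJob.class.getSimpleName());

            Gson gson = new GsonBuilder().registerTypeAdapterFactory(factory).create();

            StreamingLikeJob job = new StreamingLikeJob();
            job.name = "s3_stream_job";
            job.offset = "s3://bucket/load/0002.csv";
            String json = gson.toJson(job, BaseJob.class);  // embeds the "clazz" discriminator field
            BaseJob back = gson.fromJson(json, BaseJob.class);
            System.out.println(json + " -> " + back.getClass().getSimpleName());
        }
    }
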


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
