This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/streaming-job-dev by this push: new 4c6f13cb646 [Feature](WIP) add S3 Stream job split offset (#55927) 4c6f13cb646 is described below commit 4c6f13cb646bceb15b4093e62ffeb47ff6793f85 Author: wudi <w...@selectdb.com> AuthorDate: Fri Sep 12 10:38:03 2025 +0800 [Feature](WIP) add S3 Stream job split offset (#55927) ### What problem does this PR solve? 1. add S3 Stream job split offset 2. fix stream job create bug --- .../main/java/org/apache/doris/fs/FileSystem.java | 14 +++ .../java/org/apache/doris/fs/obj/S3ObjStorage.java | 139 +++++++++++++++++++++ .../org/apache/doris/fs/remote/S3FileSystem.java | 7 ++ .../doris/job/base/JobExecutionConfiguration.java | 2 +- .../doris/job/extensions/insert/InsertJob.java | 3 +- .../insert/streaming/StreamingInsertJob.java | 16 ++- .../streaming/StreamingJobSchedulerTask.java | 5 +- .../doris/job/offset/SourceOffsetProvider.java | 12 +- .../job/offset/SourceOffsetProviderFactory.java | 2 +- .../org/apache/doris/job/offset/s3/S3Offset.java | 4 +- .../job/offset/s3/S3SourceOffsetProvider.java | 60 +++++++-- .../trees/plans/commands/info/CreateJobInfo.java | 25 ++-- .../commands/insert/InsertIntoTableCommand.java | 33 +++-- .../org/apache/doris/persist/gson/GsonUtils.java | 4 +- 14 files changed, 283 insertions(+), 43 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java index 5ea7a3b67c1..cfbb3e560f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java @@ -115,6 +115,20 @@ public interface FileSystem { */ Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly); + /** + * List files in remotePath <br/> + * @param remotePath remote path + * @param result All eligible files under the path + * @param startFile start file name + * @param fileSizeLimit limit the total size of files to be listed. + * @param fileNumLimit limit the total number of files to be listed. 
+ * @return the largest file name seen by this listing */ + default String globListWithLimit(String remotePath, List<String> result, + String startFile, long fileSizeLimit, long fileNumLimit) { + throw new UnsupportedOperationException("Unsupported operation glob list with limit on current file system."); + } + default Status listDirectories(String remotePath, Set<String> result) { throw new UnsupportedOperationException("Unsupported operation list directories on current file system."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java index 238072df3c0..0a4c9159881 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java @@ -54,6 +54,7 @@ import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request.Builder; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.NoSuchKeyException; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; @@ -630,6 +631,144 @@ public class S3ObjStorage implements ObjStorage<S3Client> { } } + + /** + * List all files under the given path with glob pattern. + * For example, if the path is "s3://bucket/path/to/*.csv", + * it will list all files under "s3://bucket/path/to/" with ".csv" suffix. + * <p> + * Limit: Starting from startFile, until the total file size is greater than fileSizeLimit, + * or the number of files is greater than fileNumLimit.
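Before the S3 implementation that follows, here is a hedged usage sketch of the new FileSystem#globListWithLimit entry point added above. The class and method names, the glob path "s3://my-bucket/input/*.csv", the lastConsumedKey parameter, and the 1 GiB / 1000-file limits are invented example values; the StorageProperties and FileSystemFactory wiring is borrowed from the S3SourceOffsetProvider change later in this patch, so treat this as a sketch rather than code from the repository.

import java.util.List;

import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.RemoteFileSystem;

public class GlobListWithLimitUsageSketch {

    // Fills 'batch' with the next files to load and returns the last object key the listing visited.
    // The glob path, resume key and both limit values are invented examples.
    static String nextBatch(StorageProperties storageProperties, String lastConsumedKey,
                            List<String> batch) throws Exception {
        try (RemoteFileSystem fs = FileSystemFactory.get(storageProperties)) {
            // List at most 1000 matching files or roughly 1 GiB of data, resuming after lastConsumedKey.
            return fs.globListWithLimit("s3://my-bucket/input/*.csv", batch,
                    lastConsumedKey, 1024L * 1024 * 1024, 1000L);
        }
    }
}

The returned string is the last object key the listing visited; the S3 offset provider in this patch stores that value as maxRemoteEndFile.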
+ * + * @return The largest file name after listObject this time + */ + public String globListWithLimit(String remotePath, List<String> result, String startFile, + long fileSizeLimit, long fileNumLimit) { + long roundCnt = 0; + long elementCnt = 0; + long matchCnt = 0; + long matchFileSize = 0L; + long startTime = System.nanoTime(); + try { + S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri); + if (uri.useS3DirectoryBucket()) { + throw new RuntimeException("Not support glob with limit for directory bucket"); + } + + String bucket = uri.getBucket(); + String globPath = uri.getKey(); // eg: path/to/*.csv + + if (LOG.isDebugEnabled()) { + LOG.debug("globList globPath:{}, remotePath:{}", globPath, remotePath); + } + java.nio.file.Path pathPattern = Paths.get(globPath); + PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pathPattern); + HashSet<String> directorySet = new HashSet<>(); + + String listPrefix = S3Util.getLongestPrefix(globPath); // similar to Azure + if (LOG.isDebugEnabled()) { + LOG.debug("globList listPrefix: '{}' (from globPath: '{}')", listPrefix, globPath); + } + + Builder builder = ListObjectsV2Request.builder(); + builder.bucket(bucket) + .prefix(listPrefix); + + if (startFile != null) { + builder.startAfter(startFile); + } + + ListObjectsV2Request request = builder.build(); + + String currentMaxFile = ""; + boolean isTruncated = false; + do { + roundCnt++; + ListObjectsV2Response response = listObjectsV2(request); + for (S3Object obj : response.contents()) { + elementCnt++; + java.nio.file.Path objPath = Paths.get(obj.key()); + + boolean isPrefix = false; + while (objPath != null && objPath.normalize().toString().startsWith(listPrefix)) { + if (!matcher.matches(objPath)) { + isPrefix = true; + objPath = objPath.getParent(); + continue; + } + if (directorySet.contains(objPath.normalize().toString())) { + break; + } + if (isPrefix) { + directorySet.add(objPath.normalize().toString()); + } + + matchCnt++; + matchFileSize += obj.size(); + String remoteFileName = "s3://" + bucket + "/" + objPath; + result.add(remoteFileName); + + if (reachLimit(result.size(), matchFileSize, fileSizeLimit, fileNumLimit)) { + break; + } + + objPath = objPath.getParent(); + isPrefix = true; + } + } + //record current last object file name + S3Object lastS3Object = response.contents().get(response.contents().size() - 1); + java.nio.file.Path lastObjPath = Paths.get(lastS3Object.key()); + currentMaxFile = "s3://" + bucket + "/" + lastObjPath; + + isTruncated = response.isTruncated(); + if (isTruncated) { + request = request.toBuilder() + .continuationToken(response.nextContinuationToken()) + .build(); + } + } while (isTruncated); + + if (LOG.isDebugEnabled()) { + LOG.debug("remotePath:{}, result:{}", remotePath, result); + } + return currentMaxFile; + } catch (Exception e) { + LOG.warn("Errors while getting file status", e); + throw new RuntimeException(e); + } finally { + long endTime = System.nanoTime(); + long duration = endTime - startTime; + if (LOG.isDebugEnabled()) { + LOG.debug("process {} elements under prefix {} for {} round, match {} elements, take {} ms", + elementCnt, remotePath, roundCnt, matchCnt, + duration / 1000 / 1000); + } + } + } + + private static boolean reachLimit(int matchFileCnt, long matchFileSize, long sizeLimit, long fileNum) { + if (matchFileCnt < 0 || sizeLimit < 0 || fileNum < 0) { + return false; + } + if (fileNum > 0 && matchFileCnt >= fileNum) { + LOG.info( + "reach file num limit fileNum:{} objectFiles count:{}", 
+ fileNum, + matchFileCnt); + return true; + } + + if (sizeLimit > 0 && matchFileSize >= sizeLimit) { + LOG.info( + "reach size limit sizeLimit:{}, objectFilesSize:{}", + sizeLimit, + matchFileSize); + return true; + } + return false; + } + @Override public synchronized void close() throws Exception { if (client != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java index c3608b7b35f..9c409a66a29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java @@ -68,6 +68,13 @@ public class S3FileSystem extends ObjFileSystem { return objStorage.globList(remotePath, result, fileNameOnly); } + @Override + public String globListWithLimit(String remotePath, List<String> result, String startFile, + long fileSizeLimit, long fileNumLimit) { + S3ObjStorage objStorage = (S3ObjStorage) this.objStorage; + return objStorage.globListWithLimit(remotePath, result, startFile, fileSizeLimit, fileNumLimit); + } + @Override public Status listDirectories(String remotePath, Set<String> result) { S3ObjStorage objStorage = (S3ObjStorage) this.objStorage; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java index f26828c5a45..3d8c9afa36b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java @@ -100,7 +100,7 @@ public class JobExecutionConfiguration { if (timerDefinition.getStartTimeMs() == null) { throw new IllegalArgumentException("startTimeMs cannot be null"); } - if (isImmediate()) { + if (isImmediate() || JobExecuteType.STREAMING.equals(executeType)) { return; } if (timerDefinition.getStartTimeMs() < System.currentTimeMillis()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index e855aa4f836..67b56dbc13d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -549,7 +549,8 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl trow.addToColumnValue(new TCell().setStringVal(getComment())); trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); - trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson())); + trow.addToColumnValue(new TCell().setStringVal( + loadStatistic == null ? FeConstants.null_string : loadStatistic.toJson())); trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? 
FeConstants.null_string : failMsg.getMsg())); return trow; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 1b4cac14bc8..84833604f84 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -32,12 +32,14 @@ import org.apache.doris.job.common.PauseReason; import org.apache.doris.job.common.TaskType; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.extensions.insert.InsertJob; +import org.apache.doris.job.offset.Offset; import org.apache.doris.job.offset.SourceOffsetProvider; import org.apache.doris.job.offset.SourceOffsetProviderFactory; import org.apache.doris.job.task.AbstractTask; import org.apache.doris.load.FailMsg; import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.load.loadv2.LoadStatistic; +import org.apache.doris.nereids.analyzer.UnboundTVFRelation; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.persist.gson.GsonUtils; @@ -64,8 +66,6 @@ import java.util.Map; public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, Map<Object, Object>> implements TxnStateChangeCallback { - - @SerializedName("did") private final long dbId; private StreamingJobStatistic jobStatistic = new StreamingJobStatistic(); @SerializedName("fm") @@ -79,7 +79,6 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M @Setter protected long autoResumeCount; @Getter - @SerializedName("jp") private StreamingJobProperties jobProperties; @Getter StreamingInsertTask runningStreamTask; @@ -101,12 +100,15 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M this.jobProperties = jobProperties; String tvfType = parseTvfType(); this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType); + this.offsetProvider.init(getExecuteSql(), jobProperties); } private String parseTvfType() { NereidsParser parser = new NereidsParser(); InsertIntoTableCommand command = (InsertIntoTableCommand) parser.parseSingle(getExecuteSql()); - return command.getFirstTvfName(); + UnboundTVFRelation firstTVF = command.getFirstTVF(); + Preconditions.checkNotNull(firstTVF, "Only support insert sql with tvf"); + return firstTVF.getFunctionName(); } @Override @@ -138,7 +140,8 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M } protected StreamingInsertTask createStreamingInsertTask() { - InsertIntoTableCommand command = offsetProvider.rewriteTvfParams(getExecuteSql()); + Offset nextOffset = offsetProvider.getNextOffset(); + InsertIntoTableCommand command = offsetProvider.rewriteTvfParams(nextOffset); this.runningStreamTask = new StreamingInsertTask(getJobId(), AbstractTask.getNextTaskId(), command, getCurrentDbName(), offsetProvider.getCurrentOffset(), jobProperties); return this.runningStreamTask; @@ -221,7 +224,8 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M trow.addToColumnValue(new TCell().setStringVal(getComment())); trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); - trow.addToColumnValue(new 
TCell().setStringVal(jobStatistic.toJson())); + trow.addToColumnValue(new TCell().setStringVal( + jobStatistic == null ? FeConstants.null_string : jobStatistic.toJson())); trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? FeConstants.null_string : failMsg.getMsg())); return trow; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java index d724b09fbf0..34ccefeb360 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java @@ -95,6 +95,9 @@ public class StreamingJobSchedulerTask extends AbstractTask { @Override public TRow getTvfInfo(String jobName) { StreamingInsertTask runningTask = streamingInsertJob.getRunningStreamTask(); + if (runningTask == null) { + return null; + } TRow trow = new TRow(); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(runningTask.getTaskId()))); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(runningTask.getJobId()))); @@ -117,6 +120,6 @@ public class StreamingJobSchedulerTask extends AbstractTask { trow.addToColumnValue(new TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser())); } trow.addToColumnValue(new TCell().setStringVal(runningTask.getOffset().toJson())); - return null; + return trow; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java index 564327ed0cc..49a272c664f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java @@ -17,12 +17,19 @@ package org.apache.doris.job.offset; +import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; /** * Interface for managing offsets and metadata of a data source. */ public interface SourceOffsetProvider { + + /** + * init + */ + void init(String executeSql, StreamingJobProperties jobProperties); + /** * Get source type, e.g. s3, kafka * @return @@ -43,10 +50,10 @@ public interface SourceOffsetProvider { /** * Rewrite the TVF parameters in the SQL based on the current offset. - * @param sql + * @param nextOffset * @return rewritten InsertIntoTableCommand */ - InsertIntoTableCommand rewriteTvfParams(String sql); + InsertIntoTableCommand rewriteTvfParams(Offset nextOffset); /** * Update the offset of the source. @@ -64,5 +71,6 @@ public interface SourceOffsetProvider { * @return */ boolean hasMoreDataToConsume(); + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java index 9cefa4e9d42..adc49249eb1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java @@ -35,7 +35,7 @@ public class SourceOffsetProviderFactory { public static SourceOffsetProvider createSourceOffsetProvider(String sourceType) { try { - Class<? extends SourceOffsetProvider> cla = map.get(sourceType.toUpperCase()); + Class<? 
extends SourceOffsetProvider> cla = map.get(sourceType.toLowerCase()); if (cla == null) { throw new JobException("Unsupported source type: " + sourceType); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java index 57f89b99505..95271ba60ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java @@ -21,13 +21,15 @@ import org.apache.doris.job.offset.Offset; import org.apache.doris.persist.gson.GsonUtils; import lombok.Getter; +import lombok.Setter; import java.util.List; +@Getter +@Setter public class S3Offset implements Offset { String startFile; String endFile; - @Getter List<String> fileLists; @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java index e52d9995051..1c7b3839e9d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java @@ -17,17 +17,50 @@ package org.apache.doris.job.offset.s3; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.property.storage.StorageProperties; +import org.apache.doris.fs.FileSystemFactory; +import org.apache.doris.fs.remote.RemoteFileSystem; +import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties; import org.apache.doris.job.offset.Offset; import org.apache.doris.job.offset.SourceOffsetProvider; +import org.apache.doris.nereids.analyzer.UnboundTVFRelation; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; +import lombok.extern.log4j.Log4j2; + +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +@Log4j2 public class S3SourceOffsetProvider implements SourceOffsetProvider { + String executeSql; S3Offset currentOffset; String maxRemoteEndFile; + StreamingJobProperties jobProperties; + NereidsParser parser; + String filePath; + StorageProperties storageProperties; + + @Override + public void init(String executeSql, StreamingJobProperties jobProperties) { + this.executeSql = executeSql; + this.jobProperties = jobProperties; + this.parser = new NereidsParser(); + InsertIntoTableCommand command = (InsertIntoTableCommand) parser.parseSingle(executeSql); + UnboundTVFRelation firstTVF = command.getFirstTVF(); + Map<String, String> properties = firstTVF.getProperties().getMap(); + try { + this.storageProperties = StorageProperties.createPrimary(properties); + String uri = storageProperties.validateAndGetUri(properties); + this.filePath = storageProperties.validateAndNormalizeUri(uri); + } catch (UserException e) { + throw new RuntimeException("Failed check storage props, " + e.getMessage(), e); + } + } @Override public String getSourceType() { @@ -36,8 +69,19 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider { @Override public S3Offset getNextOffset() { - //todo: listObjects from end file - return null; + S3Offset offset = new S3Offset(); + List<String> rfiles = new ArrayList<>(); + try (RemoteFileSystem fileSystem = FileSystemFactory.get(storageProperties)) { + maxRemoteEndFile = fileSystem.globListWithLimit(filePath, rfiles, currentOffset.endFile, + jobProperties.getS3BatchFiles(), 
jobProperties.getS3BatchSize()); + offset.setStartFile(currentOffset.endFile); + offset.setEndFile(rfiles.get(rfiles.size() - 1)); + offset.setFileLists(rfiles); + } catch (Exception e) { + log.warn("list path exception, path={}", filePath, e); + throw new RuntimeException(e); + } + return offset; } @Override @@ -46,15 +90,15 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider { } @Override - public InsertIntoTableCommand rewriteTvfParams(String sql) { - S3Offset nextOffset = getNextOffset(); + public InsertIntoTableCommand rewriteTvfParams(Offset nextOffset) { + S3Offset offset = (S3Offset) nextOffset; Map<String, String> props = new HashMap<>(); //todo: need to change file list to glob string - props.put("uri", nextOffset.getFileLists().toString()); + props.put("uri", offset.getFileLists().toString()); + + InsertIntoTableCommand command = (InsertIntoTableCommand) parser.parseSingle(executeSql); - NereidsParser parser = new NereidsParser(); - InsertIntoTableCommand command = (InsertIntoTableCommand) parser.parseSingle(sql); - command.rewriteTvfProperties(getSourceType(), props); + command.rewriteFirstTvfProperties(getSourceType(), props); return command; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java index 4334526630e..61a8eb0a5c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java @@ -129,6 +129,7 @@ public class CreateJobInfo { if (streamingJob) { executeType = JobExecuteType.STREAMING; properties = new StreamingJobProperties(jobProperties); + properties.validate(); } jobExecutionConfiguration.setExecuteType(executeType); @@ -273,15 +274,21 @@ public class CreateJobInfo { NereidsParser parser = new NereidsParser(); LogicalPlan logicalPlan = parser.parseSingle(sql); if (logicalPlan instanceof InsertIntoTableCommand) { - return new StreamingInsertJob(labelNameOptional.get(), - JobStatus.PENDING, - currentDbName, - comment, - ConnectContext.get().getCurrentUserIdentity(), - jobExecutionConfiguration, - System.currentTimeMillis(), - sql, - (StreamingJobProperties) properties); + // InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) logicalPlan; + try { + // insertIntoTableCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false); + return new StreamingInsertJob(labelNameOptional.get(), + JobStatus.PENDING, + currentDbName, + comment, + ConnectContext.get().getCurrentUserIdentity(), + jobExecutionConfiguration, + System.currentTimeMillis(), + sql, + (StreamingJobProperties) properties); + } catch (Exception e) { + throw new AnalysisException(e.getMessage()); + } } else { throw new AnalysisException("Not support this sql : " + sql + " Command class is " + logicalPlan.getClass().getName() + "."); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index a5a428c7815..8bf31d941df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -83,6 +83,7 @@ import java.util.List; 
import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; @@ -539,46 +540,54 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti } // todo: add ut - public String getFirstTvfName() { + public UnboundTVFRelation getFirstTVF() { return getFirstTvfInPlan(getLogicalQuery()); } - private String getFirstTvfInPlan(LogicalPlan plan) { + private UnboundTVFRelation getFirstTvfInPlan(LogicalPlan plan) { if (plan instanceof UnboundTVFRelation) { UnboundTVFRelation tvfRelation = (UnboundTVFRelation) plan; - return tvfRelation.getFunctionName(); + return tvfRelation; } for (Plan child : plan.children()) { if (child instanceof LogicalPlan) { - String result = getFirstTvfInPlan((LogicalPlan) child); - if (!result.isEmpty()) { + UnboundTVFRelation result = getFirstTvfInPlan((LogicalPlan) child); + if (result != null) { return result; } } } - return ""; + return null; } // todo: add ut - public void rewriteTvfProperties(String functionName, Map<String, String> props) { - rewriteTvfInPlan(originLogicalQuery, functionName, props); - if (logicalQuery.isPresent()) { - rewriteTvfInPlan(logicalQuery.get(), functionName, props); + public void rewriteFirstTvfProperties(String functionName, Map<String, String> props) { + AtomicBoolean found = new AtomicBoolean(false); + rewriteFirstTvfInPlan(originLogicalQuery, functionName, props, found); + if (logicalQuery.isPresent() && !found.get()) { + rewriteFirstTvfInPlan(logicalQuery.get(), functionName, props, found); } } - private void rewriteTvfInPlan(LogicalPlan plan, String functionName, Map<String, String> props) { + private void rewriteFirstTvfInPlan(LogicalPlan plan, + String functionName, Map<String, String> props, AtomicBoolean found) { + if (found.get()) { + return; + } + if (plan instanceof UnboundTVFRelation) { UnboundTVFRelation tvfRelation = (UnboundTVFRelation) plan; if (functionName.equalsIgnoreCase(tvfRelation.getFunctionName())) { tvfRelation.getProperties().getMap().putAll(props); + found.set(true); + return; } } for (Plan child : plan.children()) { if (child instanceof LogicalPlan) { - rewriteTvfInPlan((LogicalPlan) child, functionName, props); + rewriteFirstTvfInPlan((LogicalPlan) child, functionName, props, found); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 07032432ace..af79329f681 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -182,6 +182,7 @@ import org.apache.doris.fs.remote.dfs.DFSFileSystem; import org.apache.doris.fs.remote.dfs.JFSFileSystem; import org.apache.doris.fs.remote.dfs.OFSFileSystem; import org.apache.doris.job.extensions.insert.InsertJob; +import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob; import org.apache.doris.job.extensions.mtmv.MTMVJob; import org.apache.doris.load.loadv2.BrokerLoadJob; import org.apache.doris.load.loadv2.BulkLoadJob; @@ -447,7 +448,8 @@ public class GsonUtils { jobExecutorRuntimeTypeAdapterFactory = RuntimeTypeAdapterFactory.of(org.apache.doris.job.base.AbstractJob.class, "clazz") .registerSubtype(InsertJob.class, InsertJob.class.getSimpleName()) - .registerSubtype(MTMVJob.class, MTMVJob.class.getSimpleName()); + 
.registerSubtype(MTMVJob.class, MTMVJob.class.getSimpleName()) + .registerSubtype(StreamingInsertJob.class, StreamingInsertJob.class.getSimpleName()); private static RuntimeTypeAdapterFactory<MTMVSnapshotIf> mtmvSnapshotTypeAdapterFactory = RuntimeTypeAdapterFactory.of(MTMVSnapshotIf.class, "clazz")
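Taken together, the offset pieces in this patch suggest a consume cycle along the lines of the hedged sketch below. The init, getNextOffset, rewriteTvfParams, and hasMoreDataToConsume calls are the SourceOffsetProvider methods shown above; the "s3" source-type string, the consume() driver, and the submit() helper are illustrative assumptions rather than code from the repository.

import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
import org.apache.doris.job.offset.Offset;
import org.apache.doris.job.offset.SourceOffsetProvider;
import org.apache.doris.job.offset.SourceOffsetProviderFactory;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;

public class StreamingOffsetFlowSketch {

    // Hypothetical driver: walks one provider through the consume cycle implied by the patch.
    static void consume(String executeSql, StreamingJobProperties jobProperties) throws Exception {
        SourceOffsetProvider provider = SourceOffsetProviderFactory.createSourceOffsetProvider("s3");
        provider.init(executeSql, jobProperties);       // parse the TVF once and validate storage properties
        while (provider.hasMoreDataToConsume()) {
            // For S3 this is an S3Offset carrying startFile, endFile and the matched file list.
            Offset next = provider.getNextOffset();
            // Rewrite the first TVF's properties so the insert reads exactly this batch.
            InsertIntoTableCommand command = provider.rewriteTvfParams(next);
            submit(command, next);                      // hypothetical: schedule a StreamingInsertTask
        }
    }

    static void submit(InsertIntoTableCommand command, Offset offset) {
        // placeholder: run the insert and advance the offset once the transaction succeeds
    }
}

StreamingInsertJob.createStreamingInsertTask in this patch follows the same shape: it asks the provider for the next offset, has the provider rewrite the TVF parameters, and wraps the resulting command in a StreamingInsertTask.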