github-actions[bot] commented on code in PR #61826:
URL: https://github.com/apache/doris/pull/61826#discussion_r3014811970
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -1016,14 +1031,25 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
         }
         LoadJob loadJob = loadJobs.get(0);
         LoadStatistic loadStatistic = loadJob.getLoadStatistic();
+
+        String offsetJson = offsetProvider.getCommitOffsetJson(
+                runningStreamTask.getRunningOffset(),
+                runningStreamTask.getTaskId(),
Review Comment:
**Critical: RPC under write lock**
`getCommitOffsetJson()` makes a BRPC call to the BE (via `fetchTaskEndOffset` → `future.get()`) while the job's write lock is held (acquired at line 1016). Per `fe/fe-core/AGENTS.md`: *"Metadata-locking paths avoid RPC, external IO, and journal waits while holding catalog/database/table locks."*
If the target BE is slow, unreachable, or the BRPC times out, this thread will block indefinitely (there is no timeout on the `future.get()`) while holding the write lock, potentially stalling all concurrent operations on this job.
**Recommendation:** Fetch the offset BRPC result *before* acquiring the write lock, then take the lock only for the mutation/attachment logic. Alternatively, add a bounded timeout to `future.get()` and propagate the timeout as a `TransactionException`.
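The recommended reordering can be sketched as below. `OffsetCommitSketch`, `fetchOffsetJsonFromBe`, and the lock field are hypothetical stand-ins for the job's real lock and the BRPC call, not Doris APIs:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class OffsetCommitSketch {
    private final ReentrantReadWriteLock jobLock = new ReentrantReadWriteLock();
    private String attachedOffsetJson = "";

    // Stand-in for the BRPC round trip (offsetProvider.getCommitOffsetJson
    // in the real code); imagine this can block on a slow BE.
    String fetchOffsetJsonFromBe() {
        return "{\"offset\":42}";
    }

    void beforeCommitted() {
        // 1. Do the RPC before taking the lock: a slow or unreachable BE
        //    can no longer stall every thread waiting on the write lock.
        String offsetJson = fetchOffsetJsonFromBe();

        // 2. Hold the write lock only for the cheap in-memory
        //    mutation/attachment step.
        jobLock.writeLock().lock();
        try {
            attachedOffsetJson = offsetJson;
        } finally {
            jobLock.writeLock().unlock();
        }
    }

    String attachedOffsetJson() {
        return attachedOffsetJson;
    }
}
```

The caveat with this ordering is that the fetched offset may be slightly stale by the time the lock is taken, so any invariants checked under the lock would need to revalidate it.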
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java:
##########
@@ -0,0 +1,425 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset.jdbc;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.httpv2.entity.ResponseBody;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.split.AbstractSourceSplit;
+import org.apache.doris.job.cdc.split.BinlogSplit;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PRequestCdcClientResult;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.tablefunction.CdcStreamTableValuedFunction;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import lombok.extern.log4j.Log4j2;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+/**
+ * OffsetProvider for cdc_stream TVF path.
+ *
+ * <p>Differs from JdbcSourceOffsetProvider (non-TVF path) in:
+ * <ul>
+ * <li>offset commit: FE pulls actual end offset from BE via /api/getTaskOffset/{taskId} in
+ * beforeCommitted, stores in txn attachment (transactionally safe)</li>
+ * <li>cloud mode snapshot: attachment carries cumulative chunkHighWatermarkMap so that
+ * replayOnCloudMode can recover full state from the single latest attachment in MS</li>
+ * <li>recovery: state is rebuilt from txn replay (chunkHighWatermarkMap populated by
+ * replayOnCommitted/replayOnCloudMode -> updateOffset), not from EditLog</li>
+ * <li>updateOffset: during replay remainingSplits is empty so removeIf returns false naturally;
+ * chunkHighWatermarkMap is always updated unconditionally to support recovery</li>
+ * <li>replayIfNeed: checks currentOffset directly — snapshot triggers remainingSplits rebuild
+ * from meta + chunkHighWatermarkMap; binlog needs no action (currentOffset already set)</li>
+ * </ul>
+ */
+@Log4j2
+public class JdbcTvfSourceOffsetProvider extends JdbcSourceOffsetProvider {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /**
+     * No-arg constructor required by SourceOffsetProviderFactory.createSourceOffsetProvider().
+     */
+    public JdbcTvfSourceOffsetProvider() {
+        super();
+    }
+
+    /**
+     * Initializes provider state and fetches snapshot splits from BE.
+     * splitChunks is called here (rather than in StreamingInsertJob) to keep
+     * all cdc_stream-specific init logic inside the provider.
+     */
+    @Override
+    public void ensureInitialized(Long jobId, Map<String, String> originTvfProps) throws JobException {
+        // Always refresh fields that may be updated via ALTER JOB (e.g. credentials, parallelism).
+        this.sourceProperties = originTvfProps;
+        this.snapshotParallelism = Integer.parseInt(
+                originTvfProps.getOrDefault(DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
+                        DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT));
+
+        if (this.jobId != null) {
+            return;
+        }
+        // One-time initialization below — safe to skip on FE restart because the provider
+        // is reconstructed fresh (getPersistInfo returns null), so jobId is null then too.
+        this.jobId = jobId;
+        this.chunkHighWatermarkMap = new HashMap<>();
+        String type = originTvfProps.get(DataSourceConfigKeys.TYPE);
+        Preconditions.checkArgument(type != null, "type is required");
+        this.sourceType = DataSourceType.valueOf(type.toUpperCase());
+        String table = originTvfProps.get(DataSourceConfigKeys.TABLE);
+        Preconditions.checkArgument(table != null, "table is required for cdc_stream TVF");
+    }
+
+    /**
+     * Called once on fresh job creation (not on FE restart).
+     * Fetches snapshot splits from BE and persists them to the meta table.
+     */
+    @Override
+    public void initOnCreate() throws JobException {
+        String table = sourceProperties.get(DataSourceConfigKeys.TABLE);
+        splitChunks(Collections.singletonList(table));
+    }
+
+    /**
+     * Rewrites the cdc_stream TVF SQL with current offset meta and taskId,
+     * so the BE knows where to start reading and can report
+     * the end offset back via taskOffsetCache.
+     */
+    @Override
+    public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand originCommand,
+            Offset runningOffset, long taskId) {
+        JdbcOffset offset = (JdbcOffset) runningOffset;
+        Map<String, String> props = new HashMap<>();
+        Plan rewritePlan = originCommand.getParsedPlan().get().rewriteUp(plan -> {
+            if (plan instanceof UnboundTVFRelation) {
+                UnboundTVFRelation originTvfRel = (UnboundTVFRelation) plan;
+                props.putAll(originTvfRel.getProperties().getMap());
+                props.put(CdcStreamTableValuedFunction.META_KEY, new Gson().toJson(offset.generateMeta()));
+                props.put(CdcStreamTableValuedFunction.JOB_ID_KEY, String.valueOf(jobId));
+                props.put(CdcStreamTableValuedFunction.TASK_ID_KEY, String.valueOf(taskId));
+                return new UnboundTVFRelation(
+                        originTvfRel.getRelationId(), originTvfRel.getFunctionName(), new Properties(props));
+            }
+            return plan;
+        });
+        InsertIntoTableCommand cmd = new InsertIntoTableCommand((LogicalPlan) rewritePlan,
+                Optional.empty(), Optional.empty(), Optional.empty(), true, Optional.empty());
+        cmd.setJobId(originCommand.getJobId());
+        return cmd;
+    }
+
+    /**
+     * Returns the serialized JSON offset to store in txn commit attachment.
+     *
+     * <p>Calls BE /api/getTaskOffset/{taskId} to get the actual end offset recorded after
+     * fetchRecordStream completes (stored in PipelineCoordinator.taskOffsetCache).
+     *
+     * <p>For cloud + snapshot: returns cumulative list (all previously completed chunks +
+     * current task's new splits) so that replayOnCloudMode can recover full state from latest attachment.
+     * For non-cloud snapshot / binlog: returns only current task's splits.
+     */
+    @Override
+    public String getCommitOffsetJson(Offset runningOffset, long taskId, List<Long> scanBackendIds) {
+        List<Map<String, String>> currentTaskEndOffset = fetchTaskEndOffset(taskId, scanBackendIds);
+        if (CollectionUtils.isEmpty(currentTaskEndOffset)) {
+            return "";
+        }
+
+        // Cloud + snapshot: prepend all previously completed chunks so the attachment is
+        // self-contained for replayOnCloudMode (MS only keeps the latest attachment)
+        if (Config.isCloudMode() && ((JdbcOffset) runningOffset).snapshotSplit()) {
+            List<Map<String, String>> cumulative = buildCumulativeSnapshotOffset(currentTaskEndOffset);
+            return new Gson().toJson(cumulative);
+        }
+        return new Gson().toJson(currentTaskEndOffset);
+    }
+
+    /**
+     * Queries each scan backend in order until one returns a non-empty offset for this taskId.
+     * Only the BE that ran the cdc_stream TVF scan node will have the entry in taskOffsetCache.
+     */
+    private List<Map<String, String>> fetchTaskEndOffset(long taskId, List<Long> scanBackendIds) {
+        InternalService.PRequestCdcClientRequest request =
+                InternalService.PRequestCdcClientRequest.newBuilder()
+                        .setApi("/api/getTaskOffset/" + taskId).build();
+        for (Long beId : scanBackendIds) {
+            Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
+            if (backend == null) {
+                log.info("Backend {} not found for task {}, skipping", beId, taskId);
+                continue;
+            }
+            String rawResponse = null;
+            try {
+                TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+                Future<PRequestCdcClientResult> future =
+                        BackendServiceProxy.getInstance().requestCdcClient(address, request);
+                InternalService.PRequestCdcClientResult result = future.get();
Review Comment:
**High: unbounded `future.get()`**
`future.get()` is called without a timeout. If the BE is unavailable or slow, this will block the calling FE thread indefinitely. Since the caller (`beforeCommitted`) holds the job write lock, this compounds into a critical issue.
Consider using `future.get(timeout, TimeUnit.SECONDS)` with a reasonable timeout (e.g., 30-60 seconds), and handle `TimeoutException` by logging and returning an empty result, or by propagating a checked exception.
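A minimal sketch of the bounded-wait pattern, using a plain `java.util.concurrent.Future` as a stand-in for the BRPC future; the helper name, the timeout parameter, and the empty-string fallback are illustrative choices, not Doris defaults:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedRpcWait {

    // Wait at most timeoutSeconds for the RPC result. On timeout, cancel the
    // call and fall back to an empty result; a caller could instead wrap the
    // TimeoutException in a checked exception such as TransactionException.
    static String getWithTimeout(Future<String> future, long timeoutSeconds) {
        try {
            return future.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // stop waiting on a stuck or unreachable BE
            return "";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return "";
        } catch (ExecutionException e) {
            return ""; // RPC failed; caller treats this as "no offset"
        }
    }
}
```

Note that returning an empty result on timeout only bounds the wait; whether an empty offset is safe to commit still depends on how `getCommitOffsetJson` callers treat `""`.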
##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java:
##########
@@ -51,24 +62,34 @@ private void processProps(Map<String, String> properties) throws AnalysisExcepti
         Map<String, String> copyProps = new HashMap<>(properties);
         copyProps.put("format", "json");
         super.parseCommonProperties(copyProps);
-        this.processedParams.put("enable_cdc_client", "true");
-        this.processedParams.put("uri", URI);
-        this.processedParams.put("http.enable.range.request", "false");
-        this.processedParams.put("http.enable.chunk.response", "true");
-        this.processedParams.put("http.method", "POST");
+        this.processedParams.put(ENABLE_CDC_CLIENT_KEY, "true");
+        this.processedParams.put(URI_KEY, URI);
+        this.processedParams.put(HTTP_ENABLE_RANGE_REQUEST_KEY, "false");
+        this.processedParams.put(HTTP_ENABLE_CHUNK_RESPONSE_KEY, "true");
+        this.processedParams.put(HTTP_METHOD_KEY, "POST");
         String payload = generateParams(properties);
-        this.processedParams.put("http.payload", payload);
+        this.processedParams.put(HTTP_PAYLOAD_KEY, payload);
         this.backendConnectProperties.putAll(processedParams);
         generateFileStatus();
     }
     private String generateParams(Map<String, String> properties) throws AnalysisException {
         FetchRecordRequest recordRequest = new FetchRecordRequest();
-        recordRequest.setJobId(UUID.randomUUID().toString().replace("-", ""));
+        String defaultJobId = UUID.randomUUID().toString().replace("-", "");
+        recordRequest.setJobId(properties.getOrDefault(JOB_ID_KEY, defaultJobId));
         recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE));
         recordRequest.setConfig(properties);
         try {
+            // for tvf with job
+            if (properties.containsKey(TASK_ID_KEY)) {
+                recordRequest.setTaskId(properties.remove(TASK_ID_KEY));
+                String meta = properties.remove(META_KEY);
+                Preconditions.checkArgument(StringUtils.isNotEmpty(meta), "meta is required when task.id is provided");
+                Map<String, Object> metaMap = objectMapper.readValue(meta, new TypeReference<Map<String, Object>>() {});
+                recordRequest.setMeta(metaMap);
+            }
Review Comment:
**Low: mutating input map via `properties.remove()`**
These `remove()` calls mutate the `properties` map that was passed in. Currently this is safe because `rewriteTvfParams` creates a fresh `HashMap` for the `props` map. However, this assumption is not documented, and a future caller could pass a shared or unmodifiable map.
Consider either:
1. Adding a comment documenting that the caller must pass a mutable, caller-owned map, or
2. Using `properties.get()` instead of `remove()` (the keys don't need to be removed from this map since it's serialized to JSON via `objectMapper.writeValueAsString`).
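Option 2 can be sketched as follows. The helper, class name, and key string values here are hypothetical (the real constants live in `CdcStreamTableValuedFunction`); the point is only that `get()` leaves the caller's map intact, so even an unmodifiable map is safe to pass:

```java
import java.util.Map;

class NonMutatingProps {
    // Hypothetical key values; the real constants are TASK_ID_KEY / META_KEY
    // in CdcStreamTableValuedFunction.
    static final String TASK_ID_KEY = "task.id";
    static final String META_KEY = "meta";

    // get() instead of remove(): the keys need not be deleted, because the
    // map is later serialized wholesale (objectMapper.writeValueAsString),
    // and reading does not throw on unmodifiable maps.
    static String[] readTaskMeta(Map<String, String> properties) {
        return new String[] {properties.get(TASK_ID_KEY), properties.get(META_KEY)};
    }
}
```

If the keys genuinely must not reach the serialized config, a defensive copy (`new HashMap<>(properties)`) before the `remove()` calls achieves that without mutating the caller's map.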
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]