github-actions[bot] commented on code in PR #64728:
URL: https://github.com/apache/doris/pull/64728#discussion_r3459078826
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -635,21 +617,23 @@ public String getPersistInfo() {
// ============ Async split progress (driven by scheduler each tick) ============
/**
- * One-time setup at CREATE.
- * - initial/snapshot mode: init split progress; scheduler will drive advanceSplits() each tick.
- * - latest mode (and other non-splitting modes): open the remote reader (e.g. PG slot) so the
- * binlog phase can start immediately; no snapshot splitting will happen.
+ * One-time setup at CREATE. Opens the remote reader for every mode (see initSourceReader) so
+ * source-side problems fail fast, then:
+ * - initial/snapshot mode: also init split progress; scheduler will drive advanceSplits() each tick.
+ * - latest mode (and other non-splitting modes): nothing further; the binlog phase can start
+ * immediately and no snapshot splitting will happen.
*/
@Override
public void initOnCreate(List<String> syncTables) throws JobException {
- if (!checkNeedSplitChunks(sourceProperties)) {
- initSourceReader();
- return;
- }
- synchronized (splitsLock) {
- this.cachedSyncTables = syncTables;
- this.committedSplitProgress = new SplitProgress();
- this.cdcSplitProgress = new SplitProgress();
Review Comment:
This creates a BE-local CDC reader during CREATE on whichever backend
`selectBackend()` returns, but the provider never records that backend. The
later split fetch/write paths and `cleanMeta()` each call `selectBackend()`
again. In a multi-BE cluster, `initOnCreate()` can therefore initialize BE A,
the job can run on BE B, and DROP/cleanup can send `/api/close` to BE C. The
CDC client keeps `Env.jobContexts` per BE, and `/api/close` only closes the
context on the addressed BE, so the CREATE-time reader on BE A is left behind
along with its executor and config. Before this change, initial/snapshot jobs
did not create that extra BE-local reader at CREATE time. Please do one of the
following: pin/record the backend chosen here and use it for close/release,
close all possible BE-side contexts, or use a validation endpoint that does
not create a persistent reader context.
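
To illustrate the first option (pin/record the backend), here is a minimal sketch. It is not the provider's actual code: `FakeCdcBackend` and `PinnedBackendProvider` are hypothetical stand-ins, and the `Supplier` plays the role of `selectBackend()`, which may return a different BE on every call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for one BE's CDC client; tracks which jobs have an open reader. */
final class FakeCdcBackend {
    final String name;
    final List<Long> openJobs = new ArrayList<>();

    FakeCdcBackend(String name) {
        this.name = name;
    }

    void initReader(long jobId) {
        openJobs.add(jobId);
    }

    void close(long jobId) {
        openJobs.remove(Long.valueOf(jobId));
    }
}

/** Hypothetical provider that remembers the backend chosen at CREATE. */
final class PinnedBackendProvider {
    private final long jobId;
    // Assumption: behaves like selectBackend(), i.e. may return a different BE each call.
    private final Supplier<FakeCdcBackend> selectBackend;
    // Backend that holds the CREATE-time reader, or null if none is open.
    private FakeCdcBackend pinned;

    PinnedBackendProvider(long jobId, Supplier<FakeCdcBackend> selectBackend) {
        this.jobId = jobId;
        this.selectBackend = selectBackend;
    }

    synchronized void initOnCreate() {
        FakeCdcBackend be = selectBackend.get();
        be.initReader(jobId);
        pinned = be; // record the choice instead of re-selecting later
    }

    synchronized void cleanMeta() {
        if (pinned != null) {
            pinned.close(jobId); // close on the same BE that opened the reader
            pinned = null;
        }
    }
}
```

In the real provider the recorded backend would presumably also need to survive FE restart or failover (for example via the persisted job state), which this sketch does not attempt to model.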
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -864,17 +848,35 @@ protected List<SnapshotSplit> rpcFetchSplitsBatch(String table, Object[] nextSpl
if (code != TStatusCode.OK) {
throw new JobException("fetchSplits backend error: " + result.getStatus().getErrorMsgs(0));
}
- ResponseBody<List<SnapshotSplit>> resp = objectMapper.readValue(
- result.getResponse(),
- new TypeReference<ResponseBody<List<SnapshotSplit>>>() {});
- return resp.getData();
+ return parseCdcResponseData(
+ result.getResponse(), new TypeReference<List<SnapshotSplit>>() {});
} catch (TimeoutException te) {
throw new JobException("fetchSplits RPC timeout: jobId=" + getJobId() + " table=" + table);
} catch (Exception ex) {
throw new JobException("fetchSplits failed: " + ex.getMessage());
}
}
+ /**
+ * Decode a remote response envelope. A failure is returned as {@code {code:1, data:"<message>"}}
+ * over HTTP 200, while success carries the typed payload in {@code data}. Decode the envelope
+ * with a lenient {@link JsonNode} data field first so a failure throws the raw response (which
+ * carries the real error in {@code data}) instead of a misleading type-mismatch from forcing the
+ * success type onto an error string. Package-private for unit testing.
+ */
+ <T> T parseCdcResponseData(String response, TypeReference<T> dataType) throws JobException {
+ ResponseBody<JsonNode> body;
+ try {
+ body = objectMapper.readValue(response, new TypeReference<ResponseBody<JsonNode>>() {});
+ } catch (JsonProcessingException e) {
+ throw new JobException(response);
Review Comment:
`convertValue` is outside the normalization boundary. If the CDC client
returns a successful envelope with an unexpected data shape, e.g.
`{"code":0,"data":"not-a-map"}` for `fetchEndOffset`, Jackson throws an
unchecked `IllegalArgumentException` here instead of the helper's
`JobException` carrying the raw response. That reintroduces the misleading
parse-error path this helper is meant to avoid for
`fetchRemoteMeta`/`compareOffset`. Please catch `IllegalArgumentException`
around the conversion, wrap it as a `JobException` with the raw response or
parse context, and add a unit test for `code=0` with incompatible `data`.
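
A rough, dependency-free sketch of the requested boundary follows. The names are assumptions for illustration only: `SketchJobException` stands in for `JobException`, and the `converter` argument stands in for the `objectMapper.convertValue(...)` call, which reports a failed conversion as an `IllegalArgumentException`.

```java
import java.util.Map;
import java.util.function.Function;

/** Hypothetical checked exception standing in for JobException. */
final class SketchJobException extends Exception {
    SketchJobException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class EnvelopeDecoder {
    private EnvelopeDecoder() {
    }

    /**
     * Runs the typed conversion inside the normalization boundary: an unchecked
     * IllegalArgumentException from the converter is rethrown as the checked
     * exception, with the raw response in the message.
     */
    static <T> T convertData(String rawResponse, Object data, Function<Object, T> converter)
            throws SketchJobException {
        try {
            return converter.apply(data);
        } catch (IllegalArgumentException e) {
            throw new SketchJobException("unexpected data shape in response: " + rawResponse, e);
        }
    }

    /** Strict stand-in converter: accepts only a Map, otherwise fails like a typed conversion. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> requireMap(Object data) {
        if (data instanceof Map) {
            return (Map<String, Object>) data;
        }
        String kind = (data == null) ? "null" : data.getClass().getSimpleName();
        throw new IllegalArgumentException("cannot convert " + kind + " to Map");
    }
}
```

The requested unit test would then feed a `code=0` envelope whose `data` is a string and assert that the caller sees the checked exception containing the raw response, not an `IllegalArgumentException`.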
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]