github-actions[bot] commented on code in PR #64423:
URL: https://github.com/apache/doris/pull/64423#discussion_r3432548995
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java:
##########
@@ -129,12 +130,25 @@ public Object compareOffset(@RequestBody CompareOffsetRequest compareOffsetReque
/** Close job */
@RequestMapping(path = "/api/close", method = RequestMethod.POST)
public Object close(@RequestBody JobBaseConfig jobConfig) {
- LOG.info("Closing job {}", jobConfig.getJobId());
+ String jobId = jobConfig.getJobId();
+ LOG.info("Closing job {}", jobId);
Env env = Env.getCurrentEnv();
- SourceReader reader = env.getReader(jobConfig);
- reader.close(jobConfig);
- env.close(jobConfig.getJobId());
- pipelineCoordinator.closeJobStreamLoad(jobConfig.getJobId());
+ // Don't rebuild a reader to close it; an absent reader (owner BE gone) just needs its slot
+ // dropped.
+ SourceReader reader = env.getReaderIfPresent(jobId);
+ try {
+ if (reader != null) {
+ reader.release(jobConfig);
+ }
+ SourceReader dropper = reader != null ? reader : env.getMetaReader(jobConfig);
+ env.releaseSourceResourcesOrRetry(dropper, jobConfig);
+ } catch (Exception ex) {
+ LOG.warn("Close job {} teardown failed: {}", jobId, ex.getMessage());
+ env.scheduleSlotDrop(jobConfig);
+ } finally {
+ env.close(jobId);
Review Comment:
This removes the local context even when `reader.release(jobConfig)` failed
above. A concrete path is `/api/close` on the BE that owns a PG reader:
`release()` enters `finishSplitRecords()`, a fetcher close throws before the
stream reader is stopped, the catch only schedules a slot drop, and this
finally block untracks the still-live reader. The background retry uses a new
throwaway reader, so the slot remains active in this same cdc_client and every
drop attempt fails until the retry window gives up, after FE has already
deleted the job metadata. Please keep the context, or otherwise
deterministically stop/close the reader, when local release fails and only
untrack after the engine is known stopped.
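For illustration, a minimal sketch of the suggested ordering. `Reader`, `CloseSketch`, `track`, and `isTracked` are hypothetical stand-ins for `SourceReader` and `Env`, not the PR's API. The context is untracked only after `release()` returns normally, so a failed release leaves the live reader reachable for a later close instead of handing retries a fresh throwaway reader.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for SourceReader; not the real cdc_client interface.
interface Reader {
    /** Stops the streaming engine and releases local resources; may throw. */
    void release() throws Exception;
}

// Hypothetical stand-in for the jobContexts bookkeeping in Env.
final class CloseSketch {
    private final Map<String, Reader> jobContexts = new ConcurrentHashMap<>();

    void track(String jobId, Reader reader) {
        jobContexts.put(jobId, reader);
    }

    boolean isTracked(String jobId) {
        return jobContexts.containsKey(jobId);
    }

    /**
     * Returns true only when the local reader is known to be stopped and untracked.
     * On failure the context stays registered so the same live reader can be
     * stopped again later.
     */
    boolean close(String jobId) {
        Reader reader = jobContexts.get(jobId);
        if (reader == null) {
            return true; // nothing running locally
        }
        try {
            reader.release();
        } catch (Exception ex) {
            System.err.println("Close job " + jobId + " failed, keeping context: " + ex.getMessage());
            return false;
        }
        // Untrack only after release succeeded; remove(key, value) avoids dropping
        // a context that was replaced concurrently.
        jobContexts.remove(jobId, reader);
        return true;
    }
}
```

A real fix would probably also force-stop the stream reader on the failure path; the sketch only shows the untrack-after-success ordering.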
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java:
##########
@@ -192,12 +263,143 @@ public void close(String jobId) {
}
}
+ /** Liveness evidence (FE heartbeat or active poll): keep this job's reader alive. */
+ public void keepAlive(String jobId) {
+ JobContext context = jobContexts.get(jobId);
+ if (context != null) {
+ context.lastAliveTime = System.currentTimeMillis();
+ }
+ }
+
+ // Release (keep slot) readers FE no longer drives; maxIntervalMs<=0 = untracked (e.g. TVF),
+ // skip.
+ private void releaseIdleReaders() {
+ long now = System.currentTimeMillis();
+ for (String jobId : jobContexts.keySet()) {
+ Lock lock = jobLocks.get(jobId);
+ if (lock == null || !lock.tryLock()) {
+ continue;
+ }
+ SourceReader toRelease = null;
+ JobBaseConfig releaseConfig = null;
+ try {
+ JobContext context = jobContexts.get(jobId);
+ if (context == null || context.lastAliveTime <= 0 || context.maxIntervalMs <= 0) {
+ continue;
+ }
+ long timeout =
+ Math.max(
+ (long) Constants.IDLE_READER_TIMEOUT_MULTIPLIER
+ * context.maxIntervalMs,
+ Constants.IDLE_READER_MIN_TIMEOUT_MS);
+ if (now - context.lastAliveTime <= timeout) {
+ continue;
+ }
+ LOG.info(
+ "Releasing idle reader for job {}, idle {} ms, keep slot",
+ jobId,
+ now - context.lastAliveTime);
+ jobContexts.remove(jobId);
+ toRelease = context.reader;
+ releaseConfig = context.jobConfig;
+ } finally {
+ lock.unlock();
+ }
+ // Release outside the lock so blocking IO never stalls getReaderAndClaim/detach.
+ if (toRelease != null && releaseConfig != null) {
+ try {
+ toRelease.release(releaseConfig);
+ } catch (Exception ex) {
+ LOG.warn("Failed to release idle reader for job {}", jobId, ex);
+ }
+ }
+ }
+ }
+
+ // Each chore is guarded independently: one failing must not skip the other, and an uncaught
+ // throwable here would silently cancel the whole periodic task.
+ private void runBackgroundCleanup() {
+ try {
+ releaseIdleReaders();
+ } catch (Exception e) {
+ LOG.warn("releaseIdleReaders failed", e);
+ }
+ try {
+ retryPendingSlotDrops();
+ } catch (Exception e) {
+ LOG.warn("retryPendingSlotDrops failed", e);
+ }
+ }
+
+ /**
+ * Run source-side cleanup; if incomplete (e.g. slot still held by a dead BE), retry in
+ * background.
+ */
+ public void releaseSourceResourcesOrRetry(SourceReader reader, JobBaseConfig jobConfig) {
+ if (!releaseSourceResources(reader, jobConfig)) {
+ scheduleSlotDrop(jobConfig);
+ }
+ }
+
+ public void scheduleSlotDrop(JobBaseConfig jobConfig) {
+ long deadline = System.currentTimeMillis() + Constants.SLOT_DROP_RETRY_WINDOW_MS;
+ pendingSlotDrops.putIfAbsent(jobConfig.getJobId(), new SlotDropTask(jobConfig, deadline));
Review Comment:
`pendingSlotDrops` is the only retry state left after FE has already removed the
job in `cleanMeta`, but it exists only in this cdc_client's heap. If DROP JOB routes
`/api/close` to an alive BE while the Doris-owned PG slot is still active on a
dead BE, `releaseSourceResources()` returns false and this schedules retry; if
this cdc_client/BE restarts before PG frees the stale walsender, the map is
empty on restart and no FE job metadata remains to issue another close, leaving
the slot/publication behind permanently. Please make the cleanup retry
FE-driven/durable, or fail/retry DROP JOB until the source resources are
actually gone.
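One way to make the retry state survive a restart is to persist it, sketched here under assumptions: `DurableSlotDrops` and its tab-separated file format are hypothetical, not part of the PR. Note that local persistence only helps if the same cdc_client comes back; if the BE is gone for good, an FE-driven retry (or failing DROP JOB) remains the more robust option.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical file-backed queue of pending slot drops, reloaded on startup.
 * One "jobId<TAB>deadlineMillis" entry per line.
 */
final class DurableSlotDrops {
    private final Path file;
    private final Map<String, Long> pending = new LinkedHashMap<>();

    DurableSlotDrops(Path file) throws IOException {
        this.file = file;
        if (Files.exists(file)) {
            for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                String[] parts = line.split("\t", 2);
                if (parts.length == 2) {
                    pending.put(parts[0], Long.parseLong(parts[1]));
                }
            }
        }
    }

    /** Records a drop to retry; a job already queued keeps its original deadline. */
    synchronized void schedule(String jobId, long deadlineMillis) throws IOException {
        if (pending.putIfAbsent(jobId, deadlineMillis) == null) {
            persist();
        }
    }

    /** Call once the slot/publication is confirmed gone. */
    synchronized void complete(String jobId) throws IOException {
        if (pending.remove(jobId) != null) {
            persist();
        }
    }

    synchronized List<String> pendingJobs() {
        return new ArrayList<>(pending.keySet());
    }

    // Write a temp file, then rename over the old one, so a process crash mid-write
    // does not leave a half-written queue (no fsync, so not power-loss safe).
    private void persist() throws IOException {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Long> e : pending.entrySet()) {
            lines.add(e.getKey() + "\t" + e.getValue());
        }
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, lines, StandardCharsets.UTF_8);
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

The background chore would then iterate `pendingJobs()` after a restart instead of starting from an empty in-heap map.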