JNSimba commented on code in PR #64423:
URL: https://github.com/apache/doris/pull/64423#discussion_r3432622492
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java:
##########
@@ -192,12 +263,143 @@ public void close(String jobId) {
}
}
+    /** Records liveness evidence (FE heartbeat or active poll) so this job's reader is kept alive. */
+    public void keepAlive(String jobId) {
+        JobContext ctx = jobContexts.get(jobId);
+        if (ctx == null) {
+            return; // unknown job: there is no context whose timestamp could be refreshed
+        }
+        ctx.lastAliveTime = System.currentTimeMillis();
+    }
+
+ // Release (keep slot) readers FE no longer drives; maxIntervalMs<=0 =
untracked (e.g. TVF),
+ // skip.
+ private void releaseIdleReaders() {
+ long now = System.currentTimeMillis();
+ for (String jobId : jobContexts.keySet()) {
+ Lock lock = jobLocks.get(jobId);
+ if (lock == null || !lock.tryLock()) {
+ continue;
+ }
+ SourceReader toRelease = null;
+ JobBaseConfig releaseConfig = null;
+ try {
+ JobContext context = jobContexts.get(jobId);
+ if (context == null || context.lastAliveTime <= 0 ||
context.maxIntervalMs <= 0) {
+ continue;
+ }
+ long timeout =
+ Math.max(
+ (long) Constants.IDLE_READER_TIMEOUT_MULTIPLIER
+ * context.maxIntervalMs,
+ Constants.IDLE_READER_MIN_TIMEOUT_MS);
+ if (now - context.lastAliveTime <= timeout) {
+ continue;
+ }
+ LOG.info(
+ "Releasing idle reader for job {}, idle {} ms, keep
slot",
+ jobId,
+ now - context.lastAliveTime);
+ jobContexts.remove(jobId);
+ toRelease = context.reader;
+ releaseConfig = context.jobConfig;
+ } finally {
+ lock.unlock();
+ }
+ // Release outside the lock so blocking IO never stalls
getReaderAndClaim/detach.
+ if (toRelease != null && releaseConfig != null) {
+ try {
+ toRelease.release(releaseConfig);
+ } catch (Exception ex) {
+ LOG.warn("Failed to release idle reader for job {}",
jobId, ex);
+ }
+ }
+ }
+ }
+
+    // Each chore is guarded independently: one failing must not skip the other. Throwable (not
+    // just Exception) is caught because any uncaught throwable, including an Error raised by a
+    // chore, would silently cancel the whole periodic task.
+    private void runBackgroundCleanup() {
+        try {
+            releaseIdleReaders();
+        } catch (Throwable t) {
+            LOG.warn("releaseIdleReaders failed", t);
+        }
+        try {
+            retryPendingSlotDrops();
+        } catch (Throwable t) {
+            LOG.warn("retryPendingSlotDrops failed", t);
+        }
+    }
+
+ /**
+ * Run source-side cleanup; if incomplete (e.g. slot still held by a dead
BE), retry in
+ * background.
+ */
+ public void releaseSourceResourcesOrRetry(SourceReader reader,
JobBaseConfig jobConfig) {
+ if (!releaseSourceResources(reader, jobConfig)) {
+ scheduleSlotDrop(jobConfig);
+ }
+ }
+
+ public void scheduleSlotDrop(JobBaseConfig jobConfig) {
+ long deadline = System.currentTimeMillis() +
Constants.SLOT_DROP_RETRY_WINDOW_MS;
+ pendingSlotDrops.putIfAbsent(jobConfig.getJobId(), new
SlotDropTask(jobConfig, deadline));
Review Comment:
We will make a best effort to drop slots; the case where the BE restarts is not
handled for now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]