NSAmelchev commented on code in PR #10766: URL: https://github.com/apache/ignite/pull/10766#discussion_r1256004639
########## modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java: ########## @@ -102,69 +115,138 @@ protected CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool try { lock.tryLock(1); - try (Stream<Path> cdcFiles = Files.list(walCdcDir.toPath())) { - Set<File> delete = new HashSet<>(); + Long lostSgmnt = findLastLostSegement(); - AtomicLong lastSgmnt = new AtomicLong(-1); + if (lostSgmnt != null) + deleteAll(lostSgmnt); - cdcFiles - .filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile())) - .sorted(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex) - .reversed()) // Sort by segment index. - .forEach(path -> { - long idx = FileWriteAheadLogManager.segmentIndex(path); + Long cdcDisableSgmnt = findLastCdcDisableSegment(); - if (lastSgmnt.get() == -1 || lastSgmnt.get() - idx == 1) { - lastSgmnt.set(idx); + if (cdcDisableSgmnt != null) + deleteAll(cdcDisableSgmnt); - return; - } + if (lostSgmnt != null || cdcDisableSgmnt != null) + resetWalState(); + else { Review Comment: Fixed ########## modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java: ########## @@ -102,69 +115,138 @@ protected CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool try { lock.tryLock(1); - try (Stream<Path> cdcFiles = Files.list(walCdcDir.toPath())) { - Set<File> delete = new HashSet<>(); + Long lostSgmnt = findLastLostSegement(); - AtomicLong lastSgmnt = new AtomicLong(-1); + if (lostSgmnt != null) + deleteAll(lostSgmnt); - cdcFiles - .filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile())) - .sorted(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex) - .reversed()) // Sort by segment index. 
- .forEach(path -> { - long idx = FileWriteAheadLogManager.segmentIndex(path); + Long cdcDisableSgmnt = findLastCdcDisableSegment(); - if (lastSgmnt.get() == -1 || lastSgmnt.get() - idx == 1) { - lastSgmnt.set(idx); + if (cdcDisableSgmnt != null) + deleteAll(cdcDisableSgmnt); - return; - } + if (lostSgmnt != null || cdcDisableSgmnt != null) + resetWalState(); + else { + if (log.isInfoEnabled()) + log.info("Lost segment CDC links or CDC disable record were not found."); + } + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to delete lost segment CDC links. " + + "Unable to acquire lock to lock CDC folder. Make sure a CDC app is shut down " + + "[dir=" + walCdcDir.getAbsolutePath() + ", reason=" + e.getMessage() + ']'); + } + finally { + U.closeQuiet(lock); + } + + return null; + } - delete.add(path.toFile()); - }); + /** @return The index of the segment previous to the last gap or {@code null} if no gaps were found. */ + private Long findLastLostSegement() { + AtomicReference<Long> lastLostSgmnt = new AtomicReference<>(); + AtomicLong lastSgmnt = new AtomicLong(-1); - if (delete.isEmpty()) { - log.info("Lost segment CDC links were not found."); + consumeCdcSegments(segment -> { + if (lastLostSgmnt.get() != null) + return; - return null; - } + long idx = FileWriteAheadLogManager.segmentIndex(segment); - log.info("Found lost segment CDC links. The following links will be deleted: " + delete); + if (lastSgmnt.get() == -1 || lastSgmnt.get() - idx == 1) { + lastSgmnt.set(idx); + + return; + } + + if (log.isInfoEnabled()) + log.info("Found lost segment CDC links [lastLostSgmntIdx=" + idx + ']'); + + lastLostSgmnt.set(idx); + }); + + return lastLostSgmnt.get(); + } - delete.forEach(file -> { - if (!file.delete()) { - throw new IgniteException("Failed to delete lost segment CDC link [file=" + - file.getAbsolutePath() + ']'); - } + /** @return The index of the segment that contains the last {@link CdcDisableRecord}. 
*/ + private Long findLastCdcDisableSegment() { + AtomicReference<Long> lastRec = new AtomicReference<>(); - log.info("Segment CDC link deleted [file=" + file.getAbsolutePath() + ']'); - }); + consumeCdcSegments(segment -> { + if (lastRec.get() != null) + return; - Path stateDir = walCdcDir.toPath().resolve(STATE_DIR); + if (log.isInfoEnabled()) + log.info("Processing CDC segment [segment=" + segment + ']'); - if (stateDir.toFile().exists()) { - File walState = stateDir.resolve(WAL_STATE_FILE_NAME).toFile(); + IgniteWalIteratorFactory.IteratorParametersBuilder builder = + new IgniteWalIteratorFactory.IteratorParametersBuilder() + .log(log) + .sharedContext(ignite.context().cache().context()) + .filesOrDirs(segment.toFile()) + .addFilter((type, ptr) -> type == CDC_DISABLE); - if (walState.exists() && !walState.delete()) { - throw new IgniteException("Failed to delete wal state file [file=" + - walState.getAbsolutePath() + ']'); - } + if (ignite.configuration().getDataStorageConfiguration().getPageSize() != 0) + builder.pageSize(ignite.configuration().getDataStorageConfiguration().getPageSize()); + + try (WALIterator it = new IgniteWalIteratorFactory(log).iterator(builder)) { + if (it.hasNext()) { + if (log.isInfoEnabled()) + log.info("Found CDC disable record [ptr=" + it.next().get1() + ']'); + + lastRec.set(FileWriteAheadLogManager.segmentIndex(segment)); } } - catch (IOException e) { - throw new RuntimeException("Failed to delete lost segment CDC links.", e); + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to read CDC segment [path=" + segment + ']', e); } + }); + + return lastRec.get(); + } + + /** */ + private void resetWalState() { Review Comment: Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org