NSAmelchev commented on code in PR #10766:
URL: https://github.com/apache/ignite/pull/10766#discussion_r1256004639


##########
modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java:
##########
@@ -102,69 +115,138 @@ protected 
CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool
             try {
                 lock.tryLock(1);
 
-                try (Stream<Path> cdcFiles = Files.list(walCdcDir.toPath())) {
-                    Set<File> delete = new HashSet<>();
+                Long lostSgmnt = findLastLostSegement();
 
-                    AtomicLong lastSgmnt = new AtomicLong(-1);
+                if (lostSgmnt != null)
+                    deleteAll(lostSgmnt);
 
-                    cdcFiles
-                        .filter(p -> 
WAL_SEGMENT_FILE_FILTER.accept(p.toFile()))
-                        
.sorted(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex)
-                            .reversed()) // Sort by segment index.
-                        .forEach(path -> {
-                            long idx = 
FileWriteAheadLogManager.segmentIndex(path);
+                Long cdcDisableSgmnt = findLastCdcDisableSegment();
 
-                            if (lastSgmnt.get() == -1 || lastSgmnt.get() - idx 
== 1) {
-                                lastSgmnt.set(idx);
+                if (cdcDisableSgmnt != null)
+                    deleteAll(cdcDisableSgmnt);
 
-                                return;
-                            }
+                if (lostSgmnt != null || cdcDisableSgmnt != null)
+                    resetWalState();
+                else {

Review Comment:
   Fixed



##########
modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java:
##########
@@ -102,69 +115,138 @@ protected 
CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool
             try {
                 lock.tryLock(1);
 
-                try (Stream<Path> cdcFiles = Files.list(walCdcDir.toPath())) {
-                    Set<File> delete = new HashSet<>();
+                Long lostSgmnt = findLastLostSegement();
 
-                    AtomicLong lastSgmnt = new AtomicLong(-1);
+                if (lostSgmnt != null)
+                    deleteAll(lostSgmnt);
 
-                    cdcFiles
-                        .filter(p -> 
WAL_SEGMENT_FILE_FILTER.accept(p.toFile()))
-                        
.sorted(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex)
-                            .reversed()) // Sort by segment index.
-                        .forEach(path -> {
-                            long idx = 
FileWriteAheadLogManager.segmentIndex(path);
+                Long cdcDisableSgmnt = findLastCdcDisableSegment();
 
-                            if (lastSgmnt.get() == -1 || lastSgmnt.get() - idx 
== 1) {
-                                lastSgmnt.set(idx);
+                if (cdcDisableSgmnt != null)
+                    deleteAll(cdcDisableSgmnt);
 
-                                return;
-                            }
+                if (lostSgmnt != null || cdcDisableSgmnt != null)
+                    resetWalState();
+                else {
+                    if (log.isInfoEnabled())
+                        log.info("Lost segment CDC links or CDC disable record 
were not found.");
+                }
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException("Failed to delete lost segment CDC 
links. " +
+                    "Unable to acquire lock to lock CDC folder. Make sure a 
CDC app is shut down " +
+                    "[dir=" + walCdcDir.getAbsolutePath() + ", reason=" + 
e.getMessage() + ']');
+            }
+            finally {
+                U.closeQuiet(lock);
+            }
+
+            return null;
+        }
 
-                            delete.add(path.toFile());
-                        });
+        /** @return The index of the segment previous to the last gap or 
{@code null} if no gaps were found. */
+        private Long findLastLostSegement() {
+            AtomicReference<Long> lastLostSgmnt = new AtomicReference<>();
+            AtomicLong lastSgmnt = new AtomicLong(-1);
 
-                    if (delete.isEmpty()) {
-                        log.info("Lost segment CDC links were not found.");
+            consumeCdcSegments(segment -> {
+                if (lastLostSgmnt.get() != null)
+                    return;
 
-                        return null;
-                    }
+                long idx = FileWriteAheadLogManager.segmentIndex(segment);
 
-                    log.info("Found lost segment CDC links. The following 
links will be deleted: " + delete);
+                if (lastSgmnt.get() == -1 || lastSgmnt.get() - idx == 1) {
+                    lastSgmnt.set(idx);
+
+                    return;
+                }
+
+                if (log.isInfoEnabled())
+                    log.info("Found lost segment CDC links [lastLostSgmntIdx=" 
+ idx + ']');
+
+                lastLostSgmnt.set(idx);
+            });
+
+            return lastLostSgmnt.get();
+        }
 
-                    delete.forEach(file -> {
-                        if (!file.delete()) {
-                            throw new IgniteException("Failed to delete lost 
segment CDC link [file=" +
-                                file.getAbsolutePath() + ']');
-                        }
+        /** @return The index of the segment that contains the last {@link 
CdcDisableRecord}. */
+        private Long findLastCdcDisableSegment() {
+            AtomicReference<Long> lastRec = new AtomicReference<>();
 
-                        log.info("Segment CDC link deleted [file=" + 
file.getAbsolutePath() + ']');
-                    });
+            consumeCdcSegments(segment -> {
+                if (lastRec.get() != null)
+                    return;
 
-                    Path stateDir = walCdcDir.toPath().resolve(STATE_DIR);
+                if (log.isInfoEnabled())
+                    log.info("Processing CDC segment [segment=" + segment + 
']');
 
-                    if (stateDir.toFile().exists()) {
-                        File walState = 
stateDir.resolve(WAL_STATE_FILE_NAME).toFile();
+                IgniteWalIteratorFactory.IteratorParametersBuilder builder =
+                    new IgniteWalIteratorFactory.IteratorParametersBuilder()
+                        .log(log)
+                        .sharedContext(ignite.context().cache().context())
+                        .filesOrDirs(segment.toFile())
+                        .addFilter((type, ptr) -> type == CDC_DISABLE);
 
-                        if (walState.exists() && !walState.delete()) {
-                            throw new IgniteException("Failed to delete wal 
state file [file=" +
-                                walState.getAbsolutePath() + ']');
-                        }
+                if 
(ignite.configuration().getDataStorageConfiguration().getPageSize() != 0)
+                    
builder.pageSize(ignite.configuration().getDataStorageConfiguration().getPageSize());
+
+                try (WALIterator it = new 
IgniteWalIteratorFactory(log).iterator(builder)) {
+                    if (it.hasNext()) {
+                        if (log.isInfoEnabled())
+                            log.info("Found CDC disable record [ptr=" + 
it.next().get1() + ']');
+
+                        
lastRec.set(FileWriteAheadLogManager.segmentIndex(segment));
                     }
                 }
-                catch (IOException e) {
-                    throw new RuntimeException("Failed to delete lost segment 
CDC links.", e);
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException("Failed to read CDC segment 
[path=" + segment + ']', e);
                 }
+            });
+
+            return lastRec.get();
+        }
+
+        /** */
+        private void resetWalState() {

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to