ArafatKhan2198 commented on code in PR #9258: URL: https://github.com/apache/ozone/pull/9258#discussion_r2890150972
########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ReconReplicationManager.java: ########## @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.fsck; + +import java.io.IOException; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerHealthState; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.replication.MonitoringReplicationQueue; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import 
org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.ContainerStateKey; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.UnhealthyContainerRecordV2; +import org.apache.hadoop.util.Time; +import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Recon-specific extension of SCM's ReplicationManager. + * + * <p><b>Key Differences from SCM:</b></p> + * <ol> + * <li>Uses NoOpsContainerReplicaPendingOps stub (no pending operations tracking)</li> + * <li>Overrides processAll() to capture ALL container health states (no 100-sample limit)</li> + * <li>Stores results in Recon's UNHEALTHY_CONTAINERS table</li> + * <li>Does not issue replication commands (read-only monitoring)</li> + * </ol> + * + * <p><b>Why This Works Without PendingOps:</b></p> + * <p>SCM's health check logic uses a two-phase approach: + * <ul> + * <li><b>Phase 1 (Health Determination):</b> Calls isSufficientlyReplicated(false) + * which ignores pending operations. This phase determines the health state.</li> + * <li><b>Phase 2 (Command Deduplication):</b> Calls isSufficientlyReplicated(true) + * which considers pending operations. 
This phase decides whether to enqueue + * new commands.</li> + * </ul> + * Since Recon only needs Phase 1 (health determination) and doesn't issue commands, + * the stub PendingOps does not cause false positives.</p> + * + * @see NoOpsContainerReplicaPendingOps + * @see ReconReplicationManagerReport + */ +public class ReconReplicationManager extends ReplicationManager { + + private static final Logger LOG = + LoggerFactory.getLogger(ReconReplicationManager.class); + private static final int PERSIST_CHUNK_SIZE = 50_000; + + private final ContainerHealthSchemaManagerV2 healthSchemaManager; + private final ContainerManager containerManager; + + /** + * Immutable wiring context for ReconReplicationManager initialization. + */ + public static final class InitContext { + private final ReplicationManagerConfiguration rmConf; + private final ConfigurationSource conf; + private final ContainerManager containerManager; + private final PlacementPolicy ratisContainerPlacement; + private final PlacementPolicy ecContainerPlacement; + private final EventPublisher eventPublisher; + private final SCMContext scmContext; + private final NodeManager nodeManager; + private final Clock clock; + + private InitContext(Builder builder) { + this.rmConf = builder.rmConf; + this.conf = builder.conf; + this.containerManager = builder.containerManager; + this.ratisContainerPlacement = builder.ratisContainerPlacement; + this.ecContainerPlacement = builder.ecContainerPlacement; + this.eventPublisher = builder.eventPublisher; + this.scmContext = builder.scmContext; + this.nodeManager = builder.nodeManager; + this.clock = builder.clock; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for creating {@link InitContext} instances. 
+ */ + public static final class Builder { + private ReplicationManagerConfiguration rmConf; + private ConfigurationSource conf; + private ContainerManager containerManager; + private PlacementPolicy ratisContainerPlacement; + private PlacementPolicy ecContainerPlacement; + private EventPublisher eventPublisher; + private SCMContext scmContext; + private NodeManager nodeManager; + private Clock clock; + + private Builder() { + } + + public Builder setRmConf(ReplicationManagerConfiguration rmConf) { + this.rmConf = rmConf; + return this; + } + + public Builder setConf(ConfigurationSource conf) { + this.conf = conf; + return this; + } + + public Builder setContainerManager(ContainerManager containerManager) { + this.containerManager = containerManager; + return this; + } + + public Builder setRatisContainerPlacement(PlacementPolicy ratisContainerPlacement) { + this.ratisContainerPlacement = ratisContainerPlacement; + return this; + } + + public Builder setEcContainerPlacement(PlacementPolicy ecContainerPlacement) { + this.ecContainerPlacement = ecContainerPlacement; + return this; + } + + public Builder setEventPublisher(EventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + return this; + } + + public Builder setScmContext(SCMContext scmContext) { + this.scmContext = scmContext; + return this; + } + + public Builder setNodeManager(NodeManager nodeManager) { + this.nodeManager = nodeManager; + return this; + } + + public Builder setClock(Clock clock) { + this.clock = clock; + return this; + } + + public InitContext build() { + return new InitContext(this); + } + } + } + + public ReconReplicationManager( + InitContext initContext, + ContainerHealthSchemaManagerV2 healthSchemaManager) throws IOException { + + // Call parent with stub PendingOps (proven to not cause false positives) + super( + initContext.rmConf, + initContext.conf, + initContext.containerManager, + initContext.ratisContainerPlacement, + initContext.ecContainerPlacement, + 
initContext.eventPublisher, + initContext.scmContext, + initContext.nodeManager, + initContext.clock, + new NoOpsContainerReplicaPendingOps(initContext.clock, initContext.rmConf) + ); + + this.containerManager = initContext.containerManager; + this.healthSchemaManager = healthSchemaManager; + } + + /** + * Override start() to prevent background threads from running. + * + * <p>In Recon, we don't want the ReplicationManager's background threads + * (replicationMonitor, underReplicatedProcessor, overReplicatedProcessor) + * to run continuously. Instead, we call processAll() manually from + * ContainerHealthTaskV2 on a schedule.</p> + * + * <p>This prevents: + * <ul> + * <li>Unnecessary CPU usage from continuous monitoring</li> + * <li>Initialization race conditions (start() being called before fields are initialized)</li> + * <li>Replication commands being generated (Recon is read-only)</li> + * </ul> + * </p> + */ + @Override + public synchronized void start() { + LOG.info("ReconReplicationManager.start() called - no-op (manual invocation via processAll())"); + // Do nothing - we call processAll() manually from ContainerHealthTaskV2 + } + + /** + * Checks if container replicas have mismatched data checksums. + * This is a Recon-specific check not done by SCM's ReplicationManager. 
+ * + * <p>REPLICA_MISMATCH detection is crucial for identifying: + * <ul> + * <li>Bit rot (silent data corruption)</li> + * <li>Failed writes to some replicas</li> + * <li>Storage corruption on specific datanodes</li> + * <li>Network corruption during replication</li> + * </ul> + * </p> + * + * <p>This uses checksum mismatch logic: + * {@code replicas.stream().map(ContainerReplica::getDataChecksum).distinct().count() != 1} + * </p> + * + * @param replicas Set of container replicas to check + * @return true if replicas have different data checksums + */ + private boolean hasDataChecksumMismatch(Set<ContainerReplica> replicas) { + if (replicas == null || replicas.isEmpty()) { + return false; + } + + // Count distinct checksums (filter out nulls) + long distinctChecksums = replicas.stream() + .map(ContainerReplica::getDataChecksum) + .filter(Objects::nonNull) + .distinct() + .count(); + + // More than 1 distinct checksum = data mismatch + // 0 distinct checksums = all nulls, no mismatch + return distinctChecksums > 1; + } + + /** + * Override processAll() to capture ALL per-container health states, + * not just aggregate counts and 100 samples. 
+ * + * <p><b>Processing Flow:</b></p> + * <ol> + * <li>Get all containers from ContainerManager</li> + * <li>Process each container using inherited health check chain (SCM logic)</li> + * <li>Additionally check for REPLICA_MISMATCH (Recon-specific)</li> + * <li>Capture ALL unhealthy container IDs per health state (no sampling limit)</li> + * <li>Store results in Recon's UNHEALTHY_CONTAINERS table</li> + * </ol> + * + * <p><b>Differences from SCM's processAll():</b></p> + * <ul> + * <li>Uses ReconReplicationManagerReport (captures all containers)</li> + * <li>Uses MonitoringReplicationQueue (doesn't enqueue commands)</li> + * <li>Adds REPLICA_MISMATCH detection (not done by SCM)</li> + * <li>Stores results in database instead of just keeping in-memory report</li> + * </ul> + */ + @Override + public synchronized void processAll() { + LOG.info("ReconReplicationManager starting container health check"); + + final long startTime = Time.monotonicNow(); + + // Use extended report that captures ALL containers, not just 100 samples + final ReconReplicationManagerReport report = new ReconReplicationManagerReport(); + final ReplicationQueue nullQueue = new MonitoringReplicationQueue(); + + // Get all containers (same as parent) + final List<ContainerInfo> containers = containerManager.getContainers(); + + LOG.info("Processing {} containers", containers.size()); + + // Process each container (reuses inherited processContainer and health check chain) + int processedCount = 0; + for (ContainerInfo container : containers) { + report.increment(container.getState()); + try { + ContainerID cid = container.containerID(); + + // Call inherited processContainer - this runs SCM's health check chain + // readOnly=true ensures no commands are generated + processContainer(container, nullQueue, report, true); + + // ADDITIONAL CHECK: Detect REPLICA_MISMATCH (Recon-specific, not in SCM) + Set<ContainerReplica> replicas = containerManager.getContainerReplicas(cid); + if 
(hasDataChecksumMismatch(replicas)) { + report.addReplicaMismatchContainer(cid); + LOG.debug("Container {} has data checksum mismatch across replicas", cid); + } + + processedCount++; + + if (processedCount % 10000 == 0) { + LOG.info("Processed {}/{} containers", processedCount, containers.size()); + } + } catch (ContainerNotFoundException e) { + LOG.error("Container {} not found", container.getContainerID(), e); + } + } + + report.setComplete(); + + // Store ALL per-container health states to database + storeHealthStatesToDatabase(report, containers); + + long duration = Time.monotonicNow() - startTime; + LOG.info("ReconReplicationManager completed in {}ms for {} containers", + duration, containers.size()); + } + + /** + * Convert ReconReplicationManagerReport to database records and store. + * This captures all unhealthy containers with detailed replica counts. + * + * @param report The report with all captured container health states + * @param allContainers List of all containers for cleanup + */ + private void storeHealthStatesToDatabase( + ReconReplicationManagerReport report, + List<ContainerInfo> allContainers) { + long currentTime = System.currentTimeMillis(); + ProcessingStats totalStats = new ProcessingStats(); + int totalReplicaMismatchCount = 0; + + for (int from = 0; from < allContainers.size(); from += PERSIST_CHUNK_SIZE) { + int to = Math.min(from + PERSIST_CHUNK_SIZE, allContainers.size()); + List<Long> chunkContainerIds = collectContainerIds(allContainers, from, to); + Set<Long> chunkContainerIdSet = new HashSet<>(chunkContainerIds); + Map<ContainerStateKey, Long> existingInStateSinceByContainerAndState = + healthSchemaManager.getExistingInStateSinceByContainerIds(chunkContainerIds); + List<UnhealthyContainerRecordV2> recordsToInsert = new ArrayList<>(); + ProcessingStats chunkStats = new ProcessingStats(); + Set<Long> negativeSizeRecorded = new HashSet<>(); + + report.forEachContainerByState((state, cid) -> { + if 
(!chunkContainerIdSet.contains(cid.getId())) { + return; + } + try { + handleScmStateContainer(state, cid, currentTime, + existingInStateSinceByContainerAndState, recordsToInsert, + negativeSizeRecorded, chunkStats); + } catch (ContainerNotFoundException e) { + LOG.warn("Container {} not found when processing {} state", cid, state, e); + } + }); + + int chunkReplicaMismatchCount = processReplicaMismatchContainersForChunk( + report, currentTime, existingInStateSinceByContainerAndState, + recordsToInsert, chunkContainerIdSet); + totalReplicaMismatchCount += chunkReplicaMismatchCount; + totalStats.add(chunkStats); + persistUnhealthyRecords(chunkContainerIds, recordsToInsert); + } + + LOG.info("Stored {} MISSING, {} EMPTY_MISSING, {} UNDER_REPLICATED, " + + "{} OVER_REPLICATED, {} MIS_REPLICATED, {} NEGATIVE_SIZE, " + + "{} REPLICA_MISMATCH", + totalStats.missingCount, totalStats.emptyMissingCount, totalStats.underRepCount, + totalStats.overRepCount, totalStats.misRepCount, totalStats.negativeSizeCount, + totalReplicaMismatchCount); + } + + private void handleScmStateContainer( + ContainerHealthState state, + ContainerID containerId, + long currentTime, + Map<ContainerStateKey, Long> existingInStateSinceByContainerAndState, + List<UnhealthyContainerRecordV2> recordsToInsert, + Set<Long> negativeSizeRecorded, + ProcessingStats stats) throws ContainerNotFoundException { + switch (state) { + case MISSING: Review Comment: SCM's handler chain can emit composite health states like: ``` UNHEALTHY_UNDER_REPLICATED QUASI_CLOSED_STUCK_MISSING QUASI_CLOSED_STUCK_UNDER_REPLICATED MISSING_UNDER_REPLICATED ``` etc. But the switch statement in handleScmStateContainer() only handles 4 states: MISSING, UNDER_REPLICATED, OVER_REPLICATED, MIS_REPLICATED. Everything else falls into default: break; and is silently thrown away. 
This means a container that is both quasi-closed-stuck AND has no replicas (QUASI_CLOSED_STUCK_MISSING) will never appear in the Recon UI or the UNHEALTHY_CONTAINERS table. Are these composite states intentionally excluded from V2? Or should we map them to their base state (e.g., QUASI_CLOSED_STUCK_UNDER_REPLICATED → store as UNDER_REPLICATED with an appropriate reason string)? ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ReconReplicationManager.java: ########## @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.fsck; + +import java.io.IOException; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerHealthState; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.replication.MonitoringReplicationQueue; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.ContainerStateKey; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.UnhealthyContainerRecordV2; +import org.apache.hadoop.util.Time; +import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Recon-specific extension of SCM's ReplicationManager. 
+ * + * <p><b>Key Differences from SCM:</b></p> + * <ol> + * <li>Uses NoOpsContainerReplicaPendingOps stub (no pending operations tracking)</li> + * <li>Overrides processAll() to capture ALL container health states (no 100-sample limit)</li> + * <li>Stores results in Recon's UNHEALTHY_CONTAINERS table</li> + * <li>Does not issue replication commands (read-only monitoring)</li> + * </ol> + * + * <p><b>Why This Works Without PendingOps:</b></p> + * <p>SCM's health check logic uses a two-phase approach: + * <ul> + * <li><b>Phase 1 (Health Determination):</b> Calls isSufficientlyReplicated(false) + * which ignores pending operations. This phase determines the health state.</li> + * <li><b>Phase 2 (Command Deduplication):</b> Calls isSufficientlyReplicated(true) + * which considers pending operations. This phase decides whether to enqueue + * new commands.</li> + * </ul> + * Since Recon only needs Phase 1 (health determination) and doesn't issue commands, + * the stub PendingOps does not cause false positives.</p> + * + * @see NoOpsContainerReplicaPendingOps + * @see ReconReplicationManagerReport + */ +public class ReconReplicationManager extends ReplicationManager { + + private static final Logger LOG = + LoggerFactory.getLogger(ReconReplicationManager.class); + private static final int PERSIST_CHUNK_SIZE = 50_000; + + private final ContainerHealthSchemaManagerV2 healthSchemaManager; + private final ContainerManager containerManager; + + /** + * Immutable wiring context for ReconReplicationManager initialization. 
+ */ + public static final class InitContext { + private final ReplicationManagerConfiguration rmConf; + private final ConfigurationSource conf; + private final ContainerManager containerManager; + private final PlacementPolicy ratisContainerPlacement; + private final PlacementPolicy ecContainerPlacement; + private final EventPublisher eventPublisher; + private final SCMContext scmContext; + private final NodeManager nodeManager; + private final Clock clock; + + private InitContext(Builder builder) { + this.rmConf = builder.rmConf; + this.conf = builder.conf; + this.containerManager = builder.containerManager; + this.ratisContainerPlacement = builder.ratisContainerPlacement; + this.ecContainerPlacement = builder.ecContainerPlacement; + this.eventPublisher = builder.eventPublisher; + this.scmContext = builder.scmContext; + this.nodeManager = builder.nodeManager; + this.clock = builder.clock; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for creating {@link InitContext} instances. 
+ */ + public static final class Builder { + private ReplicationManagerConfiguration rmConf; + private ConfigurationSource conf; + private ContainerManager containerManager; + private PlacementPolicy ratisContainerPlacement; + private PlacementPolicy ecContainerPlacement; + private EventPublisher eventPublisher; + private SCMContext scmContext; + private NodeManager nodeManager; + private Clock clock; + + private Builder() { + } + + public Builder setRmConf(ReplicationManagerConfiguration rmConf) { + this.rmConf = rmConf; + return this; + } + + public Builder setConf(ConfigurationSource conf) { + this.conf = conf; + return this; + } + + public Builder setContainerManager(ContainerManager containerManager) { + this.containerManager = containerManager; + return this; + } + + public Builder setRatisContainerPlacement(PlacementPolicy ratisContainerPlacement) { + this.ratisContainerPlacement = ratisContainerPlacement; + return this; + } + + public Builder setEcContainerPlacement(PlacementPolicy ecContainerPlacement) { + this.ecContainerPlacement = ecContainerPlacement; + return this; + } + + public Builder setEventPublisher(EventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + return this; + } + + public Builder setScmContext(SCMContext scmContext) { + this.scmContext = scmContext; + return this; + } + + public Builder setNodeManager(NodeManager nodeManager) { + this.nodeManager = nodeManager; + return this; + } + + public Builder setClock(Clock clock) { + this.clock = clock; + return this; + } + + public InitContext build() { + return new InitContext(this); + } + } + } + + public ReconReplicationManager( + InitContext initContext, + ContainerHealthSchemaManagerV2 healthSchemaManager) throws IOException { + + // Call parent with stub PendingOps (proven to not cause false positives) + super( + initContext.rmConf, + initContext.conf, + initContext.containerManager, + initContext.ratisContainerPlacement, + initContext.ecContainerPlacement, + 
initContext.eventPublisher, + initContext.scmContext, + initContext.nodeManager, + initContext.clock, + new NoOpsContainerReplicaPendingOps(initContext.clock, initContext.rmConf) + ); + + this.containerManager = initContext.containerManager; + this.healthSchemaManager = healthSchemaManager; + } + + /** + * Override start() to prevent background threads from running. + * + * <p>In Recon, we don't want the ReplicationManager's background threads + * (replicationMonitor, underReplicatedProcessor, overReplicatedProcessor) + * to run continuously. Instead, we call processAll() manually from + * ContainerHealthTaskV2 on a schedule.</p> + * + * <p>This prevents: + * <ul> + * <li>Unnecessary CPU usage from continuous monitoring</li> + * <li>Initialization race conditions (start() being called before fields are initialized)</li> + * <li>Replication commands being generated (Recon is read-only)</li> + * </ul> + * </p> + */ + @Override + public synchronized void start() { + LOG.info("ReconReplicationManager.start() called - no-op (manual invocation via processAll())"); + // Do nothing - we call processAll() manually from ContainerHealthTaskV2 + } + + /** + * Checks if container replicas have mismatched data checksums. + * This is a Recon-specific check not done by SCM's ReplicationManager. 
+ * + * <p>REPLICA_MISMATCH detection is crucial for identifying: + * <ul> + * <li>Bit rot (silent data corruption)</li> + * <li>Failed writes to some replicas</li> + * <li>Storage corruption on specific datanodes</li> + * <li>Network corruption during replication</li> + * </ul> + * </p> + * + * <p>This uses checksum mismatch logic: + * {@code replicas.stream().map(ContainerReplica::getDataChecksum).distinct().count() != 1} + * </p> + * + * @param replicas Set of container replicas to check + * @return true if replicas have different data checksums + */ + private boolean hasDataChecksumMismatch(Set<ContainerReplica> replicas) { + if (replicas == null || replicas.isEmpty()) { + return false; + } + + // Count distinct checksums (filter out nulls) + long distinctChecksums = replicas.stream() + .map(ContainerReplica::getDataChecksum) + .filter(Objects::nonNull) + .distinct() + .count(); + + // More than 1 distinct checksum = data mismatch + // 0 distinct checksums = all nulls, no mismatch + return distinctChecksums > 1; + } + + /** + * Override processAll() to capture ALL per-container health states, + * not just aggregate counts and 100 samples. 
+ * + * <p><b>Processing Flow:</b></p> + * <ol> + * <li>Get all containers from ContainerManager</li> + * <li>Process each container using inherited health check chain (SCM logic)</li> + * <li>Additionally check for REPLICA_MISMATCH (Recon-specific)</li> + * <li>Capture ALL unhealthy container IDs per health state (no sampling limit)</li> + * <li>Store results in Recon's UNHEALTHY_CONTAINERS table</li> + * </ol> + * + * <p><b>Differences from SCM's processAll():</b></p> + * <ul> + * <li>Uses ReconReplicationManagerReport (captures all containers)</li> + * <li>Uses MonitoringReplicationQueue (doesn't enqueue commands)</li> + * <li>Adds REPLICA_MISMATCH detection (not done by SCM)</li> + * <li>Stores results in database instead of just keeping in-memory report</li> + * </ul> + */ + @Override + public synchronized void processAll() { + LOG.info("ReconReplicationManager starting container health check"); + + final long startTime = Time.monotonicNow(); + + // Use extended report that captures ALL containers, not just 100 samples + final ReconReplicationManagerReport report = new ReconReplicationManagerReport(); + final ReplicationQueue nullQueue = new MonitoringReplicationQueue(); + + // Get all containers (same as parent) + final List<ContainerInfo> containers = containerManager.getContainers(); + + LOG.info("Processing {} containers", containers.size()); + + // Process each container (reuses inherited processContainer and health check chain) + int processedCount = 0; + for (ContainerInfo container : containers) { + report.increment(container.getState()); + try { + ContainerID cid = container.containerID(); + + // Call inherited processContainer - this runs SCM's health check chain + // readOnly=true ensures no commands are generated + processContainer(container, nullQueue, report, true); + + // ADDITIONAL CHECK: Detect REPLICA_MISMATCH (Recon-specific, not in SCM) + Set<ContainerReplica> replicas = containerManager.getContainerReplicas(cid); + if 
(hasDataChecksumMismatch(replicas)) { + report.addReplicaMismatchContainer(cid); + LOG.debug("Container {} has data checksum mismatch across replicas", cid); + } + + processedCount++; + + if (processedCount % 10000 == 0) { + LOG.info("Processed {}/{} containers", processedCount, containers.size()); + } + } catch (ContainerNotFoundException e) { + LOG.error("Container {} not found", container.getContainerID(), e); + } + } + + report.setComplete(); + + // Store ALL per-container health states to database + storeHealthStatesToDatabase(report, containers); + + long duration = Time.monotonicNow() - startTime; + LOG.info("ReconReplicationManager completed in {}ms for {} containers", + duration, containers.size()); + } + + /** + * Convert ReconReplicationManagerReport to database records and store. + * This captures all unhealthy containers with detailed replica counts. + * + * @param report The report with all captured container health states + * @param allContainers List of all containers for cleanup + */ + private void storeHealthStatesToDatabase( + ReconReplicationManagerReport report, + List<ContainerInfo> allContainers) { + long currentTime = System.currentTimeMillis(); + ProcessingStats totalStats = new ProcessingStats(); + int totalReplicaMismatchCount = 0; + + for (int from = 0; from < allContainers.size(); from += PERSIST_CHUNK_SIZE) { + int to = Math.min(from + PERSIST_CHUNK_SIZE, allContainers.size()); + List<Long> chunkContainerIds = collectContainerIds(allContainers, from, to); + Set<Long> chunkContainerIdSet = new HashSet<>(chunkContainerIds); + Map<ContainerStateKey, Long> existingInStateSinceByContainerAndState = + healthSchemaManager.getExistingInStateSinceByContainerIds(chunkContainerIds); + List<UnhealthyContainerRecordV2> recordsToInsert = new ArrayList<>(); + ProcessingStats chunkStats = new ProcessingStats(); + Set<Long> negativeSizeRecorded = new HashSet<>(); + + report.forEachContainerByState((state, cid) -> { + if 
(!chunkContainerIdSet.contains(cid.getId())) { + return; + } + try { + handleScmStateContainer(state, cid, currentTime, + existingInStateSinceByContainerAndState, recordsToInsert, + negativeSizeRecorded, chunkStats); + } catch (ContainerNotFoundException e) { + LOG.warn("Container {} not found when processing {} state", cid, state, e); + } + }); + + int chunkReplicaMismatchCount = processReplicaMismatchContainersForChunk( + report, currentTime, existingInStateSinceByContainerAndState, + recordsToInsert, chunkContainerIdSet); + totalReplicaMismatchCount += chunkReplicaMismatchCount; + totalStats.add(chunkStats); + persistUnhealthyRecords(chunkContainerIds, recordsToInsert); + } + + LOG.info("Stored {} MISSING, {} EMPTY_MISSING, {} UNDER_REPLICATED, " + + "{} OVER_REPLICATED, {} MIS_REPLICATED, {} NEGATIVE_SIZE, " + + "{} REPLICA_MISMATCH", + totalStats.missingCount, totalStats.emptyMissingCount, totalStats.underRepCount, + totalStats.overRepCount, totalStats.misRepCount, totalStats.negativeSizeCount, + totalReplicaMismatchCount); + } + + private void handleScmStateContainer( + ContainerHealthState state, + ContainerID containerId, + long currentTime, + Map<ContainerStateKey, Long> existingInStateSinceByContainerAndState, + List<UnhealthyContainerRecordV2> recordsToInsert, + Set<Long> negativeSizeRecorded, + ProcessingStats stats) throws ContainerNotFoundException { + switch (state) { + case MISSING: + handleMissingContainer(containerId, currentTime, + existingInStateSinceByContainerAndState, recordsToInsert, stats); + break; + case UNDER_REPLICATED: Review Comment: Shouldn't we also have a state for `REPLICA_MISMATCH`? ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ReconReplicationManager.java: ########## @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.fsck; + +import java.io.IOException; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerHealthState; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.replication.MonitoringReplicationQueue; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2; +import 
org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.ContainerStateKey; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.UnhealthyContainerRecordV2; +import org.apache.hadoop.util.Time; +import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Recon-specific extension of SCM's ReplicationManager. + * + * <p><b>Key Differences from SCM:</b></p> + * <ol> + * <li>Uses NoOpsContainerReplicaPendingOps stub (no pending operations tracking)</li> + * <li>Overrides processAll() to capture ALL container health states (no 100-sample limit)</li> + * <li>Stores results in Recon's UNHEALTHY_CONTAINERS table</li> + * <li>Does not issue replication commands (read-only monitoring)</li> + * </ol> + * + * <p><b>Why This Works Without PendingOps:</b></p> + * <p>SCM's health check logic uses a two-phase approach: + * <ul> + * <li><b>Phase 1 (Health Determination):</b> Calls isSufficientlyReplicated(false) + * which ignores pending operations. This phase determines the health state.</li> + * <li><b>Phase 2 (Command Deduplication):</b> Calls isSufficientlyReplicated(true) + * which considers pending operations. This phase decides whether to enqueue + * new commands.</li> + * </ul> + * Since Recon only needs Phase 1 (health determination) and doesn't issue commands, + * the stub PendingOps does not cause false positives.</p> + * + * @see NoOpsContainerReplicaPendingOps + * @see ReconReplicationManagerReport + */ +public class ReconReplicationManager extends ReplicationManager { + + private static final Logger LOG = + LoggerFactory.getLogger(ReconReplicationManager.class); + private static final int PERSIST_CHUNK_SIZE = 50_000; + + private final ContainerHealthSchemaManagerV2 healthSchemaManager; + private final ContainerManager containerManager; + + /** + * Immutable wiring context for ReconReplicationManager initialization. 
+ */ + public static final class InitContext { + private final ReplicationManagerConfiguration rmConf; + private final ConfigurationSource conf; + private final ContainerManager containerManager; + private final PlacementPolicy ratisContainerPlacement; + private final PlacementPolicy ecContainerPlacement; + private final EventPublisher eventPublisher; + private final SCMContext scmContext; + private final NodeManager nodeManager; + private final Clock clock; + + private InitContext(Builder builder) { + this.rmConf = builder.rmConf; + this.conf = builder.conf; + this.containerManager = builder.containerManager; + this.ratisContainerPlacement = builder.ratisContainerPlacement; + this.ecContainerPlacement = builder.ecContainerPlacement; + this.eventPublisher = builder.eventPublisher; + this.scmContext = builder.scmContext; + this.nodeManager = builder.nodeManager; + this.clock = builder.clock; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for creating {@link InitContext} instances. 
+ */ + public static final class Builder { + private ReplicationManagerConfiguration rmConf; + private ConfigurationSource conf; + private ContainerManager containerManager; + private PlacementPolicy ratisContainerPlacement; + private PlacementPolicy ecContainerPlacement; + private EventPublisher eventPublisher; + private SCMContext scmContext; + private NodeManager nodeManager; + private Clock clock; + + private Builder() { + } + + public Builder setRmConf(ReplicationManagerConfiguration rmConf) { + this.rmConf = rmConf; + return this; + } + + public Builder setConf(ConfigurationSource conf) { + this.conf = conf; + return this; + } + + public Builder setContainerManager(ContainerManager containerManager) { + this.containerManager = containerManager; + return this; + } + + public Builder setRatisContainerPlacement(PlacementPolicy ratisContainerPlacement) { + this.ratisContainerPlacement = ratisContainerPlacement; + return this; + } + + public Builder setEcContainerPlacement(PlacementPolicy ecContainerPlacement) { + this.ecContainerPlacement = ecContainerPlacement; + return this; + } + + public Builder setEventPublisher(EventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + return this; + } + + public Builder setScmContext(SCMContext scmContext) { + this.scmContext = scmContext; + return this; + } + + public Builder setNodeManager(NodeManager nodeManager) { + this.nodeManager = nodeManager; + return this; + } + + public Builder setClock(Clock clock) { + this.clock = clock; + return this; + } + + public InitContext build() { + return new InitContext(this); + } + } + } + + public ReconReplicationManager( + InitContext initContext, + ContainerHealthSchemaManagerV2 healthSchemaManager) throws IOException { + + // Call parent with stub PendingOps (proven to not cause false positives) + super( + initContext.rmConf, + initContext.conf, + initContext.containerManager, + initContext.ratisContainerPlacement, + initContext.ecContainerPlacement, + 
initContext.eventPublisher, + initContext.scmContext, + initContext.nodeManager, + initContext.clock, + new NoOpsContainerReplicaPendingOps(initContext.clock, initContext.rmConf) + ); + + this.containerManager = initContext.containerManager; + this.healthSchemaManager = healthSchemaManager; + } + + /** + * Override start() to prevent background threads from running. + * + * <p>In Recon, we don't want the ReplicationManager's background threads + * (replicationMonitor, underReplicatedProcessor, overReplicatedProcessor) + * to run continuously. Instead, we call processAll() manually from + * ContainerHealthTaskV2 on a schedule.</p> + * + * <p>This prevents: + * <ul> + * <li>Unnecessary CPU usage from continuous monitoring</li> + * <li>Initialization race conditions (start() being called before fields are initialized)</li> + * <li>Replication commands being generated (Recon is read-only)</li> + * </ul> + * </p> + */ + @Override + public synchronized void start() { + LOG.info("ReconReplicationManager.start() called - no-op (manual invocation via processAll())"); + // Do nothing - we call processAll() manually from ContainerHealthTaskV2 + } + + /** + * Checks if container replicas have mismatched data checksums. + * This is a Recon-specific check not done by SCM's ReplicationManager. 
+ * + * <p>REPLICA_MISMATCH detection is crucial for identifying: + * <ul> + * <li>Bit rot (silent data corruption)</li> + * <li>Failed writes to some replicas</li> + * <li>Storage corruption on specific datanodes</li> + * <li>Network corruption during replication</li> + * </ul> + * </p> + * + * <p>This uses checksum mismatch logic: + * {@code replicas.stream().map(ContainerReplica::getDataChecksum).distinct().count() != 1} + * </p> + * + * @param replicas Set of container replicas to check + * @return true if replicas have different data checksums + */ + private boolean hasDataChecksumMismatch(Set<ContainerReplica> replicas) { + if (replicas == null || replicas.isEmpty()) { + return false; + } + + // Count distinct checksums (filter out nulls) + long distinctChecksums = replicas.stream() + .map(ContainerReplica::getDataChecksum) + .filter(Objects::nonNull) + .distinct() + .count(); + + // More than 1 distinct checksum = data mismatch + // 0 distinct checksums = all nulls, no mismatch + return distinctChecksums > 1; + } + + /** + * Override processAll() to capture ALL per-container health states, + * not just aggregate counts and 100 samples. 
+ * + * <p><b>Processing Flow:</b></p> + * <ol> + * <li>Get all containers from ContainerManager</li> + * <li>Process each container using inherited health check chain (SCM logic)</li> + * <li>Additionally check for REPLICA_MISMATCH (Recon-specific)</li> + * <li>Capture ALL unhealthy container IDs per health state (no sampling limit)</li> + * <li>Store results in Recon's UNHEALTHY_CONTAINERS table</li> + * </ol> + * + * <p><b>Differences from SCM's processAll():</b></p> + * <ul> + * <li>Uses ReconReplicationManagerReport (captures all containers)</li> + * <li>Uses MonitoringReplicationQueue (doesn't enqueue commands)</li> + * <li>Adds REPLICA_MISMATCH detection (not done by SCM)</li> + * <li>Stores results in database instead of just keeping in-memory report</li> + * </ul> + */ + @Override + public synchronized void processAll() { + LOG.info("ReconReplicationManager starting container health check"); + + final long startTime = Time.monotonicNow(); + + // Use extended report that captures ALL containers, not just 100 samples + final ReconReplicationManagerReport report = new ReconReplicationManagerReport(); + final ReplicationQueue nullQueue = new MonitoringReplicationQueue(); + + // Get all containers (same as parent) + final List<ContainerInfo> containers = containerManager.getContainers(); + + LOG.info("Processing {} containers", containers.size()); + + // Process each container (reuses inherited processContainer and health check chain) + int processedCount = 0; + for (ContainerInfo container : containers) { + report.increment(container.getState()); + try { + ContainerID cid = container.containerID(); + + // Call inherited processContainer - this runs SCM's health check chain + // readOnly=true ensures no commands are generated + processContainer(container, nullQueue, report, true); + + // ADDITIONAL CHECK: Detect REPLICA_MISMATCH (Recon-specific, not in SCM) + Set<ContainerReplica> replicas = containerManager.getContainerReplicas(cid); + if 
(hasDataChecksumMismatch(replicas)) { + report.addReplicaMismatchContainer(cid); + LOG.debug("Container {} has data checksum mismatch across replicas", cid); + } + + processedCount++; + + if (processedCount % 10000 == 0) { + LOG.info("Processed {}/{} containers", processedCount, containers.size()); + } + } catch (ContainerNotFoundException e) { + LOG.error("Container {} not found", container.getContainerID(), e); + } + } + + report.setComplete(); + + // Store ALL per-container health states to database + storeHealthStatesToDatabase(report, containers); + + long duration = Time.monotonicNow() - startTime; + LOG.info("ReconReplicationManager completed in {}ms for {} containers", + duration, containers.size()); + } + + /** + * Convert ReconReplicationManagerReport to database records and store. + * This captures all unhealthy containers with detailed replica counts. + * + * @param report The report with all captured container health states + * @param allContainers List of all containers for cleanup + */ + private void storeHealthStatesToDatabase( + ReconReplicationManagerReport report, + List<ContainerInfo> allContainers) { + long currentTime = System.currentTimeMillis(); + ProcessingStats totalStats = new ProcessingStats(); + int totalReplicaMismatchCount = 0; + + for (int from = 0; from < allContainers.size(); from += PERSIST_CHUNK_SIZE) { + int to = Math.min(from + PERSIST_CHUNK_SIZE, allContainers.size()); + List<Long> chunkContainerIds = collectContainerIds(allContainers, from, to); + Set<Long> chunkContainerIdSet = new HashSet<>(chunkContainerIds); + Map<ContainerStateKey, Long> existingInStateSinceByContainerAndState = + healthSchemaManager.getExistingInStateSinceByContainerIds(chunkContainerIds); + List<UnhealthyContainerRecordV2> recordsToInsert = new ArrayList<>(); + ProcessingStats chunkStats = new ProcessingStats(); + Set<Long> negativeSizeRecorded = new HashSet<>(); + + report.forEachContainerByState((state, cid) -> { + if 
(!chunkContainerIdSet.contains(cid.getId())) { + return; + } + try { + handleScmStateContainer(state, cid, currentTime, + existingInStateSinceByContainerAndState, recordsToInsert, + negativeSizeRecorded, chunkStats); + } catch (ContainerNotFoundException e) { + LOG.warn("Container {} not found when processing {} state", cid, state, e); + } + }); + + int chunkReplicaMismatchCount = processReplicaMismatchContainersForChunk( + report, currentTime, existingInStateSinceByContainerAndState, + recordsToInsert, chunkContainerIdSet); + totalReplicaMismatchCount += chunkReplicaMismatchCount; + totalStats.add(chunkStats); + persistUnhealthyRecords(chunkContainerIds, recordsToInsert); + } + + LOG.info("Stored {} MISSING, {} EMPTY_MISSING, {} UNDER_REPLICATED, " + + "{} OVER_REPLICATED, {} MIS_REPLICATED, {} NEGATIVE_SIZE, " + + "{} REPLICA_MISMATCH", + totalStats.missingCount, totalStats.emptyMissingCount, totalStats.underRepCount, + totalStats.overRepCount, totalStats.misRepCount, totalStats.negativeSizeCount, + totalReplicaMismatchCount); + } + + private void handleScmStateContainer( + ContainerHealthState state, + ContainerID containerId, + long currentTime, + Map<ContainerStateKey, Long> existingInStateSinceByContainerAndState, + List<UnhealthyContainerRecordV2> recordsToInsert, + Set<Long> negativeSizeRecorded, + ProcessingStats stats) throws ContainerNotFoundException { + switch (state) { Review Comment: Remember, in our offline discussion we tried checking what happens if we attempt to add a state to the Derby table that violates the allowed-state constraint. I believe this switch case will prevent a new state from being added to the database. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ReconReplicationManager.java: ########## @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.fsck; + +import java.io.IOException; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerHealthState; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.replication.MonitoringReplicationQueue; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2; +import 
org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.ContainerStateKey; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.UnhealthyContainerRecordV2; +import org.apache.hadoop.util.Time; +import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Recon-specific extension of SCM's ReplicationManager. + * + * <p><b>Key Differences from SCM:</b></p> + * <ol> + * <li>Uses NoOpsContainerReplicaPendingOps stub (no pending operations tracking)</li> + * <li>Overrides processAll() to capture ALL container health states (no 100-sample limit)</li> + * <li>Stores results in Recon's UNHEALTHY_CONTAINERS table</li> + * <li>Does not issue replication commands (read-only monitoring)</li> + * </ol> + * + * <p><b>Why This Works Without PendingOps:</b></p> + * <p>SCM's health check logic uses a two-phase approach: + * <ul> + * <li><b>Phase 1 (Health Determination):</b> Calls isSufficientlyReplicated(false) + * which ignores pending operations. This phase determines the health state.</li> + * <li><b>Phase 2 (Command Deduplication):</b> Calls isSufficientlyReplicated(true) + * which considers pending operations. This phase decides whether to enqueue + * new commands.</li> + * </ul> + * Since Recon only needs Phase 1 (health determination) and doesn't issue commands, + * the stub PendingOps does not cause false positives.</p> + * + * @see NoOpsContainerReplicaPendingOps + * @see ReconReplicationManagerReport + */ +public class ReconReplicationManager extends ReplicationManager { + + private static final Logger LOG = + LoggerFactory.getLogger(ReconReplicationManager.class); + private static final int PERSIST_CHUNK_SIZE = 50_000; + + private final ContainerHealthSchemaManagerV2 healthSchemaManager; + private final ContainerManager containerManager; + + /** + * Immutable wiring context for ReconReplicationManager initialization. 
+ */ + public static final class InitContext { + private final ReplicationManagerConfiguration rmConf; + private final ConfigurationSource conf; + private final ContainerManager containerManager; + private final PlacementPolicy ratisContainerPlacement; + private final PlacementPolicy ecContainerPlacement; + private final EventPublisher eventPublisher; + private final SCMContext scmContext; + private final NodeManager nodeManager; + private final Clock clock; + + private InitContext(Builder builder) { + this.rmConf = builder.rmConf; + this.conf = builder.conf; + this.containerManager = builder.containerManager; + this.ratisContainerPlacement = builder.ratisContainerPlacement; + this.ecContainerPlacement = builder.ecContainerPlacement; + this.eventPublisher = builder.eventPublisher; + this.scmContext = builder.scmContext; + this.nodeManager = builder.nodeManager; + this.clock = builder.clock; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for creating {@link InitContext} instances. 
+ */ + public static final class Builder { + private ReplicationManagerConfiguration rmConf; + private ConfigurationSource conf; + private ContainerManager containerManager; + private PlacementPolicy ratisContainerPlacement; + private PlacementPolicy ecContainerPlacement; + private EventPublisher eventPublisher; + private SCMContext scmContext; + private NodeManager nodeManager; + private Clock clock; + + private Builder() { + } + + public Builder setRmConf(ReplicationManagerConfiguration rmConf) { + this.rmConf = rmConf; + return this; + } + + public Builder setConf(ConfigurationSource conf) { + this.conf = conf; + return this; + } + + public Builder setContainerManager(ContainerManager containerManager) { + this.containerManager = containerManager; + return this; + } + + public Builder setRatisContainerPlacement(PlacementPolicy ratisContainerPlacement) { + this.ratisContainerPlacement = ratisContainerPlacement; + return this; + } + + public Builder setEcContainerPlacement(PlacementPolicy ecContainerPlacement) { + this.ecContainerPlacement = ecContainerPlacement; + return this; + } + + public Builder setEventPublisher(EventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + return this; + } + + public Builder setScmContext(SCMContext scmContext) { + this.scmContext = scmContext; + return this; + } + + public Builder setNodeManager(NodeManager nodeManager) { + this.nodeManager = nodeManager; + return this; + } + + public Builder setClock(Clock clock) { + this.clock = clock; + return this; + } + + public InitContext build() { + return new InitContext(this); + } + } + } + + public ReconReplicationManager( + InitContext initContext, + ContainerHealthSchemaManagerV2 healthSchemaManager) throws IOException { + + // Call parent with stub PendingOps (proven to not cause false positives) + super( + initContext.rmConf, + initContext.conf, + initContext.containerManager, + initContext.ratisContainerPlacement, + initContext.ecContainerPlacement, + 
initContext.eventPublisher, + initContext.scmContext, + initContext.nodeManager, + initContext.clock, + new NoOpsContainerReplicaPendingOps(initContext.clock, initContext.rmConf) + ); + + this.containerManager = initContext.containerManager; + this.healthSchemaManager = healthSchemaManager; + } + + /** + * Override start() to prevent background threads from running. + * + * <p>In Recon, we don't want the ReplicationManager's background threads + * (replicationMonitor, underReplicatedProcessor, overReplicatedProcessor) + * to run continuously. Instead, we call processAll() manually from + * ContainerHealthTaskV2 on a schedule.</p> + * + * <p>This prevents: + * <ul> + * <li>Unnecessary CPU usage from continuous monitoring</li> + * <li>Initialization race conditions (start() being called before fields are initialized)</li> + * <li>Replication commands being generated (Recon is read-only)</li> + * </ul> + * </p> + */ + @Override + public synchronized void start() { + LOG.info("ReconReplicationManager.start() called - no-op (manual invocation via processAll())"); + // Do nothing - we call processAll() manually from ContainerHealthTaskV2 + } + + /** + * Checks if container replicas have mismatched data checksums. + * This is a Recon-specific check not done by SCM's ReplicationManager. 
+ * + * <p>REPLICA_MISMATCH detection is crucial for identifying: + * <ul> + * <li>Bit rot (silent data corruption)</li> + * <li>Failed writes to some replicas</li> + * <li>Storage corruption on specific datanodes</li> + * <li>Network corruption during replication</li> + * </ul> + * </p> + * + * <p>This uses checksum mismatch logic: + * {@code replicas.stream().map(ContainerReplica::getDataChecksum).distinct().count() != 1} + * </p> + * + * @param replicas Set of container replicas to check + * @return true if replicas have different data checksums + */ + private boolean hasDataChecksumMismatch(Set<ContainerReplica> replicas) { + if (replicas == null || replicas.isEmpty()) { + return false; + } + + // Count distinct checksums (filter out nulls) + long distinctChecksums = replicas.stream() + .map(ContainerReplica::getDataChecksum) + .filter(Objects::nonNull) + .distinct() + .count(); + + // More than 1 distinct checksum = data mismatch + // 0 distinct checksums = all nulls, no mismatch + return distinctChecksums > 1; + } + + /** + * Override processAll() to capture ALL per-container health states, + * not just aggregate counts and 100 samples. 
+ * + * <p><b>Processing Flow:</b></p> + * <ol> + * <li>Get all containers from ContainerManager</li> + * <li>Process each container using inherited health check chain (SCM logic)</li> + * <li>Additionally check for REPLICA_MISMATCH (Recon-specific)</li> + * <li>Capture ALL unhealthy container IDs per health state (no sampling limit)</li> + * <li>Store results in Recon's UNHEALTHY_CONTAINERS table</li> + * </ol> + * + * <p><b>Differences from SCM's processAll():</b></p> + * <ul> + * <li>Uses ReconReplicationManagerReport (captures all containers)</li> + * <li>Uses MonitoringReplicationQueue (doesn't enqueue commands)</li> + * <li>Adds REPLICA_MISMATCH detection (not done by SCM)</li> + * <li>Stores results in database instead of just keeping in-memory report</li> + * </ul> + */ + @Override + public synchronized void processAll() { + LOG.info("ReconReplicationManager starting container health check"); + + final long startTime = Time.monotonicNow(); + + // Use extended report that captures ALL containers, not just 100 samples + final ReconReplicationManagerReport report = new ReconReplicationManagerReport(); + final ReplicationQueue nullQueue = new MonitoringReplicationQueue(); + + // Get all containers (same as parent) + final List<ContainerInfo> containers = containerManager.getContainers(); + + LOG.info("Processing {} containers", containers.size()); + + // Process each container (reuses inherited processContainer and health check chain) + int processedCount = 0; + for (ContainerInfo container : containers) { + report.increment(container.getState()); + try { + ContainerID cid = container.containerID(); + + // Call inherited processContainer - this runs SCM's health check chain + // readOnly=true ensures no commands are generated + processContainer(container, nullQueue, report, true); Review Comment: Minor Suggestion - ``` processContainer(container, nullQueue, report, true); // calls getContainerReplicas() internally Set<ContainerReplica> replicas = 
containerManager.getContainerReplicas(cid); // calls again ``` Every container's replicas are fetched twice — once inside the inherited processContainer() and once for the `REPLICA_MISMATCH` check. On a 1 million container cluster, that's 2M replica lookups. We can extract the replicas once and pass them to both operations. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ReconReplicationManager.java: ########## @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.fsck; + +import java.io.IOException; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerHealthState; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.replication.MonitoringReplicationQueue; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.ContainerStateKey; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.UnhealthyContainerRecordV2; +import org.apache.hadoop.util.Time; +import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Recon-specific extension of SCM's ReplicationManager. 
+ * + * <p><b>Key Differences from SCM:</b></p> + * <ol> + * <li>Uses NoOpsContainerReplicaPendingOps stub (no pending operations tracking)</li> + * <li>Overrides processAll() to capture ALL container health states (no 100-sample limit)</li> + * <li>Stores results in Recon's UNHEALTHY_CONTAINERS table</li> + * <li>Does not issue replication commands (read-only monitoring)</li> + * </ol> + * + * <p><b>Why This Works Without PendingOps:</b></p> + * <p>SCM's health check logic uses a two-phase approach: + * <ul> + * <li><b>Phase 1 (Health Determination):</b> Calls isSufficientlyReplicated(false) + * which ignores pending operations. This phase determines the health state.</li> + * <li><b>Phase 2 (Command Deduplication):</b> Calls isSufficientlyReplicated(true) + * which considers pending operations. This phase decides whether to enqueue + * new commands.</li> + * </ul> + * Since Recon only needs Phase 1 (health determination) and doesn't issue commands, + * the stub PendingOps does not cause false positives.</p> + * + * @see NoOpsContainerReplicaPendingOps + * @see ReconReplicationManagerReport + */ +public class ReconReplicationManager extends ReplicationManager { + + private static final Logger LOG = + LoggerFactory.getLogger(ReconReplicationManager.class); + private static final int PERSIST_CHUNK_SIZE = 50_000; + + private final ContainerHealthSchemaManagerV2 healthSchemaManager; + private final ContainerManager containerManager; + + /** + * Immutable wiring context for ReconReplicationManager initialization. 
+ */ + public static final class InitContext { + private final ReplicationManagerConfiguration rmConf; + private final ConfigurationSource conf; + private final ContainerManager containerManager; + private final PlacementPolicy ratisContainerPlacement; + private final PlacementPolicy ecContainerPlacement; + private final EventPublisher eventPublisher; + private final SCMContext scmContext; + private final NodeManager nodeManager; + private final Clock clock; + + private InitContext(Builder builder) { + this.rmConf = builder.rmConf; + this.conf = builder.conf; + this.containerManager = builder.containerManager; + this.ratisContainerPlacement = builder.ratisContainerPlacement; + this.ecContainerPlacement = builder.ecContainerPlacement; + this.eventPublisher = builder.eventPublisher; + this.scmContext = builder.scmContext; + this.nodeManager = builder.nodeManager; + this.clock = builder.clock; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for creating {@link InitContext} instances. 
+ */ + public static final class Builder { + private ReplicationManagerConfiguration rmConf; + private ConfigurationSource conf; + private ContainerManager containerManager; + private PlacementPolicy ratisContainerPlacement; + private PlacementPolicy ecContainerPlacement; + private EventPublisher eventPublisher; + private SCMContext scmContext; + private NodeManager nodeManager; + private Clock clock; + + private Builder() { + } + + public Builder setRmConf(ReplicationManagerConfiguration rmConf) { + this.rmConf = rmConf; + return this; + } + + public Builder setConf(ConfigurationSource conf) { + this.conf = conf; + return this; + } + + public Builder setContainerManager(ContainerManager containerManager) { + this.containerManager = containerManager; + return this; + } + + public Builder setRatisContainerPlacement(PlacementPolicy ratisContainerPlacement) { + this.ratisContainerPlacement = ratisContainerPlacement; + return this; + } + + public Builder setEcContainerPlacement(PlacementPolicy ecContainerPlacement) { + this.ecContainerPlacement = ecContainerPlacement; + return this; + } + + public Builder setEventPublisher(EventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + return this; + } + + public Builder setScmContext(SCMContext scmContext) { + this.scmContext = scmContext; + return this; + } + + public Builder setNodeManager(NodeManager nodeManager) { + this.nodeManager = nodeManager; + return this; + } + + public Builder setClock(Clock clock) { + this.clock = clock; + return this; + } + + public InitContext build() { + return new InitContext(this); + } + } + } + + public ReconReplicationManager( + InitContext initContext, + ContainerHealthSchemaManagerV2 healthSchemaManager) throws IOException { + + // Call parent with stub PendingOps (proven to not cause false positives) + super( + initContext.rmConf, + initContext.conf, + initContext.containerManager, + initContext.ratisContainerPlacement, + initContext.ecContainerPlacement, + 
initContext.eventPublisher, + initContext.scmContext, + initContext.nodeManager, + initContext.clock, + new NoOpsContainerReplicaPendingOps(initContext.clock, initContext.rmConf) + ); + + this.containerManager = initContext.containerManager; + this.healthSchemaManager = healthSchemaManager; + } + + /** + * Override start() to prevent background threads from running. + * + * <p>In Recon, we don't want the ReplicationManager's background threads + * (replicationMonitor, underReplicatedProcessor, overReplicatedProcessor) + * to run continuously. Instead, we call processAll() manually from + * ContainerHealthTaskV2 on a schedule.</p> + * + * <p>This prevents: + * <ul> + * <li>Unnecessary CPU usage from continuous monitoring</li> + * <li>Initialization race conditions (start() being called before fields are initialized)</li> + * <li>Replication commands being generated (Recon is read-only)</li> + * </ul> + * </p> + */ + @Override + public synchronized void start() { + LOG.info("ReconReplicationManager.start() called - no-op (manual invocation via processAll())"); + // Do nothing - we call processAll() manually from ContainerHealthTaskV2 + } + + /** + * Checks if container replicas have mismatched data checksums. + * This is a Recon-specific check not done by SCM's ReplicationManager. 
+ * + * <p>REPLICA_MISMATCH detection is crucial for identifying: + * <ul> + * <li>Bit rot (silent data corruption)</li> + * <li>Failed writes to some replicas</li> + * <li>Storage corruption on specific datanodes</li> + * <li>Network corruption during replication</li> + * </ul> + * </p> + * + * <p>This uses checksum mismatch logic: + * {@code replicas.stream().map(ContainerReplica::getDataChecksum).distinct().count() != 1} + * </p> + * + * @param replicas Set of container replicas to check + * @return true if replicas have different data checksums + */ + private boolean hasDataChecksumMismatch(Set<ContainerReplica> replicas) { + if (replicas == null || replicas.isEmpty()) { + return false; + } + + // Count distinct checksums (filter out nulls) + long distinctChecksums = replicas.stream() + .map(ContainerReplica::getDataChecksum) + .filter(Objects::nonNull) + .distinct() + .count(); + + // More than 1 distinct checksum = data mismatch + // 0 distinct checksums = all nulls, no mismatch + return distinctChecksums > 1; + } + + /** + * Override processAll() to capture ALL per-container health states, + * not just aggregate counts and 100 samples. 
+ * + * <p><b>Processing Flow:</b></p> + * <ol> + * <li>Get all containers from ContainerManager</li> + * <li>Process each container using inherited health check chain (SCM logic)</li> + * <li>Additionally check for REPLICA_MISMATCH (Recon-specific)</li> + * <li>Capture ALL unhealthy container IDs per health state (no sampling limit)</li> + * <li>Store results in Recon's UNHEALTHY_CONTAINERS table</li> + * </ol> + * + * <p><b>Differences from SCM's processAll():</b></p> + * <ul> + * <li>Uses ReconReplicationManagerReport (captures all containers)</li> + * <li>Uses MonitoringReplicationQueue (doesn't enqueue commands)</li> + * <li>Adds REPLICA_MISMATCH detection (not done by SCM)</li> + * <li>Stores results in database instead of just keeping in-memory report</li> + * </ul> + */ + @Override + public synchronized void processAll() { + LOG.info("ReconReplicationManager starting container health check"); + + final long startTime = Time.monotonicNow(); + + // Use extended report that captures ALL containers, not just 100 samples + final ReconReplicationManagerReport report = new ReconReplicationManagerReport(); + final ReplicationQueue nullQueue = new MonitoringReplicationQueue(); + + // Get all containers (same as parent) + final List<ContainerInfo> containers = containerManager.getContainers(); + + LOG.info("Processing {} containers", containers.size()); + + // Process each container (reuses inherited processContainer and health check chain) + int processedCount = 0; + for (ContainerInfo container : containers) { + report.increment(container.getState()); + try { + ContainerID cid = container.containerID(); + + // Call inherited processContainer - this runs SCM's health check chain + // readOnly=true ensures no commands are generated + processContainer(container, nullQueue, report, true); + + // ADDITIONAL CHECK: Detect REPLICA_MISMATCH (Recon-specific, not in SCM) + Set<ContainerReplica> replicas = containerManager.getContainerReplicas(cid); + if 
(hasDataChecksumMismatch(replicas)) { + report.addReplicaMismatchContainer(cid); + LOG.debug("Container {} has data checksum mismatch across replicas", cid); + } + + processedCount++; + + if (processedCount % 10000 == 0) { + LOG.info("Processed {}/{} containers", processedCount, containers.size()); + } + } catch (ContainerNotFoundException e) { + LOG.error("Container {} not found", container.getContainerID(), e); + } + } + + report.setComplete(); + + // Store ALL per-container health states to database + storeHealthStatesToDatabase(report, containers); + + long duration = Time.monotonicNow() - startTime; + LOG.info("ReconReplicationManager completed in {}ms for {} containers", + duration, containers.size()); + } + + /** + * Convert ReconReplicationManagerReport to database records and store. + * This captures all unhealthy containers with detailed replica counts. + * + * @param report The report with all captured container health states + * @param allContainers List of all containers for cleanup + */ + private void storeHealthStatesToDatabase( + ReconReplicationManagerReport report, + List<ContainerInfo> allContainers) { + long currentTime = System.currentTimeMillis(); + ProcessingStats totalStats = new ProcessingStats(); + int totalReplicaMismatchCount = 0; + + for (int from = 0; from < allContainers.size(); from += PERSIST_CHUNK_SIZE) { + int to = Math.min(from + PERSIST_CHUNK_SIZE, allContainers.size()); + List<Long> chunkContainerIds = collectContainerIds(allContainers, from, to); + Set<Long> chunkContainerIdSet = new HashSet<>(chunkContainerIds); + Map<ContainerStateKey, Long> existingInStateSinceByContainerAndState = + healthSchemaManager.getExistingInStateSinceByContainerIds(chunkContainerIds); + List<UnhealthyContainerRecordV2> recordsToInsert = new ArrayList<>(); + ProcessingStats chunkStats = new ProcessingStats(); + Set<Long> negativeSizeRecorded = new HashSet<>(); + + report.forEachContainerByState((state, cid) -> { + if 
(!chunkContainerIdSet.contains(cid.getId())) { + return; + } + try { + handleScmStateContainer(state, cid, currentTime, + existingInStateSinceByContainerAndState, recordsToInsert, + negativeSizeRecorded, chunkStats); + } catch (ContainerNotFoundException e) { + LOG.warn("Container {} not found when processing {} state", cid, state, e); + } + }); + + int chunkReplicaMismatchCount = processReplicaMismatchContainersForChunk( + report, currentTime, existingInStateSinceByContainerAndState, + recordsToInsert, chunkContainerIdSet); + totalReplicaMismatchCount += chunkReplicaMismatchCount; + totalStats.add(chunkStats); + persistUnhealthyRecords(chunkContainerIds, recordsToInsert); + } + + LOG.info("Stored {} MISSING, {} EMPTY_MISSING, {} UNDER_REPLICATED, " + + "{} OVER_REPLICATED, {} MIS_REPLICATED, {} NEGATIVE_SIZE, " + + "{} REPLICA_MISMATCH", + totalStats.missingCount, totalStats.emptyMissingCount, totalStats.underRepCount, + totalStats.overRepCount, totalStats.misRepCount, totalStats.negativeSizeCount, + totalReplicaMismatchCount); + } + + private void handleScmStateContainer( + ContainerHealthState state, + ContainerID containerId, + long currentTime, + Map<ContainerStateKey, Long> existingInStateSinceByContainerAndState, + List<UnhealthyContainerRecordV2> recordsToInsert, + Set<Long> negativeSizeRecorded, + ProcessingStats stats) throws ContainerNotFoundException { + switch (state) { + case MISSING: + handleMissingContainer(containerId, currentTime, + existingInStateSinceByContainerAndState, recordsToInsert, stats); + break; + case UNDER_REPLICATED: + stats.incrementUnderRepCount(); + handleReplicaStateContainer(containerId, currentTime, + UnHealthyContainerStates.UNDER_REPLICATED, + existingInStateSinceByContainerAndState, recordsToInsert, negativeSizeRecorded, stats); + break; + case OVER_REPLICATED: + stats.incrementOverRepCount(); + handleReplicaStateContainer(containerId, currentTime, + UnHealthyContainerStates.OVER_REPLICATED, + 
existingInStateSinceByContainerAndState, recordsToInsert, negativeSizeRecorded, stats); + break; + case MIS_REPLICATED: + stats.incrementMisRepCount(); + handleReplicaStateContainer(containerId, currentTime, + UnHealthyContainerStates.MIS_REPLICATED, + existingInStateSinceByContainerAndState, recordsToInsert, negativeSizeRecorded, stats); + break; + default: + break; + } + } + + private void handleMissingContainer( + ContainerID containerId, + long currentTime, + Map<ContainerStateKey, Long> existingInStateSinceByContainerAndState, + List<UnhealthyContainerRecordV2> recordsToInsert, + ProcessingStats stats) throws ContainerNotFoundException { + ContainerInfo container = containerManager.getContainer(containerId); + int expected = container.getReplicationConfig().getRequiredNodes(); + if (isEmptyMissing(container)) { + stats.incrementEmptyMissingCount(); + recordsToInsert.add(createRecord(container, + UnHealthyContainerStates.EMPTY_MISSING, + resolveInStateSince(container.getContainerID(), + UnHealthyContainerStates.EMPTY_MISSING, currentTime, + existingInStateSinceByContainerAndState), + expected, 0, + "Container has no replicas and no keys")); + return; + } + + stats.incrementMissingCount(); + recordsToInsert.add(createRecord(container, + UnHealthyContainerStates.MISSING, + resolveInStateSince(container.getContainerID(), + UnHealthyContainerStates.MISSING, currentTime, + existingInStateSinceByContainerAndState), + expected, 0, + "No replicas available")); + } + + private void handleReplicaStateContainer( + ContainerID containerId, + long currentTime, + UnHealthyContainerStates targetState, + Map<ContainerStateKey, Long> existingInStateSinceByContainerAndState, + List<UnhealthyContainerRecordV2> recordsToInsert, + Set<Long> negativeSizeRecorded, + ProcessingStats stats) throws ContainerNotFoundException { + ContainerInfo container = containerManager.getContainer(containerId); + Set<ContainerReplica> replicas = containerManager.getContainerReplicas(containerId); + 
int expected = container.getReplicationConfig().getRequiredNodes(); + int actual = replicas.size(); + recordsToInsert.add(createRecord(container, targetState, + resolveInStateSince(container.getContainerID(), targetState, + currentTime, existingInStateSinceByContainerAndState), + expected, actual, reasonForState(targetState))); + addNegativeSizeRecordIfNeeded(container, currentTime, actual, recordsToInsert, + existingInStateSinceByContainerAndState, negativeSizeRecorded, stats); + } + + private int processReplicaMismatchContainersForChunk( + ReconReplicationManagerReport report, + long currentTime, + Map<ContainerStateKey, Long> existingInStateSinceByContainerAndState, + List<UnhealthyContainerRecordV2> recordsToInsert, + Set<Long> chunkContainerIdSet) { + List<ContainerID> replicaMismatchContainers = report.getReplicaMismatchContainers(); + int chunkReplicaMismatchCount = 0; + for (ContainerID cid : replicaMismatchContainers) { + if (!chunkContainerIdSet.contains(cid.getId())) { + continue; + } + try { + ContainerInfo container = containerManager.getContainer(cid); + Set<ContainerReplica> replicas = containerManager.getContainerReplicas(cid); + int expected = container.getReplicationConfig().getRequiredNodes(); + int actual = replicas.size(); + recordsToInsert.add(createRecord(container, + UnHealthyContainerStates.REPLICA_MISMATCH, + resolveInStateSince(container.getContainerID(), + UnHealthyContainerStates.REPLICA_MISMATCH, currentTime, + existingInStateSinceByContainerAndState), + expected, actual, + "Data checksum mismatch across replicas")); + chunkReplicaMismatchCount++; + } catch (ContainerNotFoundException e) { + LOG.warn("Container {} not found when processing REPLICA_MISMATCH state", cid, e); + } + } + return chunkReplicaMismatchCount; + } + + private List<Long> collectContainerIds(List<ContainerInfo> allContainers, + int fromInclusive, int toExclusive) { + List<Long> containerIds = new ArrayList<>(toExclusive - fromInclusive); + for (int i = 
fromInclusive; i < toExclusive; i++) { + containerIds.add(allContainers.get(i).getContainerID()); + } + return containerIds; + } + + private void persistUnhealthyRecords( + List<Long> containerIdsToDelete, + List<UnhealthyContainerRecordV2> recordsToInsert) { + LOG.info("Deleting SCM states for {} containers", containerIdsToDelete.size()); + healthSchemaManager.batchDeleteSCMStatesForContainers(containerIdsToDelete); + + LOG.info("Inserting {} unhealthy container records", recordsToInsert.size()); + healthSchemaManager.insertUnhealthyContainerRecords(recordsToInsert); + } Review Comment: ``` healthSchemaManager.batchDeleteSCMStatesForContainers(containerIdsToDelete); // step 1 healthSchemaManager.insertUnhealthyContainerRecords(recordsToInsert); // step 2 ``` These two calls are not in the same transaction. If Recon crashes between step 1 and step 2, all old health data is gone but the new data was never written. The API would return 0 unhealthy containers until the next scan runs. Also, if someone queries the API between step 1 and step 2, they get empty or partial results. ########## hadoop-ozone/recon-codegen/src/main/java/org/apache/ozone/recon/schema/ContainerSchemaDefinition.java: ########## @@ -80,8 +80,24 @@ private void createUnhealthyContainersTable() { .check(field(name(CONTAINER_STATE)) .in(UnHealthyContainerStates.values()))) .execute(); - dslContext.createIndex("idx_container_state") - .on(DSL.table(UNHEALTHY_CONTAINERS_TABLE_NAME), DSL.field(name(CONTAINER_STATE))) + // Composite index (container_state, container_id) serves two query patterns: + // + // 1. COUNT(*)/GROUP-BY filtered by state: + // WHERE container_state = ? + // Derby uses the index prefix (container_state) — same efficiency as the old + // single-column idx_container_state. + // + // 2. Paginated reads filtered by state + cursor: + // WHERE container_state = ? AND container_id > ? 
ORDER BY container_id ASC + // With the old single-column index Derby had to: + // a) Scan ALL rows for the state (e.g. 200K), then + // b) Sort them by container_id for every page call — O(n) per page. + // With this composite index Derby jumps directly to (state, minId) and reads + // the next LIMIT entries sequentially — O(1) per page, ~10–14× faster. + dslContext.createIndex("idx_state_container_id") + .on(DSL.table(UNHEALTHY_CONTAINERS_TABLE_NAME), + DSL.field(name(CONTAINER_STATE)), + DSL.field(name(CONTAINER_ID))) Review Comment: ``` if (!TABLE_EXISTS_CHECK.test(conn, UNHEALTHY_CONTAINERS_TABLE_NAME)) { createUnhealthyContainersTable(); // creates table + composite index } ``` The composite index idx_state_container_id is created inside createUnhealthyContainersTable(). This method is only called if the table doesn't exist. If someone upgrades an existing Recon deployment, the UNHEALTHY_CONTAINERS table already exists (from V1), so this entire method is skipped. The new composite index is never created on existing clusters — they're stuck with the old single-column index and get none of the 43–67× performance improvement. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ReconReplicationManager.java: ########## @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.fsck; + +import java.io.IOException; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerHealthState; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.replication.MonitoringReplicationQueue; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.ContainerStateKey; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.UnhealthyContainerRecordV2; +import org.apache.hadoop.util.Time; +import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Recon-specific extension of SCM's ReplicationManager. 
+ * + * <p><b>Key Differences from SCM:</b></p> + * <ol> + * <li>Uses NoOpsContainerReplicaPendingOps stub (no pending operations tracking)</li> + * <li>Overrides processAll() to capture ALL container health states (no 100-sample limit)</li> + * <li>Stores results in Recon's UNHEALTHY_CONTAINERS table</li> + * <li>Does not issue replication commands (read-only monitoring)</li> + * </ol> + * + * <p><b>Why This Works Without PendingOps:</b></p> + * <p>SCM's health check logic uses a two-phase approach: + * <ul> + * <li><b>Phase 1 (Health Determination):</b> Calls isSufficientlyReplicated(false) + * which ignores pending operations. This phase determines the health state.</li> + * <li><b>Phase 2 (Command Deduplication):</b> Calls isSufficientlyReplicated(true) + * which considers pending operations. This phase decides whether to enqueue + * new commands.</li> + * </ul> + * Since Recon only needs Phase 1 (health determination) and doesn't issue commands, + * the stub PendingOps does not cause false positives.</p> + * + * @see NoOpsContainerReplicaPendingOps + * @see ReconReplicationManagerReport + */ +public class ReconReplicationManager extends ReplicationManager { + + private static final Logger LOG = + LoggerFactory.getLogger(ReconReplicationManager.class); + private static final int PERSIST_CHUNK_SIZE = 50_000; + + private final ContainerHealthSchemaManagerV2 healthSchemaManager; + private final ContainerManager containerManager; + + /** + * Immutable wiring context for ReconReplicationManager initialization. 
+ */ + public static final class InitContext { + private final ReplicationManagerConfiguration rmConf; + private final ConfigurationSource conf; + private final ContainerManager containerManager; + private final PlacementPolicy ratisContainerPlacement; + private final PlacementPolicy ecContainerPlacement; + private final EventPublisher eventPublisher; + private final SCMContext scmContext; + private final NodeManager nodeManager; + private final Clock clock; + + private InitContext(Builder builder) { + this.rmConf = builder.rmConf; + this.conf = builder.conf; + this.containerManager = builder.containerManager; + this.ratisContainerPlacement = builder.ratisContainerPlacement; + this.ecContainerPlacement = builder.ecContainerPlacement; + this.eventPublisher = builder.eventPublisher; + this.scmContext = builder.scmContext; + this.nodeManager = builder.nodeManager; + this.clock = builder.clock; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for creating {@link InitContext} instances. 
+ */ + public static final class Builder { + private ReplicationManagerConfiguration rmConf; + private ConfigurationSource conf; + private ContainerManager containerManager; + private PlacementPolicy ratisContainerPlacement; + private PlacementPolicy ecContainerPlacement; + private EventPublisher eventPublisher; + private SCMContext scmContext; + private NodeManager nodeManager; + private Clock clock; + + private Builder() { + } + + public Builder setRmConf(ReplicationManagerConfiguration rmConf) { + this.rmConf = rmConf; + return this; + } + + public Builder setConf(ConfigurationSource conf) { + this.conf = conf; + return this; + } + + public Builder setContainerManager(ContainerManager containerManager) { + this.containerManager = containerManager; + return this; + } + + public Builder setRatisContainerPlacement(PlacementPolicy ratisContainerPlacement) { + this.ratisContainerPlacement = ratisContainerPlacement; + return this; + } + + public Builder setEcContainerPlacement(PlacementPolicy ecContainerPlacement) { + this.ecContainerPlacement = ecContainerPlacement; + return this; + } + + public Builder setEventPublisher(EventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + return this; + } + + public Builder setScmContext(SCMContext scmContext) { + this.scmContext = scmContext; + return this; + } + + public Builder setNodeManager(NodeManager nodeManager) { + this.nodeManager = nodeManager; + return this; + } + + public Builder setClock(Clock clock) { + this.clock = clock; + return this; + } + + public InitContext build() { + return new InitContext(this); + } + } + } + + public ReconReplicationManager( + InitContext initContext, + ContainerHealthSchemaManagerV2 healthSchemaManager) throws IOException { + + // Call parent with stub PendingOps (proven to not cause false positives) + super( + initContext.rmConf, + initContext.conf, + initContext.containerManager, + initContext.ratisContainerPlacement, + initContext.ecContainerPlacement, + 
initContext.eventPublisher, + initContext.scmContext, + initContext.nodeManager, + initContext.clock, + new NoOpsContainerReplicaPendingOps(initContext.clock, initContext.rmConf) + ); + + this.containerManager = initContext.containerManager; + this.healthSchemaManager = healthSchemaManager; + } + + /** + * Override start() to prevent background threads from running. + * + * <p>In Recon, we don't want the ReplicationManager's background threads + * (replicationMonitor, underReplicatedProcessor, overReplicatedProcessor) + * to run continuously. Instead, we call processAll() manually from + * ContainerHealthTaskV2 on a schedule.</p> + * + * <p>This prevents: + * <ul> + * <li>Unnecessary CPU usage from continuous monitoring</li> + * <li>Initialization race conditions (start() being called before fields are initialized)</li> + * <li>Replication commands being generated (Recon is read-only)</li> + * </ul> + * </p> + */ + @Override + public synchronized void start() { + LOG.info("ReconReplicationManager.start() called - no-op (manual invocation via processAll())"); + // Do nothing - we call processAll() manually from ContainerHealthTaskV2 + } + + /** + * Checks if container replicas have mismatched data checksums. + * This is a Recon-specific check not done by SCM's ReplicationManager. 
+ * + * <p>REPLICA_MISMATCH detection is crucial for identifying: + * <ul> + * <li>Bit rot (silent data corruption)</li> + * <li>Failed writes to some replicas</li> + * <li>Storage corruption on specific datanodes</li> + * <li>Network corruption during replication</li> + * </ul> + * </p> + * + * <p>This uses checksum mismatch logic: + * {@code replicas.stream().map(ContainerReplica::getDataChecksum).distinct().count() != 1} + * </p> + * + * @param replicas Set of container replicas to check + * @return true if replicas have different data checksums + */ + private boolean hasDataChecksumMismatch(Set<ContainerReplica> replicas) { + if (replicas == null || replicas.isEmpty()) { + return false; + } + + // Count distinct checksums (filter out nulls) + long distinctChecksums = replicas.stream() + .map(ContainerReplica::getDataChecksum) + .filter(Objects::nonNull) + .distinct() + .count(); + + // More than 1 distinct checksum = data mismatch + // 0 distinct checksums = all nulls, no mismatch + return distinctChecksums > 1; + } + + /** + * Override processAll() to capture ALL per-container health states, + * not just aggregate counts and 100 samples. 
+ * + * <p><b>Processing Flow:</b></p> + * <ol> + * <li>Get all containers from ContainerManager</li> + * <li>Process each container using inherited health check chain (SCM logic)</li> + * <li>Additionally check for REPLICA_MISMATCH (Recon-specific)</li> + * <li>Capture ALL unhealthy container IDs per health state (no sampling limit)</li> + * <li>Store results in Recon's UNHEALTHY_CONTAINERS table</li> + * </ol> + * + * <p><b>Differences from SCM's processAll():</b></p> + * <ul> + * <li>Uses ReconReplicationManagerReport (captures all containers)</li> + * <li>Uses MonitoringReplicationQueue (doesn't enqueue commands)</li> + * <li>Adds REPLICA_MISMATCH detection (not done by SCM)</li> + * <li>Stores results in database instead of just keeping in-memory report</li> + * </ul> + */ + @Override + public synchronized void processAll() { + LOG.info("ReconReplicationManager starting container health check"); + + final long startTime = Time.monotonicNow(); + + // Use extended report that captures ALL containers, not just 100 samples + final ReconReplicationManagerReport report = new ReconReplicationManagerReport(); + final ReplicationQueue nullQueue = new MonitoringReplicationQueue(); + + // Get all containers (same as parent) + final List<ContainerInfo> containers = containerManager.getContainers(); + + LOG.info("Processing {} containers", containers.size()); + + // Process each container (reuses inherited processContainer and health check chain) + int processedCount = 0; + for (ContainerInfo container : containers) { + report.increment(container.getState()); + try { + ContainerID cid = container.containerID(); + + // Call inherited processContainer - this runs SCM's health check chain + // readOnly=true ensures no commands are generated + processContainer(container, nullQueue, report, true); + + // ADDITIONAL CHECK: Detect REPLICA_MISMATCH (Recon-specific, not in SCM) + Set<ContainerReplica> replicas = containerManager.getContainerReplicas(cid); + if 
(hasDataChecksumMismatch(replicas)) { + report.addReplicaMismatchContainer(cid); + LOG.debug("Container {} has data checksum mismatch across replicas", cid); + } + + processedCount++; + + if (processedCount % 10000 == 0) { + LOG.info("Processed {}/{} containers", processedCount, containers.size()); + } + } catch (ContainerNotFoundException e) { + LOG.error("Container {} not found", container.getContainerID(), e); + } + } + + report.setComplete(); + + // Store ALL per-container health states to database + storeHealthStatesToDatabase(report, containers); + + long duration = Time.monotonicNow() - startTime; + LOG.info("ReconReplicationManager completed in {}ms for {} containers", + duration, containers.size()); + } + + /** + * Convert ReconReplicationManagerReport to database records and store. + * This captures all unhealthy containers with detailed replica counts. + * + * @param report The report with all captured container health states + * @param allContainers List of all containers for cleanup + */ + private void storeHealthStatesToDatabase( + ReconReplicationManagerReport report, + List<ContainerInfo> allContainers) { + long currentTime = System.currentTimeMillis(); + ProcessingStats totalStats = new ProcessingStats(); + int totalReplicaMismatchCount = 0; + + for (int from = 0; from < allContainers.size(); from += PERSIST_CHUNK_SIZE) { + int to = Math.min(from + PERSIST_CHUNK_SIZE, allContainers.size()); + List<Long> chunkContainerIds = collectContainerIds(allContainers, from, to); Review Comment: This collects every single container ID in the cluster (healthy and unhealthy) and runs DELETE statements for all of them. On a cluster with 1 million containers, that means: - Allocating a list with 1M entries - Running 1,000 chunked DELETE statements - Most of those containers are healthy and have no rows in the table, so the DELETEs are wasted work Suggestion: Instead, just delete all rows by state: DELETE FROM UNHEALTHY_CONTAINERS WHERE container_state IN (...) 
— one statement, no chunking, much faster. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
