sumitagrawl commented on code in PR #9258: URL: https://github.com/apache/ozone/pull/9258#discussion_r2887740100
########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ContainerHealthTaskV2Metrics.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.metrics; + +import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableRate; +import org.apache.hadoop.ozone.OzoneConsts; + +/** + * Runtime metrics for ContainerHealthTaskV2 execution. + */ [email protected] +@Metrics(about = "ContainerHealthTaskV2 Metrics", context = OzoneConsts.OZONE) +public final class ContainerHealthTaskV2Metrics { Review Comment: We can remove the "V2" suffix from all class names and variable names. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ReconReplicationManagerReport.java: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.fsck; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import org.apache.hadoop.hdds.scm.container.ContainerHealthState; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport; + +/** + * Extended ReplicationManagerReport that captures ALL container health states, + * not just the first 100 samples per state. + * + * <p>SCM's standard ReplicationManagerReport uses sampling (SAMPLE_LIMIT = 100) + * to limit memory usage. This is appropriate for SCM which only needs samples + * for debugging/UI display.</p> + * + * <p>Recon, however, needs to track per-container health states for ALL containers + * to populate its UNHEALTHY_CONTAINERS table. 
This extended report removes + * the sampling limitation while maintaining backward compatibility by still + * calling the parent's incrementAndSample() method.</p> + * + * <p><b>REPLICA_MISMATCH Handling:</b> Since SCM's HealthState enum doesn't include + * REPLICA_MISMATCH (it's a Recon-specific check for data checksum mismatches), + * we track it separately in replicaMismatchContainers.</p> + * + * <p><b>Memory Impact:</b> For a cluster with 100K containers and 5% unhealthy rate, + * this adds approximately 620KB of memory during report generation (5K containers + * × 124 bytes per container). Even in worst case (100% unhealthy), memory usage + * is only ~14MB, which is negligible for Recon.</p> + */ +public class ReconReplicationManagerReport extends ReplicationManagerReport { + + // Captures ALL containers per health state (no SAMPLE_LIMIT restriction) + private final Map<ContainerHealthState, List<ContainerID>> allContainersByState = + new HashMap<>(); + + // Captures containers with REPLICA_MISMATCH (Recon-specific, not in SCM's HealthState) + private final List<ContainerID> replicaMismatchContainers = new ArrayList<>(); + + public ReconReplicationManagerReport() { + // Recon keeps a full per-state list in allContainersByState below. + // Disable base sampling map to avoid duplicate tracking. + super(0); + } + + /** + * Override to capture ALL containers, not just first 100 samples. + * Still calls parent method to maintain aggregate counts and samples + * for backward compatibility. + * + * @param stat The health state to increment + * @param container The container ID to record + */ + @Override + public void incrementAndSample(ContainerHealthState stat, ContainerInfo container) { Review Comment: we can remove other methods, just have 2 method for replica-mismatch state. 
########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTaskV2.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.fsck; + +import javax.inject.Inject; +import org.apache.hadoop.ozone.recon.metrics.ContainerHealthTaskV2Metrics; +import org.apache.hadoop.ozone.recon.scm.ReconScmTask; +import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade; +import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig; +import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdaterManager; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * V2 implementation of Container Health Task using Local ReplicationManager. 
+ * + * <p><b>Solution:</b></p> + * <ul> + * <li>Uses Recon's local ReplicationManager (not RPC to SCM)</li> + * <li>Calls processAll() once to check all containers in batch</li> + * <li>ReplicationManager uses stub PendingOps (NoOpsContainerReplicaPendingOps)</li> + * <li>No false positives despite stub - health determination ignores pending ops</li> + * <li>All database operations handled inside ReconReplicationManager</li> + * </ul> + * + * <p><b>Benefits over RPC call to SCM 3:</b></p> + * <ul> + * <li>Zero RPC overhead (no per-container calls to SCM)</li> + * <li>Zero SCM load</li> + * <li>Simpler code - single method call</li> + * <li>Perfect accuracy (proven via code analysis)</li> + * <li>Captures ALL container health states (no 100-sample limit)</li> + * </ul> + * + * @see ReconReplicationManager + * @see NoOpsContainerReplicaPendingOps + */ +public class ContainerHealthTaskV2 extends ReconScmTask { + + private static final Logger LOG = + LoggerFactory.getLogger(ContainerHealthTaskV2.class); + + private final ReconStorageContainerManagerFacade reconScm; + private final long interval; + private final ContainerHealthTaskV2Metrics metrics; + + @Inject + public ContainerHealthTaskV2( + ReconTaskConfig reconTaskConfig, + ReconTaskStatusUpdaterManager taskStatusUpdaterManager, + ReconStorageContainerManagerFacade reconScm) { + super(taskStatusUpdaterManager); + this.reconScm = reconScm; + this.interval = reconTaskConfig.getMissingContainerTaskInterval().toMillis(); + this.metrics = ContainerHealthTaskV2Metrics.create(); + LOG.info("Initialized ContainerHealthTaskV2 with Local ReplicationManager, interval={}ms", + interval); + } + + @Override + protected void run() { + while (canRun()) { + long cycleStart = Time.monotonicNow(); + try { + initializeAndRunTask(); + long elapsed = Time.monotonicNow() - cycleStart; + long sleepMs = Math.max(0, interval - elapsed); Review Comment: To avoid a tight continuous loop, we can enforce a minimum interval of one minute before the next run. ########## 
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ReconReplicationManager.java: ########## @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.fsck; + +import java.io.IOException; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerHealthState; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.replication.MonitoringReplicationQueue; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import 
org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.ContainerStateKey; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.UnhealthyContainerRecordV2; +import org.apache.hadoop.util.Time; +import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Recon-specific extension of SCM's ReplicationManager. + * + * <p><b>Key Differences from SCM:</b></p> + * <ol> + * <li>Uses NoOpsContainerReplicaPendingOps stub (no pending operations tracking)</li> + * <li>Overrides processAll() to capture ALL container health states (no 100-sample limit)</li> + * <li>Stores results in Recon's UNHEALTHY_CONTAINERS table</li> + * <li>Does not issue replication commands (read-only monitoring)</li> + * </ol> + * + * <p><b>Why This Works Without PendingOps:</b></p> + * <p>SCM's health check logic uses a two-phase approach: + * <ul> + * <li><b>Phase 1 (Health Determination):</b> Calls isSufficientlyReplicated(false) + * which ignores pending operations. This phase determines the health state.</li> + * <li><b>Phase 2 (Command Deduplication):</b> Calls isSufficientlyReplicated(true) + * which considers pending operations. 
This phase decides whether to enqueue + * new commands.</li> + * </ul> + * Since Recon only needs Phase 1 (health determination) and doesn't issue commands, + * the stub PendingOps does not cause false positives.</p> + * + * @see NoOpsContainerReplicaPendingOps + * @see ReconReplicationManagerReport + */ +public class ReconReplicationManager extends ReplicationManager { + + private static final Logger LOG = + LoggerFactory.getLogger(ReconReplicationManager.class); + private static final int PERSIST_CHUNK_SIZE = 50_000; + + private final ContainerHealthSchemaManagerV2 healthSchemaManager; + private final ContainerManager containerManager; + + /** + * Immutable wiring context for ReconReplicationManager initialization. + */ + public static final class InitContext { + private final ReplicationManagerConfiguration rmConf; + private final ConfigurationSource conf; + private final ContainerManager containerManager; + private final PlacementPolicy ratisContainerPlacement; + private final PlacementPolicy ecContainerPlacement; + private final EventPublisher eventPublisher; + private final SCMContext scmContext; + private final NodeManager nodeManager; + private final Clock clock; + + private InitContext(Builder builder) { + this.rmConf = builder.rmConf; + this.conf = builder.conf; + this.containerManager = builder.containerManager; + this.ratisContainerPlacement = builder.ratisContainerPlacement; + this.ecContainerPlacement = builder.ecContainerPlacement; + this.eventPublisher = builder.eventPublisher; + this.scmContext = builder.scmContext; + this.nodeManager = builder.nodeManager; + this.clock = builder.clock; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for creating {@link InitContext} instances. 
+ */ + public static final class Builder { + private ReplicationManagerConfiguration rmConf; + private ConfigurationSource conf; + private ContainerManager containerManager; + private PlacementPolicy ratisContainerPlacement; + private PlacementPolicy ecContainerPlacement; + private EventPublisher eventPublisher; + private SCMContext scmContext; + private NodeManager nodeManager; + private Clock clock; + + private Builder() { + } + + public Builder setRmConf(ReplicationManagerConfiguration rmConf) { + this.rmConf = rmConf; + return this; + } + + public Builder setConf(ConfigurationSource conf) { + this.conf = conf; + return this; + } + + public Builder setContainerManager(ContainerManager containerManager) { + this.containerManager = containerManager; + return this; + } + + public Builder setRatisContainerPlacement(PlacementPolicy ratisContainerPlacement) { + this.ratisContainerPlacement = ratisContainerPlacement; + return this; + } + + public Builder setEcContainerPlacement(PlacementPolicy ecContainerPlacement) { + this.ecContainerPlacement = ecContainerPlacement; + return this; + } + + public Builder setEventPublisher(EventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + return this; + } + + public Builder setScmContext(SCMContext scmContext) { + this.scmContext = scmContext; + return this; + } + + public Builder setNodeManager(NodeManager nodeManager) { + this.nodeManager = nodeManager; + return this; + } + + public Builder setClock(Clock clock) { + this.clock = clock; + return this; + } + + public InitContext build() { + return new InitContext(this); + } + } + } + + public ReconReplicationManager( + InitContext initContext, + ContainerHealthSchemaManagerV2 healthSchemaManager) throws IOException { + + // Call parent with stub PendingOps (proven to not cause false positives) + super( + initContext.rmConf, + initContext.conf, + initContext.containerManager, + initContext.ratisContainerPlacement, + initContext.ecContainerPlacement, + 
initContext.eventPublisher, + initContext.scmContext, + initContext.nodeManager, + initContext.clock, + new NoOpsContainerReplicaPendingOps(initContext.clock, initContext.rmConf) + ); + + this.containerManager = initContext.containerManager; + this.healthSchemaManager = healthSchemaManager; + } + + /** + * Override start() to prevent background threads from running. + * + * <p>In Recon, we don't want the ReplicationManager's background threads + * (replicationMonitor, underReplicatedProcessor, overReplicatedProcessor) + * to run continuously. Instead, we call processAll() manually from + * ContainerHealthTaskV2 on a schedule.</p> + * + * <p>This prevents: + * <ul> + * <li>Unnecessary CPU usage from continuous monitoring</li> + * <li>Initialization race conditions (start() being called before fields are initialized)</li> + * <li>Replication commands being generated (Recon is read-only)</li> + * </ul> + * </p> + */ + @Override + public synchronized void start() { + LOG.info("ReconReplicationManager.start() called - no-op (manual invocation via processAll())"); + // Do nothing - we call processAll() manually from ContainerHealthTaskV2 + } + + /** + * Checks if container replicas have mismatched data checksums. + * This is a Recon-specific check not done by SCM's ReplicationManager. 
+ * + * <p>REPLICA_MISMATCH detection is crucial for identifying: + * <ul> + * <li>Bit rot (silent data corruption)</li> + * <li>Failed writes to some replicas</li> + * <li>Storage corruption on specific datanodes</li> + * <li>Network corruption during replication</li> + * </ul> + * </p> + * + * <p>This uses checksum mismatch logic: + * {@code replicas.stream().map(ContainerReplica::getDataChecksum).distinct().count() != 1} + * </p> + * + * @param replicas Set of container replicas to check + * @return true if replicas have different data checksums + */ + private boolean hasDataChecksumMismatch(Set<ContainerReplica> replicas) { + if (replicas == null || replicas.isEmpty()) { + return false; + } + + // Count distinct checksums (filter out nulls) + long distinctChecksums = replicas.stream() + .map(ContainerReplica::getDataChecksum) + .filter(Objects::nonNull) + .distinct() + .count(); + + // More than 1 distinct checksum = data mismatch + // 0 distinct checksums = all nulls, no mismatch + return distinctChecksums > 1; + } + + /** + * Override processAll() to capture ALL per-container health states, + * not just aggregate counts and 100 samples. 
+ * + * <p><b>Processing Flow:</b></p> + * <ol> + * <li>Get all containers from ContainerManager</li> + * <li>Process each container using inherited health check chain (SCM logic)</li> + * <li>Additionally check for REPLICA_MISMATCH (Recon-specific)</li> + * <li>Capture ALL unhealthy container IDs per health state (no sampling limit)</li> + * <li>Store results in Recon's UNHEALTHY_CONTAINERS table</li> + * </ol> + * + * <p><b>Differences from SCM's processAll():</b></p> + * <ul> + * <li>Uses ReconReplicationManagerReport (captures all containers)</li> + * <li>Uses MonitoringReplicationQueue (doesn't enqueue commands)</li> + * <li>Adds REPLICA_MISMATCH detection (not done by SCM)</li> + * <li>Stores results in database instead of just keeping in-memory report</li> + * </ul> + */ + @Override + public synchronized void processAll() { + LOG.info("ReconReplicationManager starting container health check"); + + final long startTime = Time.monotonicNow(); + + // Use extended report that captures ALL containers, not just 100 samples + final ReconReplicationManagerReport report = new ReconReplicationManagerReport(); + final ReplicationQueue nullQueue = new MonitoringReplicationQueue(); + + // Get all containers (same as parent) + final List<ContainerInfo> containers = containerManager.getContainers(); + + LOG.info("Processing {} containers", containers.size()); + + // Process each container (reuses inherited processContainer and health check chain) + int processedCount = 0; + for (ContainerInfo container : containers) { + report.increment(container.getState()); + try { + ContainerID cid = container.containerID(); + + // Call inherited processContainer - this runs SCM's health check chain + // readOnly=true ensures no commands are generated + processContainer(container, nullQueue, report, true); + + // ADDITIONAL CHECK: Detect REPLICA_MISMATCH (Recon-specific, not in SCM) + Set<ContainerReplica> replicas = containerManager.getContainerReplicas(cid); + if 
(hasDataChecksumMismatch(replicas)) { + report.addReplicaMismatchContainer(cid); + LOG.debug("Container {} has data checksum mismatch across replicas", cid); + } + + processedCount++; + + if (processedCount % 10000 == 0) { + LOG.info("Processed {}/{} containers", processedCount, containers.size()); + } + } catch (ContainerNotFoundException e) { + LOG.error("Container {} not found", container.getContainerID(), e); + } + } + + report.setComplete(); + + // Store ALL per-container health states to database + storeHealthStatesToDatabase(report, containers); + + long duration = Time.monotonicNow() - startTime; + LOG.info("ReconReplicationManager completed in {}ms for {} containers", + duration, containers.size()); + } + + /** + * Convert ReconReplicationManagerReport to database records and store. + * This captures all unhealthy containers with detailed replica counts. + * + * @param report The report with all captured container health states + * @param allContainers List of all containers for cleanup + */ + private void storeHealthStatesToDatabase( + ReconReplicationManagerReport report, + List<ContainerInfo> allContainers) { + long currentTime = System.currentTimeMillis(); + ProcessingStats totalStats = new ProcessingStats(); + int totalReplicaMismatchCount = 0; + + for (int from = 0; from < allContainers.size(); from += PERSIST_CHUNK_SIZE) { + int to = Math.min(from + PERSIST_CHUNK_SIZE, allContainers.size()); + List<Long> chunkContainerIds = collectContainerIds(allContainers, from, to); + Set<Long> chunkContainerIdSet = new HashSet<>(chunkContainerIds); + Map<ContainerStateKey, Long> existingInStateSinceByContainerAndState = + healthSchemaManager.getExistingInStateSinceByContainerIds(chunkContainerIds); + List<UnhealthyContainerRecordV2> recordsToInsert = new ArrayList<>(); + ProcessingStats chunkStats = new ProcessingStats(); + Set<Long> negativeSizeRecorded = new HashSet<>(); + + report.forEachContainerByState((state, cid) -> { + if 
(!chunkContainerIdSet.contains(cid.getId())) { + return; + } + try { + handleScmStateContainer(state, cid, currentTime, + existingInStateSinceByContainerAndState, recordsToInsert, + negativeSizeRecorded, chunkStats); + } catch (ContainerNotFoundException e) { + LOG.warn("Container {} not found when processing {} state", cid, state, e); + } + }); + + int chunkReplicaMismatchCount = processReplicaMismatchContainersForChunk( + report, currentTime, existingInStateSinceByContainerAndState, + recordsToInsert, chunkContainerIdSet); + totalReplicaMismatchCount += chunkReplicaMismatchCount; + totalStats.add(chunkStats); + persistUnhealthyRecords(chunkContainerIds, recordsToInsert); Review Comment: We can pass the existingInStateSinceByContainerAndState values to improve deletion performance when very few or no entries are present in the DB. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ReconReplicationManager.java: ########## @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.fsck; + +import java.io.IOException; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerHealthState; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.replication.MonitoringReplicationQueue; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.ContainerStateKey; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.UnhealthyContainerRecordV2; +import org.apache.hadoop.util.Time; +import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Recon-specific extension of SCM's ReplicationManager. 
+ * + * <p><b>Key Differences from SCM:</b></p> + * <ol> + * <li>Uses NoOpsContainerReplicaPendingOps stub (no pending operations tracking)</li> + * <li>Overrides processAll() to capture ALL container health states (no 100-sample limit)</li> + * <li>Stores results in Recon's UNHEALTHY_CONTAINERS table</li> + * <li>Does not issue replication commands (read-only monitoring)</li> + * </ol> + * + * <p><b>Why This Works Without PendingOps:</b></p> + * <p>SCM's health check logic uses a two-phase approach: + * <ul> + * <li><b>Phase 1 (Health Determination):</b> Calls isSufficientlyReplicated(false) + * which ignores pending operations. This phase determines the health state.</li> + * <li><b>Phase 2 (Command Deduplication):</b> Calls isSufficientlyReplicated(true) + * which considers pending operations. This phase decides whether to enqueue + * new commands.</li> + * </ul> + * Since Recon only needs Phase 1 (health determination) and doesn't issue commands, + * the stub PendingOps does not cause false positives.</p> + * + * @see NoOpsContainerReplicaPendingOps + * @see ReconReplicationManagerReport + */ +public class ReconReplicationManager extends ReplicationManager { + + private static final Logger LOG = + LoggerFactory.getLogger(ReconReplicationManager.class); + private static final int PERSIST_CHUNK_SIZE = 50_000; + + private final ContainerHealthSchemaManagerV2 healthSchemaManager; + private final ContainerManager containerManager; + + /** + * Immutable wiring context for ReconReplicationManager initialization. 
+ */ + public static final class InitContext { + private final ReplicationManagerConfiguration rmConf; + private final ConfigurationSource conf; + private final ContainerManager containerManager; + private final PlacementPolicy ratisContainerPlacement; + private final PlacementPolicy ecContainerPlacement; + private final EventPublisher eventPublisher; + private final SCMContext scmContext; + private final NodeManager nodeManager; + private final Clock clock; + + private InitContext(Builder builder) { + this.rmConf = builder.rmConf; + this.conf = builder.conf; + this.containerManager = builder.containerManager; + this.ratisContainerPlacement = builder.ratisContainerPlacement; + this.ecContainerPlacement = builder.ecContainerPlacement; + this.eventPublisher = builder.eventPublisher; + this.scmContext = builder.scmContext; + this.nodeManager = builder.nodeManager; + this.clock = builder.clock; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for creating {@link InitContext} instances. 
+ */ + public static final class Builder { + private ReplicationManagerConfiguration rmConf; + private ConfigurationSource conf; + private ContainerManager containerManager; + private PlacementPolicy ratisContainerPlacement; + private PlacementPolicy ecContainerPlacement; + private EventPublisher eventPublisher; + private SCMContext scmContext; + private NodeManager nodeManager; + private Clock clock; + + private Builder() { + } + + public Builder setRmConf(ReplicationManagerConfiguration rmConf) { + this.rmConf = rmConf; + return this; + } + + public Builder setConf(ConfigurationSource conf) { + this.conf = conf; + return this; + } + + public Builder setContainerManager(ContainerManager containerManager) { + this.containerManager = containerManager; + return this; + } + + public Builder setRatisContainerPlacement(PlacementPolicy ratisContainerPlacement) { + this.ratisContainerPlacement = ratisContainerPlacement; + return this; + } + + public Builder setEcContainerPlacement(PlacementPolicy ecContainerPlacement) { + this.ecContainerPlacement = ecContainerPlacement; + return this; + } + + public Builder setEventPublisher(EventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + return this; + } + + public Builder setScmContext(SCMContext scmContext) { + this.scmContext = scmContext; + return this; + } + + public Builder setNodeManager(NodeManager nodeManager) { + this.nodeManager = nodeManager; + return this; + } + + public Builder setClock(Clock clock) { + this.clock = clock; + return this; + } + + public InitContext build() { + return new InitContext(this); + } + } + } + + public ReconReplicationManager( + InitContext initContext, + ContainerHealthSchemaManagerV2 healthSchemaManager) throws IOException { + + // Call parent with stub PendingOps (proven to not cause false positives) + super( + initContext.rmConf, + initContext.conf, + initContext.containerManager, + initContext.ratisContainerPlacement, + initContext.ecContainerPlacement, + 
initContext.eventPublisher, + initContext.scmContext, + initContext.nodeManager, + initContext.clock, + new NoOpsContainerReplicaPendingOps(initContext.clock, initContext.rmConf) + ); + + this.containerManager = initContext.containerManager; + this.healthSchemaManager = healthSchemaManager; + } + + /** + * Override start() to prevent background threads from running. + * + * <p>In Recon, we don't want the ReplicationManager's background threads + * (replicationMonitor, underReplicatedProcessor, overReplicatedProcessor) + * to run continuously. Instead, we call processAll() manually from + * ContainerHealthTaskV2 on a schedule.</p> + * + * <p>This prevents: + * <ul> + * <li>Unnecessary CPU usage from continuous monitoring</li> + * <li>Initialization race conditions (start() being called before fields are initialized)</li> + * <li>Replication commands being generated (Recon is read-only)</li> + * </ul> + * </p> + */ + @Override + public synchronized void start() { + LOG.info("ReconReplicationManager.start() called - no-op (manual invocation via processAll())"); + // Do nothing - we call processAll() manually from ContainerHealthTaskV2 + } + + /** + * Checks if container replicas have mismatched data checksums. + * This is a Recon-specific check not done by SCM's ReplicationManager. 
   *
   * <p>REPLICA_MISMATCH detection is crucial for identifying:
   * <ul>
   * <li>Bit rot (silent data corruption)</li>
   * <li>Failed writes to some replicas</li>
   * <li>Storage corruption on specific datanodes</li>
   * <li>Network corruption during replication</li>
   * </ul>
   * </p>
   *
   * <p>This uses checksum mismatch logic (null checksums are ignored):
   * {@code replicas.stream().map(ContainerReplica::getDataChecksum)
   *     .filter(Objects::nonNull).distinct().count() > 1}
   * </p>
   *
   * @param replicas Set of container replicas to check
   * @return true if replicas have different data checksums
   */
  private boolean hasDataChecksumMismatch(Set<ContainerReplica> replicas) {
    // No replicas at all -> nothing to compare, so no mismatch.
    if (replicas == null || replicas.isEmpty()) {
      return false;
    }

    // Count distinct checksums (filter out nulls)
    long distinctChecksums = replicas.stream()
        .map(ContainerReplica::getDataChecksum)
        .filter(Objects::nonNull)
        .distinct()
        .count();

    // More than 1 distinct checksum = data mismatch
    // 0 distinct checksums = all nulls, no mismatch
    return distinctChecksums > 1;
  }

  /**
   * Override processAll() to capture ALL per-container health states,
   * not just aggregate counts and 100 samples.
+ * + * <p><b>Processing Flow:</b></p> + * <ol> + * <li>Get all containers from ContainerManager</li> + * <li>Process each container using inherited health check chain (SCM logic)</li> + * <li>Additionally check for REPLICA_MISMATCH (Recon-specific)</li> + * <li>Capture ALL unhealthy container IDs per health state (no sampling limit)</li> + * <li>Store results in Recon's UNHEALTHY_CONTAINERS table</li> + * </ol> + * + * <p><b>Differences from SCM's processAll():</b></p> + * <ul> + * <li>Uses ReconReplicationManagerReport (captures all containers)</li> + * <li>Uses MonitoringReplicationQueue (doesn't enqueue commands)</li> + * <li>Adds REPLICA_MISMATCH detection (not done by SCM)</li> + * <li>Stores results in database instead of just keeping in-memory report</li> + * </ul> + */ + @Override + public synchronized void processAll() { + LOG.info("ReconReplicationManager starting container health check"); + + final long startTime = Time.monotonicNow(); + + // Use extended report that captures ALL containers, not just 100 samples + final ReconReplicationManagerReport report = new ReconReplicationManagerReport(); + final ReplicationQueue nullQueue = new MonitoringReplicationQueue(); + + // Get all containers (same as parent) + final List<ContainerInfo> containers = containerManager.getContainers(); + + LOG.info("Processing {} containers", containers.size()); + + // Process each container (reuses inherited processContainer and health check chain) + int processedCount = 0; + for (ContainerInfo container : containers) { + report.increment(container.getState()); + try { + ContainerID cid = container.containerID(); + + // Call inherited processContainer - this runs SCM's health check chain + // readOnly=true ensures no commands are generated + processContainer(container, nullQueue, report, true); + + // ADDITIONAL CHECK: Detect REPLICA_MISMATCH (Recon-specific, not in SCM) + Set<ContainerReplica> replicas = containerManager.getContainerReplicas(cid); + if 
(hasDataChecksumMismatch(replicas)) { + report.addReplicaMismatchContainer(cid); + LOG.debug("Container {} has data checksum mismatch across replicas", cid); + } + + processedCount++; + + if (processedCount % 10000 == 0) { + LOG.info("Processed {}/{} containers", processedCount, containers.size()); + } + } catch (ContainerNotFoundException e) { + LOG.error("Container {} not found", container.getContainerID(), e); + } + } + + report.setComplete(); + + // Store ALL per-container health states to database + storeHealthStatesToDatabase(report, containers); + + long duration = Time.monotonicNow() - startTime; + LOG.info("ReconReplicationManager completed in {}ms for {} containers", + duration, containers.size()); + } + + /** + * Convert ReconReplicationManagerReport to database records and store. + * This captures all unhealthy containers with detailed replica counts. + * + * @param report The report with all captured container health states + * @param allContainers List of all containers for cleanup + */ + private void storeHealthStatesToDatabase( + ReconReplicationManagerReport report, + List<ContainerInfo> allContainers) { + long currentTime = System.currentTimeMillis(); + ProcessingStats totalStats = new ProcessingStats(); + int totalReplicaMismatchCount = 0; + + for (int from = 0; from < allContainers.size(); from += PERSIST_CHUNK_SIZE) { + int to = Math.min(from + PERSIST_CHUNK_SIZE, allContainers.size()); + List<Long> chunkContainerIds = collectContainerIds(allContainers, from, to); Review Comment: we can use only chunkContainerIdSet, chunkContainerIds is not requried. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ReconReplicationManager.java: ########## @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.fsck; + +import java.io.IOException; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerHealthState; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.replication.MonitoringReplicationQueue; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.ContainerStateKey; +import 
org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.UnhealthyContainerRecordV2; +import org.apache.hadoop.util.Time; +import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Recon-specific extension of SCM's ReplicationManager. + * + * <p><b>Key Differences from SCM:</b></p> + * <ol> + * <li>Uses NoOpsContainerReplicaPendingOps stub (no pending operations tracking)</li> + * <li>Overrides processAll() to capture ALL container health states (no 100-sample limit)</li> + * <li>Stores results in Recon's UNHEALTHY_CONTAINERS table</li> + * <li>Does not issue replication commands (read-only monitoring)</li> + * </ol> + * + * <p><b>Why This Works Without PendingOps:</b></p> + * <p>SCM's health check logic uses a two-phase approach: + * <ul> + * <li><b>Phase 1 (Health Determination):</b> Calls isSufficientlyReplicated(false) + * which ignores pending operations. This phase determines the health state.</li> + * <li><b>Phase 2 (Command Deduplication):</b> Calls isSufficientlyReplicated(true) + * which considers pending operations. This phase decides whether to enqueue + * new commands.</li> + * </ul> + * Since Recon only needs Phase 1 (health determination) and doesn't issue commands, + * the stub PendingOps does not cause false positives.</p> + * + * @see NoOpsContainerReplicaPendingOps + * @see ReconReplicationManagerReport + */ +public class ReconReplicationManager extends ReplicationManager { + + private static final Logger LOG = + LoggerFactory.getLogger(ReconReplicationManager.class); + private static final int PERSIST_CHUNK_SIZE = 50_000; + + private final ContainerHealthSchemaManagerV2 healthSchemaManager; + private final ContainerManager containerManager; + + /** + * Immutable wiring context for ReconReplicationManager initialization. 
+ */ + public static final class InitContext { + private final ReplicationManagerConfiguration rmConf; + private final ConfigurationSource conf; + private final ContainerManager containerManager; + private final PlacementPolicy ratisContainerPlacement; + private final PlacementPolicy ecContainerPlacement; + private final EventPublisher eventPublisher; + private final SCMContext scmContext; + private final NodeManager nodeManager; + private final Clock clock; + + private InitContext(Builder builder) { + this.rmConf = builder.rmConf; + this.conf = builder.conf; + this.containerManager = builder.containerManager; + this.ratisContainerPlacement = builder.ratisContainerPlacement; + this.ecContainerPlacement = builder.ecContainerPlacement; + this.eventPublisher = builder.eventPublisher; + this.scmContext = builder.scmContext; + this.nodeManager = builder.nodeManager; + this.clock = builder.clock; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for creating {@link InitContext} instances. 
+ */ + public static final class Builder { + private ReplicationManagerConfiguration rmConf; + private ConfigurationSource conf; + private ContainerManager containerManager; + private PlacementPolicy ratisContainerPlacement; + private PlacementPolicy ecContainerPlacement; + private EventPublisher eventPublisher; + private SCMContext scmContext; + private NodeManager nodeManager; + private Clock clock; + + private Builder() { + } + + public Builder setRmConf(ReplicationManagerConfiguration rmConf) { + this.rmConf = rmConf; + return this; + } + + public Builder setConf(ConfigurationSource conf) { + this.conf = conf; + return this; + } + + public Builder setContainerManager(ContainerManager containerManager) { + this.containerManager = containerManager; + return this; + } + + public Builder setRatisContainerPlacement(PlacementPolicy ratisContainerPlacement) { + this.ratisContainerPlacement = ratisContainerPlacement; + return this; + } + + public Builder setEcContainerPlacement(PlacementPolicy ecContainerPlacement) { + this.ecContainerPlacement = ecContainerPlacement; + return this; + } + + public Builder setEventPublisher(EventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + return this; + } + + public Builder setScmContext(SCMContext scmContext) { + this.scmContext = scmContext; + return this; + } + + public Builder setNodeManager(NodeManager nodeManager) { + this.nodeManager = nodeManager; + return this; + } + + public Builder setClock(Clock clock) { + this.clock = clock; + return this; + } + + public InitContext build() { + return new InitContext(this); + } + } + } + + public ReconReplicationManager( + InitContext initContext, + ContainerHealthSchemaManagerV2 healthSchemaManager) throws IOException { + + // Call parent with stub PendingOps (proven to not cause false positives) + super( + initContext.rmConf, + initContext.conf, + initContext.containerManager, + initContext.ratisContainerPlacement, + initContext.ecContainerPlacement, + 
initContext.eventPublisher, + initContext.scmContext, + initContext.nodeManager, + initContext.clock, + new NoOpsContainerReplicaPendingOps(initContext.clock, initContext.rmConf) + ); + + this.containerManager = initContext.containerManager; + this.healthSchemaManager = healthSchemaManager; + } + + /** + * Override start() to prevent background threads from running. + * + * <p>In Recon, we don't want the ReplicationManager's background threads + * (replicationMonitor, underReplicatedProcessor, overReplicatedProcessor) + * to run continuously. Instead, we call processAll() manually from + * ContainerHealthTaskV2 on a schedule.</p> + * + * <p>This prevents: + * <ul> + * <li>Unnecessary CPU usage from continuous monitoring</li> + * <li>Initialization race conditions (start() being called before fields are initialized)</li> + * <li>Replication commands being generated (Recon is read-only)</li> + * </ul> + * </p> + */ + @Override + public synchronized void start() { + LOG.info("ReconReplicationManager.start() called - no-op (manual invocation via processAll())"); + // Do nothing - we call processAll() manually from ContainerHealthTaskV2 + } + + /** + * Checks if container replicas have mismatched data checksums. + * This is a Recon-specific check not done by SCM's ReplicationManager. 
   *
   * <p>REPLICA_MISMATCH detection is crucial for identifying:
   * <ul>
   * <li>Bit rot (silent data corruption)</li>
   * <li>Failed writes to some replicas</li>
   * <li>Storage corruption on specific datanodes</li>
   * <li>Network corruption during replication</li>
   * </ul>
   * </p>
   *
   * <p>This uses checksum mismatch logic (null checksums are ignored):
   * {@code replicas.stream().map(ContainerReplica::getDataChecksum)
   *     .filter(Objects::nonNull).distinct().count() > 1}
   * </p>
   *
   * @param replicas Set of container replicas to check
   * @return true if replicas have different data checksums
   */
  private boolean hasDataChecksumMismatch(Set<ContainerReplica> replicas) {
    // No replicas at all -> nothing to compare, so no mismatch.
    if (replicas == null || replicas.isEmpty()) {
      return false;
    }

    // Count distinct checksums (filter out nulls)
    long distinctChecksums = replicas.stream()
        .map(ContainerReplica::getDataChecksum)
        .filter(Objects::nonNull)
        .distinct()
        .count();

    // More than 1 distinct checksum = data mismatch
    // 0 distinct checksums = all nulls, no mismatch
    return distinctChecksums > 1;
  }

  /**
   * Override processAll() to capture ALL per-container health states,
   * not just aggregate counts and 100 samples.
+ * + * <p><b>Processing Flow:</b></p> + * <ol> + * <li>Get all containers from ContainerManager</li> + * <li>Process each container using inherited health check chain (SCM logic)</li> + * <li>Additionally check for REPLICA_MISMATCH (Recon-specific)</li> + * <li>Capture ALL unhealthy container IDs per health state (no sampling limit)</li> + * <li>Store results in Recon's UNHEALTHY_CONTAINERS table</li> + * </ol> + * + * <p><b>Differences from SCM's processAll():</b></p> + * <ul> + * <li>Uses ReconReplicationManagerReport (captures all containers)</li> + * <li>Uses MonitoringReplicationQueue (doesn't enqueue commands)</li> + * <li>Adds REPLICA_MISMATCH detection (not done by SCM)</li> + * <li>Stores results in database instead of just keeping in-memory report</li> + * </ul> + */ + @Override + public synchronized void processAll() { + LOG.info("ReconReplicationManager starting container health check"); + + final long startTime = Time.monotonicNow(); + + // Use extended report that captures ALL containers, not just 100 samples + final ReconReplicationManagerReport report = new ReconReplicationManagerReport(); + final ReplicationQueue nullQueue = new MonitoringReplicationQueue(); + + // Get all containers (same as parent) + final List<ContainerInfo> containers = containerManager.getContainers(); + + LOG.info("Processing {} containers", containers.size()); + + // Process each container (reuses inherited processContainer and health check chain) + int processedCount = 0; + for (ContainerInfo container : containers) { + report.increment(container.getState()); + try { + ContainerID cid = container.containerID(); + + // Call inherited processContainer - this runs SCM's health check chain + // readOnly=true ensures no commands are generated + processContainer(container, nullQueue, report, true); + + // ADDITIONAL CHECK: Detect REPLICA_MISMATCH (Recon-specific, not in SCM) + Set<ContainerReplica> replicas = containerManager.getContainerReplicas(cid); + if 
(hasDataChecksumMismatch(replicas)) { + report.addReplicaMismatchContainer(cid); + LOG.debug("Container {} has data checksum mismatch across replicas", cid); + } + + processedCount++; + + if (processedCount % 10000 == 0) { + LOG.info("Processed {}/{} containers", processedCount, containers.size()); + } + } catch (ContainerNotFoundException e) { + LOG.error("Container {} not found", container.getContainerID(), e); + } + } + + report.setComplete(); + + // Store ALL per-container health states to database + storeHealthStatesToDatabase(report, containers); + + long duration = Time.monotonicNow() - startTime; + LOG.info("ReconReplicationManager completed in {}ms for {} containers", + duration, containers.size()); + } + + /** + * Convert ReconReplicationManagerReport to database records and store. + * This captures all unhealthy containers with detailed replica counts. + * + * @param report The report with all captured container health states + * @param allContainers List of all containers for cleanup + */ + private void storeHealthStatesToDatabase( + ReconReplicationManagerReport report, + List<ContainerInfo> allContainers) { + long currentTime = System.currentTimeMillis(); + ProcessingStats totalStats = new ProcessingStats(); + int totalReplicaMismatchCount = 0; + + for (int from = 0; from < allContainers.size(); from += PERSIST_CHUNK_SIZE) { + int to = Math.min(from + PERSIST_CHUNK_SIZE, allContainers.size()); + List<Long> chunkContainerIds = collectContainerIds(allContainers, from, to); + Set<Long> chunkContainerIdSet = new HashSet<>(chunkContainerIds); + Map<ContainerStateKey, Long> existingInStateSinceByContainerAndState = + healthSchemaManager.getExistingInStateSinceByContainerIds(chunkContainerIds); + List<UnhealthyContainerRecordV2> recordsToInsert = new ArrayList<>(); + ProcessingStats chunkStats = new ProcessingStats(); + Set<Long> negativeSizeRecorded = new HashSet<>(); + + report.forEachContainerByState((state, cid) -> { Review Comment: we can have SCM 
default report with sample-limit as 0, and no need additional capture of State vs containerId. For replica_mismatch, can have another list being passed, need not be part of report. and here, can have loop of allContainers from` 'from' to 'to' `with index access as its array list (no need loop report), and if check can be avoided `chunkContaineridSet` ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManagerV2.java: ########## @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.persistence; + +import static org.apache.ozone.recon.schema.ContainerSchemaDefinition.UNHEALTHY_CONTAINERS_TABLE_NAME; +import static org.apache.ozone.recon.schema.generated.tables.UnhealthyContainersTable.UNHEALTHY_CONTAINERS; +import static org.jooq.impl.DSL.count; + +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import java.sql.Connection; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.ozone.recon.schema.ContainerSchemaDefinition; +import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; +import org.apache.ozone.recon.schema.generated.tables.daos.UnhealthyContainersDao; +import org.apache.ozone.recon.schema.generated.tables.pojos.UnhealthyContainers; +import org.apache.ozone.recon.schema.generated.tables.records.UnhealthyContainersRecord; +import org.jooq.Condition; +import org.jooq.DSLContext; +import org.jooq.OrderField; +import org.jooq.Record; +import org.jooq.SelectQuery; +import org.jooq.exception.DataAccessException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manager for UNHEALTHY_CONTAINERS table used by ContainerHealthTaskV2. + */ +@Singleton +public class ContainerHealthSchemaManagerV2 { + private static final Logger LOG = + LoggerFactory.getLogger(ContainerHealthSchemaManagerV2.class); + private static final int BATCH_INSERT_CHUNK_SIZE = 1000; + + /** + * Maximum number of container IDs to include in a single + * {@code DELETE … WHERE container_id IN (…)} statement. + * + * <p>Derby's SQL compiler translates each prepared statement into a Java + * class. 
A large IN-predicate generates a deeply nested expression tree + * whose compiled bytecode can exceed the JVM hard limit of 65,535 bytes + * per method (ERROR XBCM4). Empirically, 5,000 IDs combined with the + * 7-state container_state IN-predicate generates ~148 KB — more than + * twice the limit. 1,000 IDs stays well under ~30 KB, providing a safe + * 2× margin.</p> + */ + static final int MAX_DELETE_CHUNK_SIZE = 1_000; + + private final UnhealthyContainersDao unhealthyContainersV2Dao; + private final ContainerSchemaDefinition containerSchemaDefinitionV2; + + @Inject + public ContainerHealthSchemaManagerV2( + ContainerSchemaDefinition containerSchemaDefinitionV2, + UnhealthyContainersDao unhealthyContainersV2Dao) { + this.unhealthyContainersV2Dao = unhealthyContainersV2Dao; + this.containerSchemaDefinitionV2 = containerSchemaDefinitionV2; + } + + /** + * Insert or update unhealthy container records in V2 table using TRUE batch insert. + * Uses JOOQ's batch API for optimal performance (single SQL statement for all records). + * Falls back to individual insert-or-update if batch insert fails (e.g., duplicate keys). 
+ */ + public void insertUnhealthyContainerRecords(List<UnhealthyContainerRecordV2> recs) { + if (recs == null || recs.isEmpty()) { + return; + } + + if (LOG.isDebugEnabled()) { + recs.forEach(rec -> LOG.debug("rec.getContainerId() : {}, rec.getContainerState(): {}", + rec.getContainerId(), rec.getContainerState())); + } + + DSLContext dslContext = containerSchemaDefinitionV2.getDSLContext(); + + try { + batchInsertInChunks(dslContext, recs); + + LOG.debug("Batch inserted {} unhealthy container records", recs.size()); + + } catch (DataAccessException e) { + // Batch insert failed (likely duplicate key) - fall back to insert-or-update per record + LOG.warn("Batch insert failed, falling back to individual insert-or-update for {} records", + recs.size(), e); + fallbackInsertOrUpdate(recs); + } catch (Exception e) { + LOG.error("Failed to batch insert records into {}", UNHEALTHY_CONTAINERS_TABLE_NAME, e); + throw new RuntimeException("Recon failed to insert " + recs.size() + + " unhealthy container records.", e); + } + } + + private void batchInsertInChunks(DSLContext dslContext, + List<UnhealthyContainerRecordV2> recs) { + dslContext.transaction(configuration -> { + DSLContext txContext = configuration.dsl(); + List<UnhealthyContainersRecord> records = + new ArrayList<>(BATCH_INSERT_CHUNK_SIZE); + + for (int from = 0; from < recs.size(); from += BATCH_INSERT_CHUNK_SIZE) { + int to = Math.min(from + BATCH_INSERT_CHUNK_SIZE, recs.size()); + records.clear(); + for (int i = from; i < to; i++) { + records.add(toJooqRecord(txContext, recs.get(i))); + } + txContext.batchInsert(records).execute(); + } + }); + } + + private void fallbackInsertOrUpdate(List<UnhealthyContainerRecordV2> recs) { + try (Connection connection = containerSchemaDefinitionV2.getDataSource().getConnection()) { + connection.setAutoCommit(false); + try { + for (UnhealthyContainerRecordV2 rec : recs) { + UnhealthyContainers jooqRec = toJooqPojo(rec); + try { + unhealthyContainersV2Dao.insert(jooqRec); + } 
catch (DataAccessException insertEx) { + // Duplicate key - update existing record + unhealthyContainersV2Dao.update(jooqRec); + } + } + connection.commit(); + } catch (Exception innerEx) { + connection.rollback(); + LOG.error("Transaction rolled back during fallback insert", innerEx); + throw innerEx; + } finally { + connection.setAutoCommit(true); + } + } catch (Exception fallbackEx) { + LOG.error("Failed to insert {} records even with fallback", recs.size(), fallbackEx); + throw new RuntimeException("Recon failed to insert " + recs.size() + + " unhealthy container records.", fallbackEx); + } + } + + private UnhealthyContainersRecord toJooqRecord(DSLContext txContext, + UnhealthyContainerRecordV2 rec) { + UnhealthyContainersRecord record = txContext.newRecord(UNHEALTHY_CONTAINERS); + record.setContainerId(rec.getContainerId()); + record.setContainerState(rec.getContainerState()); + record.setInStateSince(rec.getInStateSince()); + record.setExpectedReplicaCount(rec.getExpectedReplicaCount()); + record.setActualReplicaCount(rec.getActualReplicaCount()); + record.setReplicaDelta(rec.getReplicaDelta()); + record.setReason(rec.getReason()); + return record; + } + + private UnhealthyContainers toJooqPojo(UnhealthyContainerRecordV2 rec) { + return new UnhealthyContainers( + rec.getContainerId(), + rec.getContainerState(), + rec.getInStateSince(), + rec.getExpectedReplicaCount(), + rec.getActualReplicaCount(), + rec.getReplicaDelta(), + rec.getReason()); + } + + /** + * Batch delete all health states for multiple containers. + * This deletes all states generated from SCM/Recon health scans: + * MISSING, EMPTY_MISSING, UNDER_REPLICATED, OVER_REPLICATED, + * MIS_REPLICATED, NEGATIVE_SIZE and REPLICA_MISMATCH for all containers + * in the list. + * + * <p>REPLICA_MISMATCH is included here because it is re-evaluated on every + * scan cycle (just like the SCM-sourced states); omitting it would leave + * stale REPLICA_MISMATCH records in the table after a mismatch is resolved. 
+ * + * <p><b>Derby bytecode limit:</b> Derby translates each SQL statement into + * a Java class whose methods must each stay under the JVM 64 KB bytecode + * limit. A single {@code IN} predicate with more than ~2,000 values (when + * combined with the 7-state container_state filter) overflows this limit + * and causes {@code ERROR XBCM4}. This method automatically partitions + * {@code containerIds} into chunks of at most {@value #MAX_DELETE_CHUNK_SIZE} + * IDs so callers never need to worry about the limit, regardless of how + * many containers a scan cycle processes. + * + * @param containerIds List of container IDs to delete states for + */ + public void batchDeleteSCMStatesForContainers(List<Long> containerIds) { + if (containerIds == null || containerIds.isEmpty()) { + return; + } + + DSLContext dslContext = containerSchemaDefinitionV2.getDSLContext(); + int totalDeleted = 0; + + // Chunk the container IDs so each DELETE statement stays within Derby's + // generated-bytecode limit (MAX_DELETE_CHUNK_SIZE IDs per statement). + for (int from = 0; from < containerIds.size(); from += MAX_DELETE_CHUNK_SIZE) { + int to = Math.min(from + MAX_DELETE_CHUNK_SIZE, containerIds.size()); + List<Long> chunk = containerIds.subList(from, to); + + try { Review Comment: delete and insert for same container to be in same transaction ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManagerV2.java: ########## @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.persistence; + +import static org.apache.ozone.recon.schema.ContainerSchemaDefinition.UNHEALTHY_CONTAINERS_TABLE_NAME; +import static org.apache.ozone.recon.schema.generated.tables.UnhealthyContainersTable.UNHEALTHY_CONTAINERS; +import static org.jooq.impl.DSL.count; + +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import java.sql.Connection; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.ozone.recon.schema.ContainerSchemaDefinition; +import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; +import org.apache.ozone.recon.schema.generated.tables.daos.UnhealthyContainersDao; +import org.apache.ozone.recon.schema.generated.tables.pojos.UnhealthyContainers; +import org.apache.ozone.recon.schema.generated.tables.records.UnhealthyContainersRecord; +import org.jooq.Condition; +import org.jooq.DSLContext; +import org.jooq.OrderField; +import org.jooq.Record; +import org.jooq.SelectQuery; +import org.jooq.exception.DataAccessException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manager for UNHEALTHY_CONTAINERS table used by ContainerHealthTaskV2. 
+ */ +@Singleton +public class ContainerHealthSchemaManagerV2 { + private static final Logger LOG = + LoggerFactory.getLogger(ContainerHealthSchemaManagerV2.class); + private static final int BATCH_INSERT_CHUNK_SIZE = 1000; + + /** + * Maximum number of container IDs to include in a single + * {@code DELETE … WHERE container_id IN (…)} statement. + * + * <p>Derby's SQL compiler translates each prepared statement into a Java + * class. A large IN-predicate generates a deeply nested expression tree + * whose compiled bytecode can exceed the JVM hard limit of 65,535 bytes + * per method (ERROR XBCM4). Empirically, 5,000 IDs combined with the + * 7-state container_state IN-predicate generates ~148 KB — more than + * twice the limit. 1,000 IDs stays well under ~30 KB, providing a safe + * 2× margin.</p> + */ + static final int MAX_DELETE_CHUNK_SIZE = 1_000; + + private final UnhealthyContainersDao unhealthyContainersV2Dao; + private final ContainerSchemaDefinition containerSchemaDefinitionV2; + + @Inject + public ContainerHealthSchemaManagerV2( + ContainerSchemaDefinition containerSchemaDefinitionV2, + UnhealthyContainersDao unhealthyContainersV2Dao) { + this.unhealthyContainersV2Dao = unhealthyContainersV2Dao; + this.containerSchemaDefinitionV2 = containerSchemaDefinitionV2; + } + + /** + * Insert or update unhealthy container records in V2 table using TRUE batch insert. + * Uses JOOQ's batch API for optimal performance (single SQL statement for all records). + * Falls back to individual insert-or-update if batch insert fails (e.g., duplicate keys). 
 */
public void insertUnhealthyContainerRecords(List<UnhealthyContainerRecordV2> recs) {
  if (recs == null || recs.isEmpty()) {
    return;
  }

  if (LOG.isDebugEnabled()) {
    recs.forEach(rec -> LOG.debug("rec.getContainerId() : {}, rec.getContainerState(): {}",
        rec.getContainerId(), rec.getContainerState()));
  }

  DSLContext dslContext = containerSchemaDefinitionV2.getDSLContext();

  try {
    batchInsertInChunks(dslContext, recs);

    LOG.debug("Batch inserted {} unhealthy container records", recs.size());

  } catch (DataAccessException e) {
    // Batch insert failed (likely duplicate key) - fall back to insert-or-update per record
    LOG.warn("Batch insert failed, falling back to individual insert-or-update for {} records",
        recs.size(), e);
    fallbackInsertOrUpdate(recs);
  } catch (Exception e) {
    // Any non-jOOQ failure is fatal for this scan cycle; surface it to the caller.
    LOG.error("Failed to batch insert records into {}", UNHEALTHY_CONTAINERS_TABLE_NAME, e);
    throw new RuntimeException("Recon failed to insert " + recs.size() +
        " unhealthy container records.", e);
  }
}

/**
 * Inserts all records inside a single jOOQ transaction, sending at most
 * {@code BATCH_INSERT_CHUNK_SIZE} rows per {@code batchInsert} round-trip.
 * Throws {@link DataAccessException} on the first failing chunk; the whole
 * transaction is then rolled back by jOOQ.
 */
private void batchInsertInChunks(DSLContext dslContext,
    List<UnhealthyContainerRecordV2> recs) {
  dslContext.transaction(configuration -> {
    DSLContext txContext = configuration.dsl();
    // Buffer is reused (cleared) between chunks to avoid per-chunk allocation.
    List<UnhealthyContainersRecord> records =
        new ArrayList<>(BATCH_INSERT_CHUNK_SIZE);

    for (int from = 0; from < recs.size(); from += BATCH_INSERT_CHUNK_SIZE) {
      int to = Math.min(from + BATCH_INSERT_CHUNK_SIZE, recs.size());
      records.clear();
      for (int i = from; i < to; i++) {
        records.add(toJooqRecord(txContext, recs.get(i)));
      }
      txContext.batchInsert(records).execute();
    }
  });
}

/**
 * Per-record insert-or-update fallback used when the batch insert fails
 * (typically on a duplicate key).
 *
 * <p>NOTE(review): the {@code Connection} here is obtained directly from the
 * DataSource, while the DAO calls below go through the DAO's own jOOQ
 * Configuration. Unless that Configuration is backed by this exact
 * connection, the setAutoCommit/commit/rollback calls do not actually make
 * the DAO operations transactional — confirm the DAO wiring.
 */
private void fallbackInsertOrUpdate(List<UnhealthyContainerRecordV2> recs) {
  try (Connection connection = containerSchemaDefinitionV2.getDataSource().getConnection()) {
    connection.setAutoCommit(false);
    try {
      for (UnhealthyContainerRecordV2 rec : recs) {
        UnhealthyContainers jooqRec = toJooqPojo(rec);
        try {
          unhealthyContainersV2Dao.insert(jooqRec);
        } catch (DataAccessException insertEx) {
          // Duplicate key - update existing record
          unhealthyContainersV2Dao.update(jooqRec);
        }
      }
      connection.commit();
    } catch (Exception innerEx) {
      connection.rollback();
      LOG.error("Transaction rolled back during fallback insert", innerEx);
      throw innerEx;
    } finally {
      connection.setAutoCommit(true);
    }
  } catch (Exception fallbackEx) {
    LOG.error("Failed to insert {} records even with fallback", recs.size(), fallbackEx);
    throw new RuntimeException("Recon failed to insert " + recs.size() +
        " unhealthy container records.", fallbackEx);
  }
}

/** Maps the POJO onto a jOOQ record attached to {@code txContext} for batchInsert. */
private UnhealthyContainersRecord toJooqRecord(DSLContext txContext,
    UnhealthyContainerRecordV2 rec) {
  UnhealthyContainersRecord record = txContext.newRecord(UNHEALTHY_CONTAINERS);
  record.setContainerId(rec.getContainerId());
  record.setContainerState(rec.getContainerState());
  record.setInStateSince(rec.getInStateSince());
  record.setExpectedReplicaCount(rec.getExpectedReplicaCount());
  record.setActualReplicaCount(rec.getActualReplicaCount());
  record.setReplicaDelta(rec.getReplicaDelta());
  record.setReason(rec.getReason());
  return record;
}

/** Maps the POJO onto the generated jOOQ POJO type consumed by the DAO. */
private UnhealthyContainers toJooqPojo(UnhealthyContainerRecordV2 rec) {
  return new UnhealthyContainers(
      rec.getContainerId(),
      rec.getContainerState(),
      rec.getInStateSince(),
      rec.getExpectedReplicaCount(),
      rec.getActualReplicaCount(),
      rec.getReplicaDelta(),
      rec.getReason());
}

/**
 * Batch delete all health states for multiple containers.
 * This deletes all states generated from SCM/Recon health scans:
 * MISSING, EMPTY_MISSING, UNDER_REPLICATED, OVER_REPLICATED,
 * MIS_REPLICATED, NEGATIVE_SIZE and REPLICA_MISMATCH for all containers
 * in the list.
 *
 * <p>REPLICA_MISMATCH is included here because it is re-evaluated on every
 * scan cycle (just like the SCM-sourced states); omitting it would leave
 * stale REPLICA_MISMATCH records in the table after a mismatch is resolved.
 *
 * <p><b>Derby bytecode limit:</b> Derby translates each SQL statement into
 * a Java class whose methods must each stay under the JVM 64 KB bytecode
 * limit. A single {@code IN} predicate with more than ~2,000 values (when
 * combined with the 7-state container_state filter) overflows this limit
 * and causes {@code ERROR XBCM4}. This method automatically partitions
 * {@code containerIds} into chunks of at most {@value #MAX_DELETE_CHUNK_SIZE}
 * IDs so callers never need to worry about the limit, regardless of how
 * many containers a scan cycle processes.
 *
 * @param containerIds List of container IDs to delete states for
 */
public void batchDeleteSCMStatesForContainers(List<Long> containerIds) {
  if (containerIds == null || containerIds.isEmpty()) {
    return;
  }

  DSLContext dslContext = containerSchemaDefinitionV2.getDSLContext();
  int totalDeleted = 0;

  // Chunk the container IDs so each DELETE statement stays within Derby's
  // generated-bytecode limit (MAX_DELETE_CHUNK_SIZE IDs per statement).
  for (int from = 0; from < containerIds.size(); from += MAX_DELETE_CHUNK_SIZE) {
    int to = Math.min(from + MAX_DELETE_CHUNK_SIZE, containerIds.size());
    // subList is a view, not a copy — cheap even for large id lists.
    List<Long> chunk = containerIds.subList(from, to);

    try {
      int deleted = dslContext.deleteFrom(UNHEALTHY_CONTAINERS)
          .where(UNHEALTHY_CONTAINERS.CONTAINER_ID.in(chunk))
          .and(UNHEALTHY_CONTAINERS.CONTAINER_STATE.in(
              UnHealthyContainerStates.MISSING.toString(),
              UnHealthyContainerStates.EMPTY_MISSING.toString(),
              UnHealthyContainerStates.UNDER_REPLICATED.toString(),
              UnHealthyContainerStates.OVER_REPLICATED.toString(),
              UnHealthyContainerStates.MIS_REPLICATED.toString(),
              UnHealthyContainerStates.NEGATIVE_SIZE.toString(),
              UnHealthyContainerStates.REPLICA_MISMATCH.toString()))
          .execute();
      totalDeleted += deleted;
    } catch (Exception e) {
      // A failed chunk aborts the whole delete; earlier chunks stay deleted.
      LOG.error("Failed to batch delete health states for {} containers (chunk {}-{})",
          chunk.size(), from, to, e);
      throw new RuntimeException("Failed to batch delete health states", e);
    }
  }

  LOG.debug("Batch deleted {} health state records for {} containers",
      totalDeleted, containerIds.size());
}

/**
 * Returns previous in-state-since timestamps for tracked unhealthy states.
 * The key is a stable containerId + state tuple.
+ */ + public Map<ContainerStateKey, Long> getExistingInStateSinceByContainerIds( + List<Long> containerIds) { + if (containerIds == null || containerIds.isEmpty()) { + return new HashMap<>(); + } + + DSLContext dslContext = containerSchemaDefinitionV2.getDSLContext(); + Map<ContainerStateKey, Long> existing = new HashMap<>(); + try { + dslContext.select( + UNHEALTHY_CONTAINERS.CONTAINER_ID, + UNHEALTHY_CONTAINERS.CONTAINER_STATE, + UNHEALTHY_CONTAINERS.IN_STATE_SINCE) + .from(UNHEALTHY_CONTAINERS) + .where(UNHEALTHY_CONTAINERS.CONTAINER_ID.in(containerIds)) + .and(UNHEALTHY_CONTAINERS.CONTAINER_STATE.in( + UnHealthyContainerStates.MISSING.toString(), + UnHealthyContainerStates.EMPTY_MISSING.toString(), + UnHealthyContainerStates.UNDER_REPLICATED.toString(), + UnHealthyContainerStates.OVER_REPLICATED.toString(), + UnHealthyContainerStates.MIS_REPLICATED.toString(), + UnHealthyContainerStates.NEGATIVE_SIZE.toString(), + UnHealthyContainerStates.REPLICA_MISMATCH.toString())) + .forEach(record -> existing.put( + new ContainerStateKey(record.get(UNHEALTHY_CONTAINERS.CONTAINER_ID), + record.get(UNHEALTHY_CONTAINERS.CONTAINER_STATE)), + record.get(UNHEALTHY_CONTAINERS.IN_STATE_SINCE))); + } catch (Exception e) { + LOG.warn("Failed to load existing inStateSince records. Falling back to current scan time.", e); + } + return existing; + } + + /** + * Get summary of unhealthy containers grouped by state from V2 table. 
+ */ + public List<UnhealthyContainersSummaryV2> getUnhealthyContainersSummary() { + DSLContext dslContext = containerSchemaDefinitionV2.getDSLContext(); + List<UnhealthyContainersSummaryV2> result = new ArrayList<>(); + + try { + return dslContext + .select(UNHEALTHY_CONTAINERS.CONTAINER_STATE.as("containerState"), + count().as("cnt")) + .from(UNHEALTHY_CONTAINERS) + .groupBy(UNHEALTHY_CONTAINERS.CONTAINER_STATE) + .fetchInto(UnhealthyContainersSummaryV2.class); + } catch (Exception e) { + LOG.error("Failed to get summary from V2 table", e); + return result; + } + } + + /** + * Get unhealthy containers from V2 table. + */ + public List<UnhealthyContainerRecordV2> getUnhealthyContainers( + UnHealthyContainerStates state, long minContainerId, long maxContainerId, int limit) { + DSLContext dslContext = containerSchemaDefinitionV2.getDSLContext(); + + SelectQuery<Record> query = dslContext.selectQuery(); + query.addFrom(UNHEALTHY_CONTAINERS); + + Condition containerCondition; + OrderField[] orderField; + + if (maxContainerId > 0) { + containerCondition = UNHEALTHY_CONTAINERS.CONTAINER_ID.lessThan(maxContainerId); + orderField = new OrderField[]{ + UNHEALTHY_CONTAINERS.CONTAINER_ID.desc(), + UNHEALTHY_CONTAINERS.CONTAINER_STATE.asc() + }; + } else { + containerCondition = UNHEALTHY_CONTAINERS.CONTAINER_ID.greaterThan(minContainerId); + orderField = new OrderField[]{ + UNHEALTHY_CONTAINERS.CONTAINER_ID.asc(), + UNHEALTHY_CONTAINERS.CONTAINER_STATE.asc() + }; + } + + if (state != null) { + query.addConditions(containerCondition.and( + UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(state.toString()))); + } else { + query.addConditions(containerCondition); + } + + query.addOrderBy(orderField); + query.addLimit(limit); + + // Pre-buffer `limit` rows per JDBC round-trip instead of Derby's default of 1 row. 
+ query.fetchSize(limit); + + try { + Stream<UnhealthyContainersRecord> stream = + query.fetchInto(UnhealthyContainersRecord.class).stream(); + + if (maxContainerId > 0) { + // Reverse-pagination path: SQL orders DESC (to get the last `limit` rows before + // maxContainerId); re-sort to ASC so callers always see ascending container IDs. + stream = stream.sorted(Comparator.comparingLong(UnhealthyContainersRecord::getContainerId)); + } + // Forward-pagination path: SQL already orders ASC — no Java re-sort needed. + + return stream.map(record -> new UnhealthyContainerRecordV2( + record.getContainerId(), + record.getContainerState(), + record.getInStateSince(), + record.getExpectedReplicaCount(), + record.getActualReplicaCount(), + record.getReplicaDelta(), + record.getReason())) + .collect(Collectors.toList()); + } catch (Exception e) { + LOG.error("Failed to query V2 table", e); + return new ArrayList<>(); + } + } + + /** + * Clear all records from V2 table (for testing). + */ + @VisibleForTesting + public void clearAllUnhealthyContainerRecords() { + DSLContext dslContext = containerSchemaDefinitionV2.getDSLContext(); + try { + dslContext.deleteFrom(UNHEALTHY_CONTAINERS).execute(); + LOG.info("Cleared all V2 unhealthy container records"); + } catch (Exception e) { + LOG.error("Failed to clear V2 unhealthy container records", e); + } + } + + /** + * POJO representing a record in UNHEALTHY_CONTAINERS table. 
 */
public static class UnhealthyContainerRecordV2 {
  private final long containerId;
  private final String containerState;
  private final long inStateSince;
  private final int expectedReplicaCount;
  private final int actualReplicaCount;
  private final int replicaDelta;
  private final String reason;

  /**
   * @param containerId          ID of the unhealthy container
   * @param containerState       health state name (see UnHealthyContainerStates)
   * @param inStateSince         epoch millis since the container entered this state
   * @param expectedReplicaCount replicas required by the replication config
   * @param actualReplicaCount   replicas actually observed
   * @param replicaDelta         replica-count delta as computed by the producer
   * @param reason               human-readable explanation of the state
   */
  public UnhealthyContainerRecordV2(long containerId, String containerState,
      long inStateSince, int expectedReplicaCount, int actualReplicaCount,
      int replicaDelta, String reason) {
    this.containerId = containerId;
    this.containerState = containerState;
    this.inStateSince = inStateSince;
    this.expectedReplicaCount = expectedReplicaCount;
    this.actualReplicaCount = actualReplicaCount;
    this.replicaDelta = replicaDelta;
    this.reason = reason;
  }

  public long getContainerId() {
    return containerId;
  }

  public String getContainerState() {
    return containerState;
  }

  public long getInStateSince() {
    return inStateSince;
  }

  public int getExpectedReplicaCount() {
    return expectedReplicaCount;
  }

  public int getActualReplicaCount() {
    return actualReplicaCount;
  }

  public int getReplicaDelta() {
    return replicaDelta;
  }

  public String getReason() {
    return reason;
  }
}

/**
 * Key type for (containerId, state).
+ */ + public static final class ContainerStateKey { + private final long containerId; + private final String state; + + public ContainerStateKey(long containerId, String state) { + this.containerId = containerId; + this.state = state; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof ContainerStateKey)) { + return false; + } + ContainerStateKey that = (ContainerStateKey) other; + return containerId == that.containerId && state.equals(that.state); + } + + @Override + public int hashCode() { + return Long.hashCode(containerId) * 31 + state.hashCode(); Review Comment: can use return Objects.hash(field1, field2); for combining 2 field, then own implementation ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ReconReplicationManager.java: ########## @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.fsck; + +import java.io.IOException; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerHealthState; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.replication.MonitoringReplicationQueue; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.ContainerStateKey; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.UnhealthyContainerRecordV2; +import org.apache.hadoop.util.Time; +import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Recon-specific extension of SCM's ReplicationManager. 
 *
 * <p><b>Key Differences from SCM:</b></p>
 * <ol>
 * <li>Uses NoOpsContainerReplicaPendingOps stub (no pending operations tracking)</li>
 * <li>Overrides processAll() to capture ALL container health states (no 100-sample limit)</li>
 * <li>Stores results in Recon's UNHEALTHY_CONTAINERS table</li>
 * <li>Does not issue replication commands (read-only monitoring)</li>
 * </ol>
 *
 * <p><b>Why This Works Without PendingOps:</b></p>
 * <p>SCM's health check logic uses a two-phase approach:
 * <ul>
 * <li><b>Phase 1 (Health Determination):</b> Calls isSufficientlyReplicated(false)
 * which ignores pending operations. This phase determines the health state.</li>
 * <li><b>Phase 2 (Command Deduplication):</b> Calls isSufficientlyReplicated(true)
 * which considers pending operations. This phase decides whether to enqueue
 * new commands.</li>
 * </ul>
 * Since Recon only needs Phase 1 (health determination) and doesn't issue commands,
 * the stub PendingOps does not cause false positives.</p>
 *
 * @see NoOpsContainerReplicaPendingOps
 * @see ReconReplicationManagerReport
 */
public class ReconReplicationManager extends ReplicationManager {

  private static final Logger LOG =
      LoggerFactory.getLogger(ReconReplicationManager.class);
  // Health states are persisted to the DB in chunks of this many containers.
  private static final int PERSIST_CHUNK_SIZE = 50_000;

  private final ContainerHealthSchemaManagerV2 healthSchemaManager;
  private final ContainerManager containerManager;

  /**
   * Immutable wiring context for ReconReplicationManager initialization.
   */
  public static final class InitContext {
    private final ReplicationManagerConfiguration rmConf;
    private final ConfigurationSource conf;
    private final ContainerManager containerManager;
    private final PlacementPolicy ratisContainerPlacement;
    private final PlacementPolicy ecContainerPlacement;
    private final EventPublisher eventPublisher;
    private final SCMContext scmContext;
    private final NodeManager nodeManager;
    private final Clock clock;

    // Instances are built exclusively through the Builder below.
    private InitContext(Builder builder) {
      this.rmConf = builder.rmConf;
      this.conf = builder.conf;
      this.containerManager = builder.containerManager;
      this.ratisContainerPlacement = builder.ratisContainerPlacement;
      this.ecContainerPlacement = builder.ecContainerPlacement;
      this.eventPublisher = builder.eventPublisher;
      this.scmContext = builder.scmContext;
      this.nodeManager = builder.nodeManager;
      this.clock = builder.clock;
    }

    public static Builder newBuilder() {
      return new Builder();
    }

    /**
     * Builder for creating {@link InitContext} instances.
     */
    public static final class Builder {
      private ReplicationManagerConfiguration rmConf;
      private ConfigurationSource conf;
      private ContainerManager containerManager;
      private PlacementPolicy ratisContainerPlacement;
      private PlacementPolicy ecContainerPlacement;
      private EventPublisher eventPublisher;
      private SCMContext scmContext;
      private NodeManager nodeManager;
      private Clock clock;

      private Builder() {
      }

      public Builder setRmConf(ReplicationManagerConfiguration rmConf) {
        this.rmConf = rmConf;
        return this;
      }

      public Builder setConf(ConfigurationSource conf) {
        this.conf = conf;
        return this;
      }

      public Builder setContainerManager(ContainerManager containerManager) {
        this.containerManager = containerManager;
        return this;
      }

      public Builder setRatisContainerPlacement(PlacementPolicy ratisContainerPlacement) {
        this.ratisContainerPlacement = ratisContainerPlacement;
        return this;
      }

      public Builder setEcContainerPlacement(PlacementPolicy ecContainerPlacement) {
        this.ecContainerPlacement = ecContainerPlacement;
        return this;
      }

      public Builder setEventPublisher(EventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
        return this;
      }

      public Builder setScmContext(SCMContext scmContext) {
        this.scmContext = scmContext;
        return this;
      }

      public Builder setNodeManager(NodeManager nodeManager) {
        this.nodeManager = nodeManager;
        return this;
      }

      public Builder setClock(Clock clock) {
        this.clock = clock;
        return this;
      }

      public InitContext build() {
        return new InitContext(this);
      }
    }
  }

  public ReconReplicationManager(
      InitContext initContext,
      ContainerHealthSchemaManagerV2 healthSchemaManager) throws IOException {

    // Call parent with stub PendingOps (proven to not cause false positives)
    super(
        initContext.rmConf,
        initContext.conf,
        initContext.containerManager,
        initContext.ratisContainerPlacement,
        initContext.ecContainerPlacement,
        initContext.eventPublisher,
        initContext.scmContext,
        initContext.nodeManager,
        initContext.clock,
        new NoOpsContainerReplicaPendingOps(initContext.clock, initContext.rmConf)
    );

    this.containerManager = initContext.containerManager;
    this.healthSchemaManager = healthSchemaManager;
  }

  /**
   * Override start() to prevent background threads from running.
   *
   * <p>In Recon, we don't want the ReplicationManager's background threads
   * (replicationMonitor, underReplicatedProcessor, overReplicatedProcessor)
   * to run continuously. Instead, we call processAll() manually from
   * ContainerHealthTaskV2 on a schedule.</p>
   *
   * <p>This prevents:
   * <ul>
   * <li>Unnecessary CPU usage from continuous monitoring</li>
   * <li>Initialization race conditions (start() being called before fields are initialized)</li>
   * <li>Replication commands being generated (Recon is read-only)</li>
   * </ul>
   * </p>
   */
  @Override
  public synchronized void start() {
    LOG.info("ReconReplicationManager.start() called - no-op (manual invocation via processAll())");
    // Do nothing - we call processAll() manually from ContainerHealthTaskV2
  }

  /**
   * Checks if container replicas have mismatched data checksums.
   * This is a Recon-specific check not done by SCM's ReplicationManager.
   *
   * <p>REPLICA_MISMATCH detection is crucial for identifying:
   * <ul>
   * <li>Bit rot (silent data corruption)</li>
   * <li>Failed writes to some replicas</li>
   * <li>Storage corruption on specific datanodes</li>
   * <li>Network corruption during replication</li>
   * </ul>
   * </p>
   *
   * <p>This uses checksum mismatch logic:
   * {@code replicas.stream().map(ContainerReplica::getDataChecksum).distinct().count() != 1}
   * </p>
   *
   * @param replicas Set of container replicas to check
   * @return true if replicas have different data checksums
   */
  private boolean hasDataChecksumMismatch(Set<ContainerReplica> replicas) {
    if (replicas == null || replicas.isEmpty()) {
      return false;
    }

    // Count distinct checksums (filter out nulls)
    long distinctChecksums = replicas.stream()
        .map(ContainerReplica::getDataChecksum)
        .filter(Objects::nonNull)
        .distinct()
        .count();

    // More than 1 distinct checksum = data mismatch
    // 0 distinct checksums = all nulls, no mismatch
    return distinctChecksums > 1;
  }

  /**
   * Override processAll() to capture ALL per-container health states,
   * not just aggregate counts and 100 samples.
   *
   * <p><b>Processing Flow:</b></p>
   * <ol>
   * <li>Get all containers from ContainerManager</li>
   * <li>Process each container using inherited health check chain (SCM logic)</li>
   * <li>Additionally check for REPLICA_MISMATCH (Recon-specific)</li>
   * <li>Capture ALL unhealthy container IDs per health state (no sampling limit)</li>
   * <li>Store results in Recon's UNHEALTHY_CONTAINERS table</li>
   * </ol>
   *
   * <p><b>Differences from SCM's processAll():</b></p>
   * <ul>
   * <li>Uses ReconReplicationManagerReport (captures all containers)</li>
   * <li>Uses MonitoringReplicationQueue (doesn't enqueue commands)</li>
   * <li>Adds REPLICA_MISMATCH detection (not done by SCM)</li>
   * <li>Stores results in database instead of just keeping in-memory report</li>
   * </ul>
   */
  @Override
  public synchronized void processAll() {
    LOG.info("ReconReplicationManager starting container health check");

    final long startTime = Time.monotonicNow();

    // Use extended report that captures ALL containers, not just 100 samples
    final ReconReplicationManagerReport report = new ReconReplicationManagerReport();
    final ReplicationQueue nullQueue = new MonitoringReplicationQueue();

    // Get all containers (same as parent)
    final List<ContainerInfo> containers = containerManager.getContainers();

    LOG.info("Processing {} containers", containers.size());

    // Process each container (reuses inherited processContainer and health check chain)
    int processedCount = 0;
    for (ContainerInfo container : containers) {
      report.increment(container.getState());
      try {
        ContainerID cid = container.containerID();

        // Call inherited processContainer - this runs SCM's health check chain
        // readOnly=true ensures no commands are generated
        processContainer(container, nullQueue, report, true);

        // ADDITIONAL CHECK: Detect REPLICA_MISMATCH (Recon-specific, not in SCM)
        Set<ContainerReplica> replicas = containerManager.getContainerReplicas(cid);
        if (hasDataChecksumMismatch(replicas)) {
          report.addReplicaMismatchContainer(cid);
          LOG.debug("Container {} has data checksum mismatch across replicas", cid);
        }

        processedCount++;

        if (processedCount % 10000 == 0) {
          LOG.info("Processed {}/{} containers", processedCount, containers.size());
        }
      } catch (ContainerNotFoundException e) {
        LOG.error("Container {} not found", container.getContainerID(), e);
      }
    }

    report.setComplete();

    // Store ALL per-container health states to database
    storeHealthStatesToDatabase(report, containers);

    long duration = Time.monotonicNow() - startTime;
    LOG.info("ReconReplicationManager completed in {}ms for {} containers",
        duration, containers.size());
  }

  /**
   * Convert ReconReplicationManagerReport to database records and store.
   * This captures all unhealthy containers with detailed replica counts.
   *
   * @param report The report with all captured container health states
   * @param allContainers List of all containers for cleanup
   */
  private void storeHealthStatesToDatabase(
      ReconReplicationManagerReport report,
      List<ContainerInfo> allContainers) {
    // One timestamp per scan cycle; reused as the "entered state now" value.
    long currentTime = System.currentTimeMillis();
    ProcessingStats totalStats = new ProcessingStats();
    int totalReplicaMismatchCount = 0;

    // Persist in chunks so the per-chunk DB reads/writes stay bounded.
    for (int from = 0; from < allContainers.size(); from += PERSIST_CHUNK_SIZE) {
      int to = Math.min(from + PERSIST_CHUNK_SIZE, allContainers.size());
      List<Long> chunkContainerIds = collectContainerIds(allContainers, from, to);
      Set<Long> chunkContainerIdSet = new HashSet<>(chunkContainerIds);
      Map<ContainerStateKey, Long> existingInStateSinceByContainerAndState =
          healthSchemaManager.getExistingInStateSinceByContainerIds(chunkContainerIds);
      List<UnhealthyContainerRecordV2> recordsToInsert = new ArrayList<>();
      ProcessingStats chunkStats = new ProcessingStats();
      Set<Long> negativeSizeRecorded = new HashSet<>();

      report.forEachContainerByState((state, cid) -> {
        if
        (!chunkContainerIdSet.contains(cid.getId())) {
          // Entry belongs to a different persistence chunk; skip it here.
          return;
        }
        try {
          handleScmStateContainer(state, cid, currentTime,
              existingInStateSinceByContainerAndState, recordsToInsert,
              negativeSizeRecorded, chunkStats);
        } catch (ContainerNotFoundException e) {
          LOG.warn("Container {} not found when processing {} state", cid, state, e);
        }
      });

      int chunkReplicaMismatchCount = processReplicaMismatchContainersForChunk(
          report, currentTime, existingInStateSinceByContainerAndState,
          recordsToInsert, chunkContainerIdSet);
      totalReplicaMismatchCount += chunkReplicaMismatchCount;
      totalStats.add(chunkStats);
      persistUnhealthyRecords(chunkContainerIds, recordsToInsert);
    }

    LOG.info("Stored {} MISSING, {} EMPTY_MISSING, {} UNDER_REPLICATED, " +
        "{} OVER_REPLICATED, {} MIS_REPLICATED, {} NEGATIVE_SIZE, " +
        "{} REPLICA_MISMATCH",
        totalStats.missingCount, totalStats.emptyMissingCount, totalStats.underRepCount,
        totalStats.overRepCount, totalStats.misRepCount, totalStats.negativeSizeCount,
        totalReplicaMismatchCount);
  }

  /** Routes one (state, container) report entry to its per-state handler. */
  private void handleScmStateContainer(
      ContainerHealthState state,
      ContainerID containerId,
      long currentTime,
      Map<ContainerStateKey, Long> existingInStateSinceByContainerAndState,
      List<UnhealthyContainerRecordV2> recordsToInsert,
      Set<Long> negativeSizeRecorded,
      ProcessingStats stats) throws ContainerNotFoundException {
    switch (state) {
      case MISSING:
        handleMissingContainer(containerId, currentTime,
            existingInStateSinceByContainerAndState, recordsToInsert, stats);
        break;
      case UNDER_REPLICATED:
        stats.incrementUnderRepCount();
        handleReplicaStateContainer(containerId, currentTime,
            UnHealthyContainerStates.UNDER_REPLICATED,
            existingInStateSinceByContainerAndState, recordsToInsert, negativeSizeRecorded, stats);
        break;
      case OVER_REPLICATED:
        stats.incrementOverRepCount();
        handleReplicaStateContainer(containerId, currentTime,
            UnHealthyContainerStates.OVER_REPLICATED,
            existingInStateSinceByContainerAndState, recordsToInsert, negativeSizeRecorded, stats);
        break;
      case MIS_REPLICATED:
        stats.incrementMisRepCount();
        handleReplicaStateContainer(containerId, currentTime,
            UnHealthyContainerStates.MIS_REPLICATED,
            existingInStateSinceByContainerAndState, recordsToInsert, negativeSizeRecorded, stats);
        break;
      default:
        // Other health states are not persisted by Recon.
        break;
    }
  }

  /** Records a MISSING container, classifying key-less ones as EMPTY_MISSING. */
  private void handleMissingContainer(
      ContainerID containerId,
      long currentTime,
      Map<ContainerStateKey, Long> existingInStateSinceByContainerAndState,
      List<UnhealthyContainerRecordV2> recordsToInsert,
      ProcessingStats stats) throws ContainerNotFoundException {
    ContainerInfo container = containerManager.getContainer(containerId);
    int expected = container.getReplicationConfig().getRequiredNodes();
    if (isEmptyMissing(container)) {
      stats.incrementEmptyMissingCount();
      recordsToInsert.add(createRecord(container,
          UnHealthyContainerStates.EMPTY_MISSING,
          resolveInStateSince(container.getContainerID(),
              UnHealthyContainerStates.EMPTY_MISSING, currentTime,
              existingInStateSinceByContainerAndState),
          expected, 0,
          "Container has no replicas and no keys"));
      return;
    }

    stats.incrementMissingCount();
    recordsToInsert.add(createRecord(container,
        UnHealthyContainerStates.MISSING,
        resolveInStateSince(container.getContainerID(),
            UnHealthyContainerStates.MISSING, currentTime,
            existingInStateSinceByContainerAndState),
        expected, 0,
        "No replicas available"));
  }

  /** Records an under/over/mis-replicated container with its replica counts. */
  private void handleReplicaStateContainer(
      ContainerID containerId,
      long currentTime,
      UnHealthyContainerStates targetState,
      Map<ContainerStateKey, Long> existingInStateSinceByContainerAndState,
      List<UnhealthyContainerRecordV2> recordsToInsert,
      Set<Long> negativeSizeRecorded,
      ProcessingStats stats) throws ContainerNotFoundException {
    ContainerInfo container = containerManager.getContainer(containerId);
    Set<ContainerReplica> replicas = containerManager.getContainerReplicas(containerId);
    int expected = container.getReplicationConfig().getRequiredNodes();
    int actual = replicas.size();
    recordsToInsert.add(createRecord(container, targetState,
        resolveInStateSince(container.getContainerID(), targetState,
            currentTime, existingInStateSinceByContainerAndState),
        expected, actual, reasonForState(targetState)));
    addNegativeSizeRecordIfNeeded(container, currentTime, actual, recordsToInsert,
        existingInStateSinceByContainerAndState, negativeSizeRecorded, stats);
  }

  /** Emits REPLICA_MISMATCH records for mismatch containers in this chunk. */
  private int processReplicaMismatchContainersForChunk(
      ReconReplicationManagerReport report,
      long currentTime,
      Map<ContainerStateKey, Long> existingInStateSinceByContainerAndState,
      List<UnhealthyContainerRecordV2> recordsToInsert,
      Set<Long> chunkContainerIdSet) {
    List<ContainerID> replicaMismatchContainers = report.getReplicaMismatchContainers();
    int chunkReplicaMismatchCount = 0;
    for (ContainerID cid : replicaMismatchContainers) {
      if (!chunkContainerIdSet.contains(cid.getId())) {
        // Belongs to a different persistence chunk.
        continue;
      }
      try {
        ContainerInfo container = containerManager.getContainer(cid);
        Set<ContainerReplica> replicas = containerManager.getContainerReplicas(cid);
        int expected = container.getReplicationConfig().getRequiredNodes();
        int actual = replicas.size();
        recordsToInsert.add(createRecord(container,
            UnHealthyContainerStates.REPLICA_MISMATCH,
            resolveInStateSince(container.getContainerID(),
                UnHealthyContainerStates.REPLICA_MISMATCH, currentTime,
                existingInStateSinceByContainerAndState),
            expected, actual,
            "Data checksum mismatch across replicas"));
        chunkReplicaMismatchCount++;
      } catch (ContainerNotFoundException e) {
        LOG.warn("Container {} not found when processing REPLICA_MISMATCH state", cid, e);
      }
    }
    return chunkReplicaMismatchCount;
  }

  /** Collects the container IDs of the [fromInclusive, toExclusive) slice. */
  private List<Long> collectContainerIds(List<ContainerInfo> allContainers,
      int fromInclusive, int toExclusive) {
    List<Long> containerIds = new ArrayList<>(toExclusive - fromInclusive);
    for (int i = fromInclusive; i < toExclusive; i++) {
      containerIds.add(allContainers.get(i).getContainerID());
    }
    return containerIds;
  }

  // NOTE(review): reviewer asks that the delete and the subsequent insert
  // performed by this method run as one atomic transaction; the body is
  // truncated in this excerpt — confirm against the full file.
  private void persistUnhealthyRecords(
      List<Long> containerIdsToDelete,

Review Comment:
   perform delete and insert as atomic transaction

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
