dombizita commented on code in PR #9258:
URL: https://github.com/apache/ozone/pull/9258#discussion_r2883661941
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UnhealthyContainerMetadata.java:
##########
@@ -61,15 +60,18 @@ public class UnhealthyContainerMetadata {
@XmlElement(name = "replicas")
private List<ContainerHistory> replicas;
- public UnhealthyContainerMetadata(UnhealthyContainers rec,
- List<ContainerHistory> replicas, UUID pipelineID, long keyCount) {
- this.containerID = rec.getContainerId();
- this.containerState = rec.getContainerState();
- this.unhealthySince = rec.getInStateSince();
- this.actualReplicaCount = rec.getActualReplicaCount();
- this.expectedReplicaCount = rec.getExpectedReplicaCount();
- this.replicaDeltaCount = rec.getReplicaDelta();
- this.reason = rec.getReason();
+ @SuppressWarnings("checkstyle:ParameterNumber")
+ public UnhealthyContainerMetadata(long containerID, String containerState,
+ long unhealthySince, long expectedReplicaCount, long actualReplicaCount,
+ long replicaDeltaCount, String reason, List<ContainerHistory> replicas,
+ UUID pipelineID, long keyCount) {
Review Comment:
Why did you change from `UnhealthyContainers` to this parameter list?
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManagerV2.java:
##########
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.persistence;
+
+import static
org.apache.ozone.recon.schema.ContainerSchemaDefinition.UNHEALTHY_CONTAINERS_TABLE_NAME;
+import static
org.apache.ozone.recon.schema.generated.tables.UnhealthyContainersTable.UNHEALTHY_CONTAINERS;
+import static org.jooq.impl.DSL.count;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ozone.recon.schema.ContainerSchemaDefinition;
+import
org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates;
+import
org.apache.ozone.recon.schema.generated.tables.daos.UnhealthyContainersDao;
+import
org.apache.ozone.recon.schema.generated.tables.pojos.UnhealthyContainers;
+import
org.apache.ozone.recon.schema.generated.tables.records.UnhealthyContainersRecord;
+import org.jooq.Condition;
+import org.jooq.DSLContext;
+import org.jooq.OrderField;
+import org.jooq.Record;
+import org.jooq.SelectQuery;
+import org.jooq.exception.DataAccessException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manager for UNHEALTHY_CONTAINERS table used by ContainerHealthTaskV2.
+ */
+@Singleton
+public class ContainerHealthSchemaManagerV2 {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ContainerHealthSchemaManagerV2.class);
+ private static final int BATCH_INSERT_CHUNK_SIZE = 1000;
+
+ /**
+ * Maximum number of container IDs to include in a single
+ * {@code DELETE … WHERE container_id IN (…)} statement.
+ *
+ * <p>Derby's SQL compiler translates each prepared statement into a Java
+ * class. A large IN-predicate generates a deeply nested expression tree
+ * whose compiled bytecode can exceed the JVM hard limit of 65,535 bytes
+ * per method (ERROR XBCM4). Empirically, 5,000 IDs combined with the
+ * 7-state container_state IN-predicate generates ~148 KB — more than
+ * twice the limit. 1,000 IDs stays well under ~30 KB, providing a safe
+ * 2× margin.</p>
+ */
+ static final int MAX_DELETE_CHUNK_SIZE = 1_000;
+
+ private final UnhealthyContainersDao unhealthyContainersV2Dao;
+ private final ContainerSchemaDefinition containerSchemaDefinitionV2;
+
+ @Inject
+ public ContainerHealthSchemaManagerV2(
+ ContainerSchemaDefinition containerSchemaDefinitionV2,
+ UnhealthyContainersDao unhealthyContainersV2Dao) {
+ this.unhealthyContainersV2Dao = unhealthyContainersV2Dao;
+ this.containerSchemaDefinitionV2 = containerSchemaDefinitionV2;
+ }
+
+ /**
+ * Insert or update unhealthy container records in V2 table using TRUE batch
insert.
+ * Uses JOOQ's batch API for optimal performance (single SQL statement for
all records).
+ * Falls back to individual insert-or-update if batch insert fails (e.g.,
duplicate keys).
+ */
+ public void insertUnhealthyContainerRecords(List<UnhealthyContainerRecordV2>
recs) {
+ if (recs == null || recs.isEmpty()) {
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ recs.forEach(rec -> LOG.debug("rec.getContainerId() : {},
rec.getContainerState(): {}",
+ rec.getContainerId(), rec.getContainerState()));
+ }
+
+ DSLContext dslContext = containerSchemaDefinitionV2.getDSLContext();
+
+ try {
+ batchInsertInChunks(dslContext, recs);
+
+ LOG.debug("Batch inserted {} unhealthy container records", recs.size());
+
+ } catch (DataAccessException e) {
+ // Batch insert failed (likely duplicate key) - fall back to
insert-or-update per record
+ LOG.warn("Batch insert failed, falling back to individual
insert-or-update for {} records",
+ recs.size(), e);
+ fallbackInsertOrUpdate(recs);
+ } catch (Exception e) {
+ LOG.error("Failed to batch insert records into {}",
UNHEALTHY_CONTAINERS_TABLE_NAME, e);
+ throw new RuntimeException("Recon failed to insert " + recs.size() +
+ " unhealthy container records.", e);
+ }
+ }
+
+ private void batchInsertInChunks(DSLContext dslContext,
+ List<UnhealthyContainerRecordV2> recs) {
+ dslContext.transaction(configuration -> {
+ DSLContext txContext = configuration.dsl();
+ List<UnhealthyContainersRecord> records =
+ new ArrayList<>(BATCH_INSERT_CHUNK_SIZE);
+
+ for (int from = 0; from < recs.size(); from += BATCH_INSERT_CHUNK_SIZE) {
+ int to = Math.min(from + BATCH_INSERT_CHUNK_SIZE, recs.size());
+ records.clear();
+ for (int i = from; i < to; i++) {
+ records.add(toJooqRecord(txContext, recs.get(i)));
+ }
+ txContext.batchInsert(records).execute();
+ }
+ });
+ }
+
+ private void fallbackInsertOrUpdate(List<UnhealthyContainerRecordV2> recs) {
+ try (Connection connection =
containerSchemaDefinitionV2.getDataSource().getConnection()) {
+ connection.setAutoCommit(false);
+ try {
+ for (UnhealthyContainerRecordV2 rec : recs) {
+ UnhealthyContainers jooqRec = toJooqPojo(rec);
+ try {
+ unhealthyContainersV2Dao.insert(jooqRec);
+ } catch (DataAccessException insertEx) {
+ // Duplicate key - update existing record
+ unhealthyContainersV2Dao.update(jooqRec);
+ }
+ }
+ connection.commit();
+ } catch (Exception innerEx) {
+ connection.rollback();
+ LOG.error("Transaction rolled back during fallback insert", innerEx);
+ throw innerEx;
+ } finally {
+ connection.setAutoCommit(true);
+ }
+ } catch (Exception fallbackEx) {
+ LOG.error("Failed to insert {} records even with fallback", recs.size(),
fallbackEx);
+ throw new RuntimeException("Recon failed to insert " + recs.size() +
+ " unhealthy container records.", fallbackEx);
+ }
+ }
+
+ private UnhealthyContainersRecord toJooqRecord(DSLContext txContext,
+ UnhealthyContainerRecordV2 rec) {
+ UnhealthyContainersRecord record =
txContext.newRecord(UNHEALTHY_CONTAINERS);
+ record.setContainerId(rec.getContainerId());
+ record.setContainerState(rec.getContainerState());
+ record.setInStateSince(rec.getInStateSince());
+ record.setExpectedReplicaCount(rec.getExpectedReplicaCount());
+ record.setActualReplicaCount(rec.getActualReplicaCount());
+ record.setReplicaDelta(rec.getReplicaDelta());
+ record.setReason(rec.getReason());
+ return record;
+ }
+
+ private UnhealthyContainers toJooqPojo(UnhealthyContainerRecordV2 rec) {
+ return new UnhealthyContainers(
+ rec.getContainerId(),
+ rec.getContainerState(),
+ rec.getInStateSince(),
+ rec.getExpectedReplicaCount(),
+ rec.getActualReplicaCount(),
+ rec.getReplicaDelta(),
+ rec.getReason());
+ }
+
+ /**
+ * Batch delete all health states for multiple containers.
+ * This deletes all states generated from SCM/Recon health scans:
+ * MISSING, EMPTY_MISSING, UNDER_REPLICATED, OVER_REPLICATED,
+ * MIS_REPLICATED, NEGATIVE_SIZE and REPLICA_MISMATCH for all containers
+ * in the list.
+ *
+ * <p>REPLICA_MISMATCH is included here because it is re-evaluated on every
+ * scan cycle (just like the SCM-sourced states); omitting it would leave
+ * stale REPLICA_MISMATCH records in the table after a mismatch is resolved.
+ *
+ * <p><b>Derby bytecode limit:</b> Derby translates each SQL statement into
+ * a Java class whose methods must each stay under the JVM 64 KB bytecode
+ * limit. A single {@code IN} predicate with more than ~2,000 values (when
+ * combined with the 7-state container_state filter) overflows this limit
+ * and causes {@code ERROR XBCM4}. This method automatically partitions
+ * {@code containerIds} into chunks of at most {@value
#MAX_DELETE_CHUNK_SIZE}
+ * IDs so callers never need to worry about the limit, regardless of how
+ * many containers a scan cycle processes.
+ *
+ * @param containerIds List of container IDs to delete states for
+ */
+ public void batchDeleteSCMStatesForContainers(List<Long> containerIds) {
+ if (containerIds == null || containerIds.isEmpty()) {
+ return;
+ }
+
+ DSLContext dslContext = containerSchemaDefinitionV2.getDSLContext();
+ int totalDeleted = 0;
+
+ // Chunk the container IDs so each DELETE statement stays within Derby's
+ // generated-bytecode limit (MAX_DELETE_CHUNK_SIZE IDs per statement).
+ for (int from = 0; from < containerIds.size(); from +=
MAX_DELETE_CHUNK_SIZE) {
+ int to = Math.min(from + MAX_DELETE_CHUNK_SIZE, containerIds.size());
+ List<Long> chunk = containerIds.subList(from, to);
+
+ try {
+ int deleted = dslContext.deleteFrom(UNHEALTHY_CONTAINERS)
+ .where(UNHEALTHY_CONTAINERS.CONTAINER_ID.in(chunk))
+ .and(UNHEALTHY_CONTAINERS.CONTAINER_STATE.in(
+ UnHealthyContainerStates.MISSING.toString(),
+ UnHealthyContainerStates.EMPTY_MISSING.toString(),
+ UnHealthyContainerStates.UNDER_REPLICATED.toString(),
+ UnHealthyContainerStates.OVER_REPLICATED.toString(),
+ UnHealthyContainerStates.MIS_REPLICATED.toString(),
+ UnHealthyContainerStates.NEGATIVE_SIZE.toString(),
+ UnHealthyContainerStates.REPLICA_MISMATCH.toString()))
+ .execute();
+ totalDeleted += deleted;
+ } catch (Exception e) {
+ LOG.error("Failed to batch delete health states for {} containers
(chunk {}-{})",
+ chunk.size(), from, to, e);
+ throw new RuntimeException("Failed to batch delete health states", e);
+ }
+ }
+
+ LOG.debug("Batch deleted {} health state records for {} containers",
+ totalDeleted, containerIds.size());
+ }
+
+ /**
+ * Get summary of unhealthy containers grouped by state from V2 table.
+ */
+ public List<UnhealthyContainersSummaryV2> getUnhealthyContainersSummary() {
+ DSLContext dslContext = containerSchemaDefinitionV2.getDSLContext();
+ List<UnhealthyContainersSummaryV2> result = new ArrayList<>();
+
+ try {
+ return dslContext
+ .select(UNHEALTHY_CONTAINERS.CONTAINER_STATE.as("containerState"),
+ count().as("cnt"))
+ .from(UNHEALTHY_CONTAINERS)
+ .groupBy(UNHEALTHY_CONTAINERS.CONTAINER_STATE)
+ .fetchInto(UnhealthyContainersSummaryV2.class);
+ } catch (Exception e) {
+ LOG.error("Failed to get summary from V2 table", e);
+ return result;
+ }
+ }
+
+ /**
+ * Get unhealthy containers from V2 table.
+ */
+ public List<UnhealthyContainerRecordV2> getUnhealthyContainers(
+ UnHealthyContainerStates state, long minContainerId, long
maxContainerId, int limit) {
+ DSLContext dslContext = containerSchemaDefinitionV2.getDSLContext();
+
+ SelectQuery<Record> query = dslContext.selectQuery();
+ query.addFrom(UNHEALTHY_CONTAINERS);
+
+ Condition containerCondition;
+ OrderField[] orderField;
+
+ if (maxContainerId > 0) {
+ containerCondition =
UNHEALTHY_CONTAINERS.CONTAINER_ID.lessThan(maxContainerId);
+ orderField = new OrderField[]{
+ UNHEALTHY_CONTAINERS.CONTAINER_ID.desc(),
+ UNHEALTHY_CONTAINERS.CONTAINER_STATE.asc()
+ };
+ } else {
+ containerCondition =
UNHEALTHY_CONTAINERS.CONTAINER_ID.greaterThan(minContainerId);
+ orderField = new OrderField[]{
+ UNHEALTHY_CONTAINERS.CONTAINER_ID.asc(),
+ UNHEALTHY_CONTAINERS.CONTAINER_STATE.asc()
+ };
+ }
+
+ if (state != null) {
+ query.addConditions(containerCondition.and(
+ UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(state.toString())));
+ } else {
+ query.addConditions(containerCondition);
+ }
+
+ query.addOrderBy(orderField);
+ query.addLimit(limit);
+
+ // Pre-buffer `limit` rows per JDBC round-trip instead of Derby's default
of 1 row.
+ query.fetchSize(limit);
+
+ try {
+ Stream<UnhealthyContainersRecord> stream =
+ query.fetchInto(UnhealthyContainersRecord.class).stream();
+
+ if (maxContainerId > 0) {
+ // Reverse-pagination path: SQL orders DESC (to get the last `limit`
rows before
+ // maxContainerId); re-sort to ASC so callers always see ascending
container IDs.
+ stream =
stream.sorted(Comparator.comparingLong(UnhealthyContainersRecord::getContainerId));
+ }
+ // Forward-pagination path: SQL already orders ASC — no Java re-sort
needed.
+
+ return stream.map(record -> new UnhealthyContainerRecordV2(
+ record.getContainerId(),
+ record.getContainerState(),
+ record.getInStateSince(),
+ record.getExpectedReplicaCount(),
+ record.getActualReplicaCount(),
+ record.getReplicaDelta(),
+ record.getReason()))
+ .collect(Collectors.toList());
+ } catch (Exception e) {
+ LOG.error("Failed to query V2 table", e);
+ return new ArrayList<>();
+ }
+ }
+
+ /**
+ * Clear all records from V2 table (for testing).
+ */
+ @VisibleForTesting
+ public void clearAllUnhealthyContainerRecords() {
+ DSLContext dslContext = containerSchemaDefinitionV2.getDSLContext();
+ try {
+ dslContext.deleteFrom(UNHEALTHY_CONTAINERS).execute();
+ LOG.info("Cleared all V2 unhealthy container records");
+ } catch (Exception e) {
+ LOG.error("Failed to clear V2 unhealthy container records", e);
+ }
+ }
+
+ /**
+ * POJO representing a record in UNHEALTHY_CONTAINERS table.
+ */
+ public static class UnhealthyContainerRecordV2 {
Review Comment:
Previously, these classes in Recon were generated by jOOQ, weren't they? For
example, `UnhealthyContainersRecord`:
```
package org.apache.ozone.recon.schema.generated.tables.records;
import javax.annotation.Generated;
import
org.apache.ozone.recon.schema.generated.tables.UnhealthyContainersTable;
import org.jooq.Field;
import org.jooq.Record2;
import org.jooq.Record7;
import org.jooq.Row7;
import org.jooq.impl.UpdatableRecordImpl;
/**
* This class is generated by jOOQ.
*/
@Generated(
value = {
"http://www.jooq.org",
"jOOQ version:3.11.9"
},
comments = "This class is generated by jOOQ"
)
@SuppressWarnings({ "all", "unchecked", "rawtypes" })
public class UnhealthyContainersRecord extends
UpdatableRecordImpl<UnhealthyContainersRecord> implements Record7<Long, String,
Long, Integer, Integer, Integer, String> {
```
Are you intentionally moving away from that pattern?
Also, the generated class was named `UnhealthyContainersRecord`, and now you
are using `UnhealthyContainerRecordV2`. "Containers" was plural in the original
name, so it might be good to follow that convention.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ReconReplicationManager.java:
##########
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.fsck;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerHealthState;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import
org.apache.hadoop.hdds.scm.container.replication.MonitoringReplicationQueue;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import
org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2;
+import
org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManagerV2.UnhealthyContainerRecordV2;
+import org.apache.hadoop.util.Time;
+import
org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Recon-specific extension of SCM's ReplicationManager.
+ *
+ * <p><b>Key Differences from SCM:</b></p>
+ * <ol>
+ * <li>Uses NoOpsContainerReplicaPendingOps stub (no pending operations
tracking)</li>
+ * <li>Overrides processAll() to capture ALL container health states (no
100-sample limit)</li>
+ * <li>Stores results in Recon's UNHEALTHY_CONTAINERS table</li>
+ * <li>Does not issue replication commands (read-only monitoring)</li>
+ * </ol>
+ *
+ * <p><b>Why This Works Without PendingOps:</b></p>
+ * <p>SCM's health check logic uses a two-phase approach:
+ * <ul>
+ * <li><b>Phase 1 (Health Determination):</b> Calls
isSufficientlyReplicated(false)
+ * which ignores pending operations. This phase determines the health
state.</li>
+ * <li><b>Phase 2 (Command Deduplication):</b> Calls
isSufficientlyReplicated(true)
+ * which considers pending operations. This phase decides whether to
enqueue
+ * new commands.</li>
+ * </ul>
+ * Since Recon only needs Phase 1 (health determination) and doesn't issue
commands,
+ * the stub PendingOps does not cause false positives.</p>
+ *
+ * @see NoOpsContainerReplicaPendingOps
+ * @see ReconReplicationManagerReport
+ */
+public class ReconReplicationManager extends ReplicationManager {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ReconReplicationManager.class);
+
+ private final ContainerHealthSchemaManagerV2 healthSchemaManager;
+ private final ContainerManager containerManager;
+
+ /**
+ * Immutable wiring context for ReconReplicationManager initialization.
+ */
+ public static final class InitContext {
+ private final ReplicationManagerConfiguration rmConf;
+ private final ConfigurationSource conf;
+ private final ContainerManager containerManager;
+ private final PlacementPolicy ratisContainerPlacement;
+ private final PlacementPolicy ecContainerPlacement;
+ private final EventPublisher eventPublisher;
+ private final SCMContext scmContext;
+ private final NodeManager nodeManager;
+ private final Clock clock;
+
+ private InitContext(Builder builder) {
+ this.rmConf = builder.rmConf;
+ this.conf = builder.conf;
+ this.containerManager = builder.containerManager;
+ this.ratisContainerPlacement = builder.ratisContainerPlacement;
+ this.ecContainerPlacement = builder.ecContainerPlacement;
+ this.eventPublisher = builder.eventPublisher;
+ this.scmContext = builder.scmContext;
+ this.nodeManager = builder.nodeManager;
+ this.clock = builder.clock;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for creating {@link InitContext} instances.
+ */
+ public static final class Builder {
+ private ReplicationManagerConfiguration rmConf;
+ private ConfigurationSource conf;
+ private ContainerManager containerManager;
+ private PlacementPolicy ratisContainerPlacement;
+ private PlacementPolicy ecContainerPlacement;
+ private EventPublisher eventPublisher;
+ private SCMContext scmContext;
+ private NodeManager nodeManager;
+ private Clock clock;
+
+ private Builder() {
+ }
+
+ public Builder setRmConf(ReplicationManagerConfiguration rmConf) {
+ this.rmConf = rmConf;
+ return this;
+ }
+
+ public Builder setConf(ConfigurationSource conf) {
+ this.conf = conf;
+ return this;
+ }
+
+ public Builder setContainerManager(ContainerManager containerManager) {
+ this.containerManager = containerManager;
+ return this;
+ }
+
+ public Builder setRatisContainerPlacement(PlacementPolicy
ratisContainerPlacement) {
+ this.ratisContainerPlacement = ratisContainerPlacement;
+ return this;
+ }
+
+ public Builder setEcContainerPlacement(PlacementPolicy
ecContainerPlacement) {
+ this.ecContainerPlacement = ecContainerPlacement;
+ return this;
+ }
+
+ public Builder setEventPublisher(EventPublisher eventPublisher) {
+ this.eventPublisher = eventPublisher;
+ return this;
+ }
+
+ public Builder setScmContext(SCMContext scmContext) {
+ this.scmContext = scmContext;
+ return this;
+ }
+
+ public Builder setNodeManager(NodeManager nodeManager) {
+ this.nodeManager = nodeManager;
+ return this;
+ }
+
+ public Builder setClock(Clock clock) {
+ this.clock = clock;
+ return this;
+ }
+
+ public InitContext build() {
+ return new InitContext(this);
+ }
+ }
+ }
+
+ public ReconReplicationManager(
+ InitContext initContext,
+ ContainerHealthSchemaManagerV2 healthSchemaManager) throws IOException {
+
+ // Call parent with stub PendingOps (proven to not cause false positives)
+ super(
+ initContext.rmConf,
+ initContext.conf,
+ initContext.containerManager,
+ initContext.ratisContainerPlacement,
+ initContext.ecContainerPlacement,
+ initContext.eventPublisher,
+ initContext.scmContext,
+ initContext.nodeManager,
+ initContext.clock,
+ new NoOpsContainerReplicaPendingOps(initContext.clock,
initContext.rmConf)
+ );
+
+ this.containerManager = initContext.containerManager;
+ this.healthSchemaManager = healthSchemaManager;
+ }
+
+ /**
+ * Override start() to prevent background threads from running.
+ *
+ * <p>In Recon, we don't want the ReplicationManager's background threads
+ * (replicationMonitor, underReplicatedProcessor, overReplicatedProcessor)
+ * to run continuously. Instead, we call processAll() manually from
+ * ContainerHealthTaskV2 on a schedule.</p>
+ *
+ * <p>This prevents:
+ * <ul>
+ * <li>Unnecessary CPU usage from continuous monitoring</li>
+ * <li>Initialization race conditions (start() being called before fields
are initialized)</li>
+ * <li>Replication commands being generated (Recon is read-only)</li>
+ * </ul>
+ * </p>
+ */
+ @Override
+ public synchronized void start() {
+ LOG.info("ReconReplicationManager.start() called - no-op (manual
invocation via processAll())");
+ // Do nothing - we call processAll() manually from ContainerHealthTaskV2
+ }
+
+ /**
+ * Checks if container replicas have mismatched data checksums.
+ * This is a Recon-specific check not done by SCM's ReplicationManager.
+ *
+ * <p>REPLICA_MISMATCH detection is crucial for identifying:
+ * <ul>
+ * <li>Bit rot (silent data corruption)</li>
+ * <li>Failed writes to some replicas</li>
+ * <li>Storage corruption on specific datanodes</li>
+ * <li>Network corruption during replication</li>
+ * </ul>
+ * </p>
+ *
+ * <p>This uses checksum mismatch logic:
+ * {@code
replicas.stream().map(ContainerReplica::getDataChecksum).distinct().count() !=
1}
+ * </p>
+ *
+ * @param replicas Set of container replicas to check
+ * @return true if replicas have different data checksums
+ */
+ private boolean hasDataChecksumMismatch(Set<ContainerReplica> replicas) {
+ if (replicas == null || replicas.isEmpty()) {
+ return false;
+ }
+
+ // Count distinct checksums (filter out nulls)
+ long distinctChecksums = replicas.stream()
+ .map(ContainerReplica::getDataChecksum)
+ .filter(Objects::nonNull)
+ .distinct()
+ .count();
+
+ // More than 1 distinct checksum = data mismatch
+ // 0 distinct checksums = all nulls, no mismatch
+ return distinctChecksums > 1;
+ }
+
+ /**
+ * Override processAll() to capture ALL per-container health states,
+ * not just aggregate counts and 100 samples.
+ *
+ * <p><b>Processing Flow:</b></p>
+ * <ol>
+ * <li>Get all containers from ContainerManager</li>
+ * <li>Process each container using inherited health check chain (SCM
logic)</li>
+ * <li>Additionally check for REPLICA_MISMATCH (Recon-specific)</li>
+ * <li>Capture ALL unhealthy container IDs per health state (no sampling
limit)</li>
+ * <li>Store results in Recon's UNHEALTHY_CONTAINERS table</li>
+ * </ol>
+ *
+ * <p><b>Differences from SCM's processAll():</b></p>
+ * <ul>
+ * <li>Uses ReconReplicationManagerReport (captures all containers)</li>
+ * <li>Uses MonitoringReplicationQueue (doesn't enqueue commands)</li>
+ * <li>Adds REPLICA_MISMATCH detection (not done by SCM)</li>
+ * <li>Stores results in database instead of just keeping in-memory
report</li>
+ * </ul>
+ */
+ @Override
+ public synchronized void processAll() {
+ LOG.info("ReconReplicationManager starting container health check");
+
+ final long startTime = Time.monotonicNow();
+
+ // Use extended report that captures ALL containers, not just 100 samples
+ final ReconReplicationManagerReport report = new
ReconReplicationManagerReport();
+ final ReplicationQueue nullQueue = new MonitoringReplicationQueue();
+
+ // Get all containers (same as parent)
+ final List<ContainerInfo> containers = containerManager.getContainers();
+
+ LOG.info("Processing {} containers", containers.size());
+
+ // Process each container (reuses inherited processContainer and health
check chain)
+ int processedCount = 0;
+ for (ContainerInfo container : containers) {
+ report.increment(container.getState());
+ try {
+ ContainerID cid = container.containerID();
+
+ // Call inherited processContainer - this runs SCM's health check chain
+ // readOnly=true ensures no commands are generated
+ processContainer(container, nullQueue, report, true);
+
+ // ADDITIONAL CHECK: Detect REPLICA_MISMATCH (Recon-specific, not in
SCM)
+ Set<ContainerReplica> replicas =
containerManager.getContainerReplicas(cid);
+ if (hasDataChecksumMismatch(replicas)) {
+ report.addReplicaMismatchContainer(cid);
+ LOG.debug("Container {} has data checksum mismatch across replicas",
cid);
+ }
+
+ processedCount++;
+
+ if (processedCount % 10000 == 0) {
+ LOG.info("Processed {}/{} containers", processedCount,
containers.size());
+ }
+ } catch (ContainerNotFoundException e) {
+ LOG.error("Container {} not found", container.getContainerID(), e);
+ }
+ }
+
+ report.setComplete();
+
+ // Store ALL per-container health states to database
+ storeHealthStatesToDatabase(report, containers);
+
+ long duration = Time.monotonicNow() - startTime;
+ LOG.info("ReconReplicationManager completed in {}ms for {} containers",
+ duration, containers.size());
+ }
+
+ /**
+ * Convert ReconReplicationManagerReport to database records and store.
+ * This captures all unhealthy containers with detailed replica counts.
+ *
+ * @param report The report with all captured container health states
+ * @param allContainers List of all containers for cleanup
+ */
+ private void storeHealthStatesToDatabase(
+ ReconReplicationManagerReport report,
+ List<ContainerInfo> allContainers) {
+
+ long currentTime = System.currentTimeMillis();
Review Comment:
Does this mean that every time we call `processAll()`, it will use the current
time as `inStateSince`? That would make `inStateSince` meaningless, because it
would always be the timestamp of the last task run rather than the time at which
the container entered that state.
##########
hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/AbstractReconSqlDBTest.java:
##########
@@ -179,12 +179,12 @@ public String getDriverClass() {
@Override
public String getJdbcUrl() {
return "jdbc:derby:" + tempDir.getAbsolutePath() +
- File.separator + "derby_recon.db";
+ File.separator + "derby_recon.db;create=true";
}
@Override
public String getUserName() {
- return null;
+ return "RECON";
Review Comment:
Why are these changes needed?
##########
hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java:
##########
@@ -950,8 +949,8 @@ public void testUnhealthyContainers() throws IOException,
TimeoutException {
public void testUnhealthyContainersFilteredResponse()
throws IOException, TimeoutException {
String missing = UnHealthyContainerStates.MISSING.toString();
- String emptyMissing = UnHealthyContainerStates.EMPTY_MISSING.toString();
Review Comment:
I'd keep it the way it was; it's easier to maintain.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]