Copilot commented on code in PR #7222:
URL: https://github.com/apache/hbase/pull/7222#discussion_r2274117923


##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestBulkLoadProcessor.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
+/**
+ * Unit tests for {@link BulkLoadProcessor}.
+ * <p>
+ * These tests validate the extraction of bulk-loaded file paths from WAL 
entries under different
+ * scenarios, including:
+ * <ul>
+ * <li>Valid replicable bulk load entries</li>
+ * <li>Non-replicable bulk load entries</li>
+ * <li>Entries with no bulk load qualifier</li>
+ * <li>Entries containing multiple column families</li>
+ * </ul>
+ */
+@Category({ SmallTests.class })
+public class TestBulkLoadProcessor {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBulkLoadProcessor.class);
+
+  /**
+   * Creates a WAL.Entry containing a {@link WALProtos.BulkLoadDescriptor} 
with the given
+   * parameters.
+   * @param tableName  The table name
+   * @param regionName The encoded region name
+   * @param replicate  Whether the bulk load is marked for replication
+   * @param family     Column family name
+   * @param storeFiles One or more store file names to include
+   * @return A WAL.Entry representing the bulk load event
+   */
+  private WAL.Entry createBulkLoadWalEntry(TableName tableName, String 
regionName,
+    boolean replicate, String family, String... storeFiles) {
+
+    // Build StoreDescriptor
+    WALProtos.StoreDescriptor.Builder storeDescBuilder =
+      
WALProtos.StoreDescriptor.newBuilder().setFamilyName(ByteString.copyFromUtf8(family))
+        .setStoreHomeDir(family).addAllStoreFile(Arrays.asList(storeFiles));
+
+    // Build BulkLoadDescriptor
+    WALProtos.BulkLoadDescriptor.Builder bulkDescBuilder = 
WALProtos.BulkLoadDescriptor.newBuilder()
+      
.setReplicate(replicate).setEncodedRegionName(ByteString.copyFromUtf8(regionName))
+      
.setTableName(ProtobufUtil.toProtoTableName(tableName)).setBulkloadSeqNum(1000) 
// Random
+      .addStores(storeDescBuilder);
+
+    byte[] value = bulkDescBuilder.build().toByteArray();
+
+    // Build Cell with BULK_LOAD qualifier
+    Cell cell = 
CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setType(Cell.Type.Put)
+      .setRow(new byte[] { 1 
}).setFamily(METAFAMILY).setQualifier(WALEdit.BULK_LOAD)
+      .setValue(value).build();
+
+    WALEdit edit = new WALEdit();
+    edit.add(cell);
+
+    WALKeyImpl key = new WALKeyImpl(Bytes.toBytes(regionName), // region
+      tableName, 0L, 0L, null);
+
+    return new WAL.Entry(key, edit);
+  }
+
+  /**
+   * Verifies that a valid replicable bulk load WAL entry produces the correct 
number and structure
+   * of file paths.
+   */
+  @Test
+  public void testProcessBulkLoadFiles_validEntry() throws IOException {
+    WAL.Entry entry = createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), 
"region123", true,
+      "cf1", "file1", "file2");
+
+    List<Path> paths = 
BulkLoadProcessor.processBulkLoadFiles(Collections.singletonList(entry));
+    System.out.println(paths);
+
+    assertEquals(2, paths.size());
+    System.out.println(paths);
+

Review Comment:
   Debug print statements (System.out.println) should be removed from committed 
test code. Use proper logging at DEBUG/TRACE level if diagnostic output is 
needed for troubleshooting.
   ```suggestion
   
   
       assertEquals(2, paths.size());
   ```



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestBulkLoadProcessor.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
+/**
+ * Unit tests for {@link BulkLoadProcessor}.
+ * <p>
+ * These tests validate the extraction of bulk-loaded file paths from WAL 
entries under different
+ * scenarios, including:
+ * <ul>
+ * <li>Valid replicable bulk load entries</li>
+ * <li>Non-replicable bulk load entries</li>
+ * <li>Entries with no bulk load qualifier</li>
+ * <li>Entries containing multiple column families</li>
+ * </ul>
+ */
+@Category({ SmallTests.class })
+public class TestBulkLoadProcessor {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBulkLoadProcessor.class);
+
+  /**
+   * Creates a WAL.Entry containing a {@link WALProtos.BulkLoadDescriptor} 
with the given
+   * parameters.
+   * @param tableName  The table name
+   * @param regionName The encoded region name
+   * @param replicate  Whether the bulk load is marked for replication
+   * @param family     Column family name
+   * @param storeFiles One or more store file names to include
+   * @return A WAL.Entry representing the bulk load event
+   */
+  private WAL.Entry createBulkLoadWalEntry(TableName tableName, String 
regionName,
+    boolean replicate, String family, String... storeFiles) {
+
+    // Build StoreDescriptor
+    WALProtos.StoreDescriptor.Builder storeDescBuilder =
+      
WALProtos.StoreDescriptor.newBuilder().setFamilyName(ByteString.copyFromUtf8(family))
+        .setStoreHomeDir(family).addAllStoreFile(Arrays.asList(storeFiles));
+
+    // Build BulkLoadDescriptor
+    WALProtos.BulkLoadDescriptor.Builder bulkDescBuilder = 
WALProtos.BulkLoadDescriptor.newBuilder()
+      
.setReplicate(replicate).setEncodedRegionName(ByteString.copyFromUtf8(regionName))
+      
.setTableName(ProtobufUtil.toProtoTableName(tableName)).setBulkloadSeqNum(1000) 
// Random
+      .addStores(storeDescBuilder);
+
+    byte[] value = bulkDescBuilder.build().toByteArray();
+
+    // Build Cell with BULK_LOAD qualifier
+    Cell cell = 
CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setType(Cell.Type.Put)
+      .setRow(new byte[] { 1 
}).setFamily(METAFAMILY).setQualifier(WALEdit.BULK_LOAD)
+      .setValue(value).build();
+
+    WALEdit edit = new WALEdit();
+    edit.add(cell);
+
+    WALKeyImpl key = new WALKeyImpl(Bytes.toBytes(regionName), // region
+      tableName, 0L, 0L, null);
+
+    return new WAL.Entry(key, edit);
+  }
+
+  /**
+   * Verifies that a valid replicable bulk load WAL entry produces the correct 
number and structure
+   * of file paths.
+   */
+  @Test
+  public void testProcessBulkLoadFiles_validEntry() throws IOException {
+    WAL.Entry entry = createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), 
"region123", true,
+      "cf1", "file1", "file2");
+
+    List<Path> paths = 
BulkLoadProcessor.processBulkLoadFiles(Collections.singletonList(entry));
+    System.out.println(paths);
+
+    assertEquals(2, paths.size());
+    System.out.println(paths);
+
+    assertTrue(paths.get(0).toString().contains("ns/tbl/region123/cf1/file1"));
+    assertTrue(paths.get(1).toString().contains("ns/tbl/region123/cf1/file2"));
+  }
+
+  /**
+   * Verifies that a non-replicable bulk load entry is ignored.
+   */
+  @Test
+  public void testProcessBulkLoadFiles_nonReplicableSkipped() throws 
IOException {
+    WAL.Entry entry =
+      createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "region123", 
false, "cf1", "file1");
+
+    List<Path> paths = 
BulkLoadProcessor.processBulkLoadFiles(Collections.singletonList(entry));
+
+    assertTrue(paths.isEmpty());
+  }
+
+  /**
+   * Verifies that entries without the BULK_LOAD qualifier are ignored.
+   */
+  @Test
+  public void testProcessBulkLoadFiles_noBulkLoadQualifier() throws 
IOException {
+    WALEdit edit = new WALEdit();
+    WALKeyImpl key = new WALKeyImpl(new byte[] {}, TableName.valueOf("ns", 
"tbl"), 0L, 0L, null);
+    WAL.Entry entry = new WAL.Entry(key, edit);
+
+    List<Path> paths = 
BulkLoadProcessor.processBulkLoadFiles(Collections.singletonList(entry));
+
+    assertTrue(paths.isEmpty());
+  }
+
+  /**
+   * Verifies that multiple WAL entries with different column families produce 
the correct set of
+   * file paths.
+   */
+  @Test
+  public void testProcessBulkLoadFiles_multipleFamilies() throws IOException {
+    WAL.Entry entry =
+      createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "regionXYZ", 
true, "cf1", "file1");
+    WAL.Entry entry2 =
+      createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "regionXYZ", 
true, "cf2", "fileA");
+
+    List<Path> paths = 
BulkLoadProcessor.processBulkLoadFiles(Arrays.asList(entry, entry2));
+    System.out.println(paths);
+

Review Comment:
   Debug print statements (System.out.println) should be removed from committed 
test code. Use proper logging at DEBUG/TRACE level if diagnostic output is 
needed for troubleshooting.
   ```suggestion
   
   ```



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestBulkLoadProcessor.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
+/**
+ * Unit tests for {@link BulkLoadProcessor}.
+ * <p>
+ * These tests validate the extraction of bulk-loaded file paths from WAL 
entries under different
+ * scenarios, including:
+ * <ul>
+ * <li>Valid replicable bulk load entries</li>
+ * <li>Non-replicable bulk load entries</li>
+ * <li>Entries with no bulk load qualifier</li>
+ * <li>Entries containing multiple column families</li>
+ * </ul>
+ */
+@Category({ SmallTests.class })
+public class TestBulkLoadProcessor {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBulkLoadProcessor.class);
+
+  /**
+   * Creates a WAL.Entry containing a {@link WALProtos.BulkLoadDescriptor} 
with the given
+   * parameters.
+   * @param tableName  The table name
+   * @param regionName The encoded region name
+   * @param replicate  Whether the bulk load is marked for replication
+   * @param family     Column family name
+   * @param storeFiles One or more store file names to include
+   * @return A WAL.Entry representing the bulk load event
+   */
+  private WAL.Entry createBulkLoadWalEntry(TableName tableName, String 
regionName,
+    boolean replicate, String family, String... storeFiles) {
+
+    // Build StoreDescriptor
+    WALProtos.StoreDescriptor.Builder storeDescBuilder =
+      
WALProtos.StoreDescriptor.newBuilder().setFamilyName(ByteString.copyFromUtf8(family))
+        .setStoreHomeDir(family).addAllStoreFile(Arrays.asList(storeFiles));
+
+    // Build BulkLoadDescriptor
+    WALProtos.BulkLoadDescriptor.Builder bulkDescBuilder = 
WALProtos.BulkLoadDescriptor.newBuilder()
+      
.setReplicate(replicate).setEncodedRegionName(ByteString.copyFromUtf8(regionName))
+      
.setTableName(ProtobufUtil.toProtoTableName(tableName)).setBulkloadSeqNum(1000) 
// Random
+      .addStores(storeDescBuilder);
+
+    byte[] value = bulkDescBuilder.build().toByteArray();
+
+    // Build Cell with BULK_LOAD qualifier
+    Cell cell = 
CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setType(Cell.Type.Put)
+      .setRow(new byte[] { 1 
}).setFamily(METAFAMILY).setQualifier(WALEdit.BULK_LOAD)
+      .setValue(value).build();
+
+    WALEdit edit = new WALEdit();
+    edit.add(cell);
+
+    WALKeyImpl key = new WALKeyImpl(Bytes.toBytes(regionName), // region
+      tableName, 0L, 0L, null);
+
+    return new WAL.Entry(key, edit);
+  }
+
+  /**
+   * Verifies that a valid replicable bulk load WAL entry produces the correct 
number and structure
+   * of file paths.
+   */
+  @Test
+  public void testProcessBulkLoadFiles_validEntry() throws IOException {
+    WAL.Entry entry = createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), 
"region123", true,
+      "cf1", "file1", "file2");
+
+    List<Path> paths = 
BulkLoadProcessor.processBulkLoadFiles(Collections.singletonList(entry));
+    System.out.println(paths);
+
+    assertEquals(2, paths.size());
+    System.out.println(paths);
+

Review Comment:
   Debug print statements (System.out.println) should be removed from committed 
test code. Use proper logging at DEBUG/TRACE level if diagnostic output is 
needed for troubleshooting.
   ```suggestion
   
   
       assertEquals(2, paths.size());
   ```



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java:
##########
@@ -190,6 +191,14 @@ private void handleContinuousBackup(Admin admin) throws 
IOException {
     // set overall backup status: complete. Here we make sure to complete the 
backup.
     // After this checkpoint, even if entering cancel process, will let the 
backup finished
     backupInfo.setState(BackupState.COMPLETE);
+
+    if (!conf.getBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, false)) {
+      System.out.println("NOTE: Bulkload replication is not enabled. "

Review Comment:
   Using System.out.println for user notifications in production code is not 
recommended. Use the class's logger instead (e.g. LOG.warn, as in the 
suggestion below) or another appropriate output mechanism for user-facing 
messages.
   ```suggestion
         LOG.warn("NOTE: Bulkload replication is not enabled. "
   ```



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java:
##########
@@ -298,11 +301,21 @@ private void backupWalEntries(long day, List<WAL.Entry> 
walEntries) throws IOExc
 
     try {
       FSHLogProvider.Writer walWriter = walWriters.computeIfAbsent(day, 
this::createWalWriter);
+      List<Path> bulkLoadFiles = 
BulkLoadProcessor.processBulkLoadFiles(walEntries);
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("{} Processed {} bulk load files for WAL entries", 
Utils.logPeerId(peerId),
+          bulkLoadFiles.size());
+        LOG.trace("{} Bulk load files: {}", Utils.logPeerId(peerId),
+          
bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
+      }
 
       for (WAL.Entry entry : walEntries) {
         walWriter.append(entry);
       }
+
       walWriter.sync(true);
+      uploadBulkLoadFiles(day, bulkLoadFiles);

Review Comment:
   The bulk load file upload is performed synchronously for each WAL entry 
batch. This could become a performance bottleneck if there are many bulk load 
files. Consider batching or asynchronous processing for better performance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to