Copilot commented on code in PR #7222: URL: https://github.com/apache/hbase/pull/7222#discussion_r2274117923
########## hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestBulkLoadProcessor.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.replication; + +import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; + +import 
org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + +/** + * Unit tests for {@link BulkLoadProcessor}. + * <p> + * These tests validate the extraction of bulk-loaded file paths from WAL entries under different + * scenarios, including: + * <ul> + * <li>Valid replicable bulk load entries</li> + * <li>Non-replicable bulk load entries</li> + * <li>Entries with no bulk load qualifier</li> + * <li>Entries containing multiple column families</li> + * </ul> + */ +@Category({ SmallTests.class }) +public class TestBulkLoadProcessor { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBulkLoadProcessor.class); + + /** + * Creates a WAL.Entry containing a {@link WALProtos.BulkLoadDescriptor} with the given + * parameters. + * @param tableName The table name + * @param regionName The encoded region name + * @param replicate Whether the bulk load is marked for replication + * @param family Column family name + * @param storeFiles One or more store file names to include + * @return A WAL.Entry representing the bulk load event + */ + private WAL.Entry createBulkLoadWalEntry(TableName tableName, String regionName, + boolean replicate, String family, String... 
storeFiles) { + + // Build StoreDescriptor + WALProtos.StoreDescriptor.Builder storeDescBuilder = + WALProtos.StoreDescriptor.newBuilder().setFamilyName(ByteString.copyFromUtf8(family)) + .setStoreHomeDir(family).addAllStoreFile(Arrays.asList(storeFiles)); + + // Build BulkLoadDescriptor + WALProtos.BulkLoadDescriptor.Builder bulkDescBuilder = WALProtos.BulkLoadDescriptor.newBuilder() + .setReplicate(replicate).setEncodedRegionName(ByteString.copyFromUtf8(regionName)) + .setTableName(ProtobufUtil.toProtoTableName(tableName)).setBulkloadSeqNum(1000) // Random + .addStores(storeDescBuilder); + + byte[] value = bulkDescBuilder.build().toByteArray(); + + // Build Cell with BULK_LOAD qualifier + Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setType(Cell.Type.Put) + .setRow(new byte[] { 1 }).setFamily(METAFAMILY).setQualifier(WALEdit.BULK_LOAD) + .setValue(value).build(); + + WALEdit edit = new WALEdit(); + edit.add(cell); + + WALKeyImpl key = new WALKeyImpl(Bytes.toBytes(regionName), // region + tableName, 0L, 0L, null); + + return new WAL.Entry(key, edit); + } + + /** + * Verifies that a valid replicable bulk load WAL entry produces the correct number and structure + * of file paths. + */ + @Test + public void testProcessBulkLoadFiles_validEntry() throws IOException { + WAL.Entry entry = createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "region123", true, + "cf1", "file1", "file2"); + + List<Path> paths = BulkLoadProcessor.processBulkLoadFiles(Collections.singletonList(entry)); + System.out.println(paths); + + assertEquals(2, paths.size()); + System.out.println(paths); + Review Comment: Debug print statements should be removed from production test code. Use proper logging if debug output is needed for troubleshooting. 
```suggestion assertEquals(2, paths.size()); ``` ########## hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestBulkLoadProcessor.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.replication; + +import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import 
org.apache.hbase.thirdparty.com.google.protobuf.ByteString; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + +/** + * Unit tests for {@link BulkLoadProcessor}. + * <p> + * These tests validate the extraction of bulk-loaded file paths from WAL entries under different + * scenarios, including: + * <ul> + * <li>Valid replicable bulk load entries</li> + * <li>Non-replicable bulk load entries</li> + * <li>Entries with no bulk load qualifier</li> + * <li>Entries containing multiple column families</li> + * </ul> + */ +@Category({ SmallTests.class }) +public class TestBulkLoadProcessor { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBulkLoadProcessor.class); + + /** + * Creates a WAL.Entry containing a {@link WALProtos.BulkLoadDescriptor} with the given + * parameters. + * @param tableName The table name + * @param regionName The encoded region name + * @param replicate Whether the bulk load is marked for replication + * @param family Column family name + * @param storeFiles One or more store file names to include + * @return A WAL.Entry representing the bulk load event + */ + private WAL.Entry createBulkLoadWalEntry(TableName tableName, String regionName, + boolean replicate, String family, String... 
storeFiles) { + + // Build StoreDescriptor + WALProtos.StoreDescriptor.Builder storeDescBuilder = + WALProtos.StoreDescriptor.newBuilder().setFamilyName(ByteString.copyFromUtf8(family)) + .setStoreHomeDir(family).addAllStoreFile(Arrays.asList(storeFiles)); + + // Build BulkLoadDescriptor + WALProtos.BulkLoadDescriptor.Builder bulkDescBuilder = WALProtos.BulkLoadDescriptor.newBuilder() + .setReplicate(replicate).setEncodedRegionName(ByteString.copyFromUtf8(regionName)) + .setTableName(ProtobufUtil.toProtoTableName(tableName)).setBulkloadSeqNum(1000) // Random + .addStores(storeDescBuilder); + + byte[] value = bulkDescBuilder.build().toByteArray(); + + // Build Cell with BULK_LOAD qualifier + Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setType(Cell.Type.Put) + .setRow(new byte[] { 1 }).setFamily(METAFAMILY).setQualifier(WALEdit.BULK_LOAD) + .setValue(value).build(); + + WALEdit edit = new WALEdit(); + edit.add(cell); + + WALKeyImpl key = new WALKeyImpl(Bytes.toBytes(regionName), // region + tableName, 0L, 0L, null); + + return new WAL.Entry(key, edit); + } + + /** + * Verifies that a valid replicable bulk load WAL entry produces the correct number and structure + * of file paths. + */ + @Test + public void testProcessBulkLoadFiles_validEntry() throws IOException { + WAL.Entry entry = createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "region123", true, + "cf1", "file1", "file2"); + + List<Path> paths = BulkLoadProcessor.processBulkLoadFiles(Collections.singletonList(entry)); + System.out.println(paths); + + assertEquals(2, paths.size()); + System.out.println(paths); + + assertTrue(paths.get(0).toString().contains("ns/tbl/region123/cf1/file1")); + assertTrue(paths.get(1).toString().contains("ns/tbl/region123/cf1/file2")); + } + + /** + * Verifies that a non-replicable bulk load entry is ignored. 
+ */ + @Test + public void testProcessBulkLoadFiles_nonReplicableSkipped() throws IOException { + WAL.Entry entry = + createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "region123", false, "cf1", "file1"); + + List<Path> paths = BulkLoadProcessor.processBulkLoadFiles(Collections.singletonList(entry)); + + assertTrue(paths.isEmpty()); + } + + /** + * Verifies that entries without the BULK_LOAD qualifier are ignored. + */ + @Test + public void testProcessBulkLoadFiles_noBulkLoadQualifier() throws IOException { + WALEdit edit = new WALEdit(); + WALKeyImpl key = new WALKeyImpl(new byte[] {}, TableName.valueOf("ns", "tbl"), 0L, 0L, null); + WAL.Entry entry = new WAL.Entry(key, edit); + + List<Path> paths = BulkLoadProcessor.processBulkLoadFiles(Collections.singletonList(entry)); + + assertTrue(paths.isEmpty()); + } + + /** + * Verifies that multiple WAL entries with different column families produce the correct set of + * file paths. + */ + @Test + public void testProcessBulkLoadFiles_multipleFamilies() throws IOException { + WAL.Entry entry = + createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "regionXYZ", true, "cf1", "file1"); + WAL.Entry entry2 = + createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "regionXYZ", true, "cf2", "fileA"); + + List<Path> paths = BulkLoadProcessor.processBulkLoadFiles(Arrays.asList(entry, entry2)); + System.out.println(paths); + Review Comment: Debug print statements should be removed from production test code. Use proper logging if debug output is needed for troubleshooting. ```suggestion ``` ########## hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestBulkLoadProcessor.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.replication; + +import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + +/** + * Unit tests for {@link BulkLoadProcessor}. 
+ * <p> + * These tests validate the extraction of bulk-loaded file paths from WAL entries under different + * scenarios, including: + * <ul> + * <li>Valid replicable bulk load entries</li> + * <li>Non-replicable bulk load entries</li> + * <li>Entries with no bulk load qualifier</li> + * <li>Entries containing multiple column families</li> + * </ul> + */ +@Category({ SmallTests.class }) +public class TestBulkLoadProcessor { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBulkLoadProcessor.class); + + /** + * Creates a WAL.Entry containing a {@link WALProtos.BulkLoadDescriptor} with the given + * parameters. + * @param tableName The table name + * @param regionName The encoded region name + * @param replicate Whether the bulk load is marked for replication + * @param family Column family name + * @param storeFiles One or more store file names to include + * @return A WAL.Entry representing the bulk load event + */ + private WAL.Entry createBulkLoadWalEntry(TableName tableName, String regionName, + boolean replicate, String family, String... 
storeFiles) { + + // Build StoreDescriptor + WALProtos.StoreDescriptor.Builder storeDescBuilder = + WALProtos.StoreDescriptor.newBuilder().setFamilyName(ByteString.copyFromUtf8(family)) + .setStoreHomeDir(family).addAllStoreFile(Arrays.asList(storeFiles)); + + // Build BulkLoadDescriptor + WALProtos.BulkLoadDescriptor.Builder bulkDescBuilder = WALProtos.BulkLoadDescriptor.newBuilder() + .setReplicate(replicate).setEncodedRegionName(ByteString.copyFromUtf8(regionName)) + .setTableName(ProtobufUtil.toProtoTableName(tableName)).setBulkloadSeqNum(1000) // Random + .addStores(storeDescBuilder); + + byte[] value = bulkDescBuilder.build().toByteArray(); + + // Build Cell with BULK_LOAD qualifier + Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setType(Cell.Type.Put) + .setRow(new byte[] { 1 }).setFamily(METAFAMILY).setQualifier(WALEdit.BULK_LOAD) + .setValue(value).build(); + + WALEdit edit = new WALEdit(); + edit.add(cell); + + WALKeyImpl key = new WALKeyImpl(Bytes.toBytes(regionName), // region + tableName, 0L, 0L, null); + + return new WAL.Entry(key, edit); + } + + /** + * Verifies that a valid replicable bulk load WAL entry produces the correct number and structure + * of file paths. + */ + @Test + public void testProcessBulkLoadFiles_validEntry() throws IOException { + WAL.Entry entry = createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "region123", true, + "cf1", "file1", "file2"); + + List<Path> paths = BulkLoadProcessor.processBulkLoadFiles(Collections.singletonList(entry)); + System.out.println(paths); + + assertEquals(2, paths.size()); + System.out.println(paths); + Review Comment: Debug print statements should be removed from production test code. Use proper logging if debug output is needed for troubleshooting. 
```suggestion assertEquals(2, paths.size()); ``` ########## hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java: ########## @@ -190,6 +191,14 @@ private void handleContinuousBackup(Admin admin) throws IOException { // set overall backup status: complete. Here we make sure to complete the backup. // After this checkpoint, even if entering cancel process, will let the backup finished backupInfo.setState(BackupState.COMPLETE); + + if (!conf.getBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, false)) { + System.out.println("NOTE: Bulkload replication is not enabled. " Review Comment: Using System.out.println for user notifications in production code is not recommended. Consider using proper logging (LOG.info) or a more appropriate output mechanism for user-facing messages. ```suggestion LOG.warn("NOTE: Bulkload replication is not enabled. " ``` ########## hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java: ########## @@ -298,11 +301,21 @@ private void backupWalEntries(long day, List<WAL.Entry> walEntries) throws IOExc try { FSHLogProvider.Writer walWriter = walWriters.computeIfAbsent(day, this::createWalWriter); + List<Path> bulkLoadFiles = BulkLoadProcessor.processBulkLoadFiles(walEntries); + + if (LOG.isTraceEnabled()) { + LOG.trace("{} Processed {} bulk load files for WAL entries", Utils.logPeerId(peerId), + bulkLoadFiles.size()); + LOG.trace("{} Bulk load files: {}", Utils.logPeerId(peerId), + bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", "))); + } for (WAL.Entry entry : walEntries) { walWriter.append(entry); } + walWriter.sync(true); + uploadBulkLoadFiles(day, bulkLoadFiles); Review Comment: The bulk load file upload is performed synchronously for each WAL entry batch. This could become a performance bottleneck if there are many bulk load files. Consider batching or asynchronous processing for better performance. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@hbase.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
