
ASF GitHub Bot logged work on HIVE-26265:

                Author: ASF GitHub Bot
            Created on: 23/Jun/22 15:41
            Start Date: 23/Jun/22 15:41
    Worklog Time Spent: 10m 
      Work Description: pvary commented on code in PR #3365:
URL: https://github.com/apache/hive/pull/3365#discussion_r905185430

@@ -0,0 +1,490 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.*;
+import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
+import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
+import org.apache.hadoop.hive.shims.Utils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentHashMap;
+import static 
+ * TestReplicationFilterTransactions - Tests filtering of transaction events during replication.
+ */
+public class TestReplicationFilterTransactions {
+  static final private Logger LOG = 
+  private final static String tid =
 + "_" + System.currentTimeMillis();
+  private final static String TEST_PATH =
+          System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + 
+  @Rule
+  public TemporaryFolder tempFolder= new TemporaryFolder();
+  /**
+   * Metastore event listener registered on the primary warehouse (see setup()).
+   * It delegates every open/commit/abort transaction callback to the base
+   * listener and then bumps a matching counter, so tests can assert how many
+   * such events the primary generated.
+   * The counters are static AtomicIntegers: they are shared by every instance
+   * of this listener and must be cleared with reset() between test phases.
+   */
+  static public class PrimaryEventListenerTestImpl extends 
MetaStoreEventListener {
+    public PrimaryEventListenerTestImpl(Configuration conf) {
+      super(conf);
+    }
+    // Per-event-type counters; static so tests can read them without a listener instance.
+    private static final AtomicInteger countOpenTxn = new AtomicInteger(0);
+    private static final AtomicInteger countCommitTxn = new AtomicInteger (0);
+    private static final AtomicInteger countAbortTxn = new AtomicInteger (0);
+    /** Delegates to the base listener, then counts one open-txn event. */
+    @Override
+    public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn, 
SQLGenerator sqlGenerator) throws MetaException {
+      super.onOpenTxn(openTxnEvent, dbConn, sqlGenerator);
+      countOpenTxn.getAndIncrement();
+    }
+    /** Delegates to the base listener, then counts one commit-txn event. */
+    @Override
+    public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, 
SQLGenerator sqlGenerator) throws MetaException {
+      super.onCommitTxn(commitTxnEvent, dbConn, sqlGenerator);
+      countCommitTxn.getAndIncrement();
+    }
+    /** Delegates to the base listener, then counts one abort-txn event. */
+    @Override
+    public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn, 
SQLGenerator sqlGenerator) throws MetaException {
+      super.onAbortTxn(abortTxnEvent, dbConn, sqlGenerator);
+      countAbortTxn.getAndIncrement();
+    }
+    /** @return number of open-txn callbacks seen since the last reset(). */
+    public static int getCountOpenTxn() {
+      return countOpenTxn.get();
+    }
+    /** @return number of commit-txn callbacks seen since the last reset(). */
+    public static int getCountCommitTxn() {
+      return countCommitTxn.get();
+    }
+    /** @return number of abort-txn callbacks seen since the last reset(). */
+    public static int getCountAbortTxn() {
+      return countAbortTxn.get();
+    }
+    /** Sets all three counters back to zero. */
+    public static void reset() {
+      countOpenTxn.set(0);
+      countCommitTxn.set(0);
+      countAbortTxn.set(0);
+    }
+  }
+  /**
+   * Metastore event listener registered on the replica warehouse (see setup()).
+   * It delegates every open/commit/abort transaction callback to the base
+   * listener and then bumps a matching counter, so tests can compare the number
+   * of txn events the replica saw with the number the primary generated.
+   * All state is static and shared by every instance; clear it with reset()
+   * between test phases.
+   */
+  static public class ReplicaEventListenerTestImpl extends 
MetaStoreEventListener {
+    public ReplicaEventListenerTestImpl(Configuration conf) {
+      super(conf);
+    }
+    // Per-event-type counters; static so tests can read them without a listener instance.
+    private static final AtomicInteger countOpenTxn = new AtomicInteger(0);
+    private static final AtomicInteger countCommitTxn = new AtomicInteger (0);
+    private static final AtomicInteger countAbortTxn = new AtomicInteger (0);
+    // Intended to hold source txn id -> target txn id. Nothing in this class
+    // currently populates it, because the update in onOpenTxn is commented out.
+    private static final Map<Long, Long> txnMapping = new 
ConcurrentHashMap<Long, Long>();
+    /** Delegates to the base listener, then counts one open-txn event. */
+    @Override
+    public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn, 
SQLGenerator sqlGenerator) throws MetaException {
+      super.onOpenTxn(openTxnEvent, dbConn, sqlGenerator);
+      countOpenTxn.getAndIncrement();
+      // The commented-out code below would read and save REPL_TXN_MAP, so a test
+      // could later check that the target-to-source txn id mapping was done.
+      // It is disabled because the select it issues seems to cause this lock error:
+      //     org.apache.hadoop.hive.ql.lockmgr.LockException: Error communicating with the metastore
+//      try {
+//        TestReplicationFilterTransactions.updateTxnMapping(txnMapping);
+//      } catch (Exception e) {
+//        throw new RuntimeException(e);
+//      }
+    }
+    /** Delegates to the base listener, then counts one commit-txn event. */
+    @Override
+    public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, 
SQLGenerator sqlGenerator) throws MetaException {
+      super.onCommitTxn(commitTxnEvent, dbConn, sqlGenerator);
+      countCommitTxn.getAndIncrement();
+    }
+    /** Delegates to the base listener, then counts one abort-txn event. */
+    @Override
+    public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn, 
SQLGenerator sqlGenerator) throws MetaException {
+      super.onAbortTxn(abortTxnEvent, dbConn, sqlGenerator);
+      countAbortTxn.getAndIncrement();
+    }
+    /** @return number of open-txn callbacks seen since the last reset(). */
+    public static int getCountOpenTxn() {
+      return countOpenTxn.get();
+    }
+    /** @return number of commit-txn callbacks seen since the last reset(). */
+    public static int getCountCommitTxn() {
+      return countCommitTxn.get();
+    }
+    /** @return number of abort-txn callbacks seen since the last reset(). */
+    public static int getCountAbortTxn() {
+      return countAbortTxn.get();
+    }
+    // Returns a snapshot copy so callers cannot mutate the shared map.
+    // NOTE(review): raw HashMap; `new HashMap<>(txnMapping)` would avoid the unchecked warning.
+    public static Map<Long, Long> getTxnMapping() { return new 
HashMap(txnMapping); }
+    /** Sets all counters back to zero and clears the txn id mapping. */
+    public static void reset() {
+      countOpenTxn.set(0);
+      countCommitTxn.set(0);
+      countAbortTxn.set(0);
+      txnMapping.clear();
+    }
+  }
+  static class EventCount {
+    int countOpenTxn;
+    int countCommitTxn;
+    int countAbortTxn;
+    public EventCount(int countOpenTxn, int countCommitTxn, int countAbortTxn) 
+      this.countOpenTxn = countOpenTxn;
+      this.countCommitTxn = countCommitTxn;
+      this.countAbortTxn = countAbortTxn;
+    }
+    public int getCountOpenTxn() {
+      return countOpenTxn;
+    }
+    public int getCountCommitTxn() {
+      return countCommitTxn;
+    }
+    public int getCountAbortTxn() {
+      return countAbortTxn;
+    }
+  }
+  @Rule
+  public final TestName testName = new TestName();
+  static WarehouseInstance primary;
+  static WarehouseInstance replica;
+  static HiveConf dfsConf;
+  String primaryDbName, replicatedDbName, otherDbName;
+  EventCount expected;
+  private Map<String, String> setupConf(String dfsUri, String 
listenerClassName) {
+    Map<String, String> confMap = new HashMap<String, String>();
+    confMap.put("fs.defaultFS", dfsUri);
+    confMap.put("hive.support.concurrency", "true");
+    confMap.put("hive.txn.manager", 
+    confMap.put("hive.metastore.client.capability.check", "false");
+    confMap.put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+    confMap.put("hive.strict.checks.bucketing", "false");
+    confMap.put("hive.mapred.mode", "nonstrict");
+    confMap.put("mapred.input.dir.recursive", "true");
+    confMap.put("hive.metastore.disallow.incompatible.col.type.changes", 
+    confMap.put("hive.stats.autogather", "false");
+    confMap.put("hive.in.repl.test", "true");
+    confMap.put(MetastoreConf.ConfVars.EVENT_LISTENERS.getVarname(), 
+    confMap.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+            GzipJSONMessageEncoder.class.getCanonicalName());
+    return confMap;
+  }
+  @Before
+  public void setup() throws Throwable {
+    TestReplicationFilterTransactions.dfsConf = new 
+    TestReplicationFilterTransactions.dfsConf.set("hadoop.proxyuser." + 
Utils.getUGI().getShortUserName() + ".hosts", "*");
+    TestReplicationFilterTransactions.dfsConf.set("dfs.namenode.acls.enabled", 
+    MiniDFSCluster miniDFSCluster =
+            new 
+    Map<String, String> conf = 
+            PrimaryEventListenerTestImpl.class.getName());
+    primary = new WarehouseInstance(LOG, miniDFSCluster, conf);
+    conf = setupConf(miniDFSCluster.getFileSystem().getUri().toString(),
+            ReplicaEventListenerTestImpl.class.getName());
+    conf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, conf);
+    primaryDbName = testName.getMethodName() + "_" + 
+    replicatedDbName = "replicated_" + primaryDbName;
+    primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
+            SOURCE_OF_REPLICATION + "' = '1,2,3')");
+    otherDbName = testName.getMethodName() + "_other_" + 
+    primary.run("create database " + otherDbName);
+    PrimaryEventListenerTestImpl.reset();
+    ReplicaEventListenerTestImpl.reset();
+    expected = new EventCount(8, 6, 2);
+  }
+  static void updateTxnMapping(Map<Long, Long> map) throws Exception {
+    String result = TestTxnDbUtil.queryToString(replica.hiveConf,
\"REPL_TXN_MAP\"", false);
+    Assert.assertNotNull(result);
+    if (result.isEmpty()) {
+      return;
+    }
+    String[] rows = result.split("\n");
+    for (int i = 0; i < rows.length; i++) {
+      String[] cols = rows[i].split("    ");
+      Long srcTxnId = Long.valueOf(cols[1]);
+      Long tgtTxnId = Long.valueOf(cols[0]);
+      map.put(srcTxnId, tgtTxnId);
+    }
+  }
+  /**
+   * Drops the test databases (the replicated and non-replicated ones on the
+   * primary, and the replicated copy on the replica), then closes both
+   * warehouse instances.
+   * NOTE(review): if any drop throws, the close() calls are skipped; consider
+   * a try/finally so the instances are always released.
+   */
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + primaryDbName + " cascade");
+    primary.run("drop database if exists " + otherDbName + " cascade");
+    replica.run("drop database if exists " + replicatedDbName + " cascade");
+    primary.close();
+    replica.close();
+  }
+  private void prepareBootstrapData() throws Throwable {
+    // primaryDbName is replicated; t1 and t2 are ACID tables with initial data.
+    // t3 is an ACID table with 2 initial rows, later t3 will be locked to 
force aborted transaction.
+    primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets 
stored as orc " +
+                  "tblproperties (\"transactional\"=\"true\")")
+            .run("insert into t1 values(1)")
+            .run("create table t2 (place string) partitioned by (country 
string) clustered by(place) " +
+                    "into 3 buckets stored as orc tblproperties 
+            .run("insert into t2 partition(country='india') values 
+            .run("create table t3 (id int) stored as orc tblproperties 
+            .run("insert into t3 values(111), (222)");
+    // otherDbName is not replicated, but contains ACID tables.
+    primary.run("use " + otherDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets 
stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("insert into t1 values(100)")
+            .run("create table t2 (place string) partitioned by (country 
string) clustered by(place) " +
+                    "into 3 buckets stored as orc tblproperties 
+            .run("insert into t2 partition(country='usa') values ('san 
+            .run("create table t3 (id int) stored as orc tblproperties 
+            .run("insert into t3 values(1110), (2220)");
+  }
+  private void alterBucketFile(Path warehouseRoot, String dbName, String 
tableName, boolean toCorrupted) throws IOException {
+    DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+    String bucket = "bucket_00000_0";
+    //Path location = new Path(new Path(warehouseRoot, dbName.toLowerCase() + 
".db"), tableName);
+    PathBuilder pb = new PathBuilder(warehouseRoot.toString())
+            .addDescendant(dbName.toLowerCase() + ".db")
+            .addDescendant(tableName)
+            .addDescendant("delta_0000001_0000001_0000")
+            .addDescendant(bucket);
+    Path location = pb.build();
+    File junkFile = new File(tempFolder.getRoot(), "junk");
+    File saveFolder = new File(tempFolder.getRoot(), dbName + "_" + tableName);
+    if (toCorrupted) {
+      if (!junkFile.exists()) {
+        File junk = tempFolder.newFile("junk");
+        FileUtils.writeStringToFile(junk, "junk", StandardCharsets.UTF_8);
+      }
+      Path dest = new Path(saveFolder.getAbsolutePath(), bucket);
+      fs.copyToLocalFile(true, location, dest);
+      fs.copyFromLocalFile(false, true, new Path(junkFile.getAbsolutePath()), 
+    } else {
+      Path src = new Path(saveFolder.getAbsolutePath(), bucket);
+      fs.copyFromLocalFile(true, true, src, location);
+    }
+  }
+  private void prepareAbortTxn(String dbName, int value) throws Throwable {
+    // Forces an abortTxn event to be generated in the database.
+    // The abortTxn needs to be generated during the execution phase of the 
+    // to do so, the bucket file of the table is intentionally mangled to
+    // induce an error and abortTxn during the execution phase.
+    alterBucketFile(primary.warehouseRoot, dbName, "t3", true);
+    try {
+      primary.run("use " + dbName)
+             .run("update t3 set id = 999 where id = " + 
+      Assert.fail("Update should have failed");
+    } catch (Throwable t) {
+      Assert.assertTrue(t.getCause().getCause() instanceof 
+    }
+    alterBucketFile(primary.warehouseRoot, dbName, "t3", false);
+  }
+  private void prepareIncrementalData() throws Throwable {
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values (2), (3)")
+            .run("insert into t2 partition(country='india') values 
+            .run("insert into t2 partition(country='india') values ('pune')");
+    prepareAbortTxn(primaryDbName, 222);
+    primary.run("use " + otherDbName)
+            .run("insert into t1 values (200), (300)")
+            .run("insert into t2 partition(country='usa') values ('santa 
+            .run("insert into t2 partition(country='usa') values ('palo 
+    prepareAbortTxn(otherDbName, 2220);
+  }
+  private void burnTransactionsOnReplica() throws Throwable {
+    // On the replica, create an ACID table and run ten single-row inserts, each of
+    // which consumes a transaction id. Presumably this makes the replica's txn ids
+    // diverge from the primary's, so the incremental load must map source txn ids
+    // to target txn ids (confirm; the original comment was truncated).
+    replica.run("use " + replicatedDbName)
+            .run("create table t999 (id int) clustered by(id) into 3 buckets 
stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("insert into t999 values (99901)")
+            .run("insert into t999 values (99902)")
+            .run("insert into t999 values (99903)")
+            .run("insert into t999 values (99904)")
+            .run("insert into t999 values (99905)")
+            .run("insert into t999 values (99906)")
+            .run("insert into t999 values (99907)")
+            .run("insert into t999 values (99908)")
+            .run("insert into t999 values (99909)")
+            .run("insert into t999 values (99910)");
+  }
+  private List<String> withTxnOptimized(boolean optimizationOn) {
+    return Collections.singletonList(String.format("'%s'='%s'", 
+            String.valueOf(optimizationOn)));
+  }
+  /**
+   * With the txn optimization disabled for both the bootstrap and the incremental
+   * dump/load, no open/commit/abort txn events should be filtered out.
+   */
+  @Test
+  public void testTxnEventsUnoptimized() throws Throwable {
+    prepareBootstrapData();
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+            .dump(primaryDbName, withTxnOptimized(false));
+    replica.load(replicatedDbName, primaryDbName, withTxnOptimized(false));
+    assertBootstrap(tuple);
+    // Burning transactions here (the commented-out call below) causes unexpected
+    // open_txn() callback counts. Where they come from, and why testTxnEventsOptimized()
+    // is not affected, is not yet understood.
+//    burnTransactionsOnReplica();
+    // Start counting only the events produced by the incremental phase.
+    PrimaryEventListenerTestImpl.reset();
+    ReplicaEventListenerTestImpl.reset();
+    prepareIncrementalData();
+    tuple = primary.run("use " + primaryDbName)
+            .dump(primaryDbName, withTxnOptimized(false));
+    replica.load(replicatedDbName, primaryDbName, withTxnOptimized(false));
+    // Optimization off: nothing is expected to be filtered.
+    EventCount filtered = new EventCount(0, 0, 0);
+    assertTxnOptimization(false, tuple, filtered);
+  }
+  /**
+   * Bootstraps with the txn optimization disabled, burns txn ids on the replica,
+   * then runs an incremental dump/load with the optimization enabled and expects
+   * 4 open, 3 commit and 1 abort txn events to be filtered out.
+   */
+  @Test
+  public void testTxnEventsOptimized() throws Throwable {
+    prepareBootstrapData();
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+            .dump(primaryDbName, withTxnOptimized(false));
+    replica.load(replicatedDbName, primaryDbName, withTxnOptimized(false));
+    assertBootstrap(tuple);
+    burnTransactionsOnReplica();
+    // Start counting only the events produced by the incremental phase.
+    PrimaryEventListenerTestImpl.reset();
+    ReplicaEventListenerTestImpl.reset();
+    prepareIncrementalData();
+    tuple = primary.run("use " + primaryDbName)
+            .dump(primaryDbName, withTxnOptimized(true));
+    replica.load(replicatedDbName, primaryDbName, withTxnOptimized(true));
+    // Counts (open, commit, abort) the optimization is expected to drop.
+    EventCount filtered = new EventCount(4, 3, 1);
+    assertTxnOptimization(true, tuple, filtered);
+  }
+  /**
+   * Asserts that the dump described by {@code tuple} contains no open, commit or
+   * abort txn events, as collected by ReplicationTestUtils.findTxnsFromDump.
+   * NOTE(review): JUnit's assertEquals takes (expected, actual); the arguments
+   * below are reversed, so a failure message would report them swapped.
+   */
+  private void assertBootstrap(WarehouseInstance.Tuple tuple) throws 
IOException {
+    List<Path> openTxns = new ArrayList<Path>();
+    List<Path> commitTxns = new ArrayList<Path>();
+    List<Path> abortTxns = new ArrayList<Path>();
+    ReplicationTestUtils.findTxnsFromDump(tuple, primary.hiveConf, openTxns, 
commitTxns, abortTxns);
+    Assert.assertEquals(openTxns.size(), 0);
+    Assert.assertEquals(commitTxns.size(), 0);
+    Assert.assertEquals(abortTxns.size(), 0);
+  }
+  private void assertTxnOptimization(boolean optimizationOn, 
WarehouseInstance.Tuple tuple, EventCount filtered) throws Exception {
+    List<Path> openTxns = new ArrayList<Path>();
+    List<Path> commitTxns = new ArrayList<Path>();
+    List<Path> abortTxns = new ArrayList<Path>();
+    ReplicationTestUtils.findTxnsFromDump(tuple, primary.hiveConf, openTxns, 
commitTxns, abortTxns);
+    Assert.assertEquals(expected.getCountOpenTxn(), 
+    Assert.assertEquals(expected.getCountCommitTxn(), 
+    Assert.assertEquals(expected.getCountAbortTxn(), 
+    Assert.assertEquals(expected.getCountOpenTxn() - 
filtered.getCountOpenTxn(), ReplicaEventListenerTestImpl.getCountOpenTxn());
+    Assert.assertEquals(expected.getCountCommitTxn() - 
filtered.getCountCommitTxn(), ReplicaEventListenerTestImpl.getCountCommitTxn());
+    Assert.assertEquals(expected.getCountAbortTxn() - 
filtered.getCountAbortTxn(), ReplicaEventListenerTestImpl.getCountAbortTxn());
+    Assert.assertEquals(optimizationOn ? 0 : expected.getCountOpenTxn(), 
+    Assert.assertEquals(expected.getCountCommitTxn() - 
filtered.getCountCommitTxn(), commitTxns.size());
+    Assert.assertEquals(expected.getCountAbortTxn() - 
filtered.getCountAbortTxn(), abortTxns.size());
+//    Map<Long, Long> replicaTxnMapping = 

Review Comment:
   And this

Issue Time Tracking

    Worklog Id:     (was: 784279)
    Time Spent: 2h 10m  (was: 2h)

> REPL DUMP should filter out OpenXacts and unneeded CommitXact/Abort.
> --------------------------------------------------------------------
>                 Key: HIVE-26265
>                 URL: https://issues.apache.org/jira/browse/HIVE-26265
>             Project: Hive
>          Issue Type: Improvement
>          Components: HiveServer2
>            Reporter: francis pang
>            Assignee: francis pang
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
> REPL DUMP replicates all OpenXacts, even when they come from other, 
> non-replicated databases. This wastes space in the dump and ends up opening 
> unneeded transactions during REPL LOAD.
> Add a replication config property that filters out OpenXact events during 
> REPL DUMP. During REPL LOAD, the txns can be opened implicitly when the 
> ALLOC_WRITE_ID event is processed. For CommitTxn and AbortTxn, dump the event 
> only if a write ID was allocated for the transaction.

This message was sent by Atlassian Jira

Reply via email to