[ 
https://issues.apache.org/jira/browse/HIVE-26265?focusedWorklogId=784737&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-784737
 ]

ASF GitHub Bot logged work on HIVE-26265:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Jun/22 22:47
            Start Date: 24/Jun/22 22:47
    Worklog Time Spent: 10m 
      Work Description: cmunkey commented on code in PR #3365:
URL: https://github.com/apache/hive/pull/3365#discussion_r906460588


##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationFilterTransactions.java:
##########
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.*;
+import 
org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
+import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
+import org.apache.hadoop.hive.shims.Utils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static 
org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
+
+/**
+ * TestReplicationFilterTransactions - Tests filtering of transaction events during replication.
+ */
+public class TestReplicationFilterTransactions {
+  static final private Logger LOG = 
LoggerFactory.getLogger(TestReplicationFilterTransactions.class);
+
+  private final static String tid =
+          
TestReplicationFilterTransactions.class.getCanonicalName().toLowerCase().replace('.','_')
 + "_" + System.currentTimeMillis();
+  private final static String TEST_PATH =
+          System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + 
tid;
+
+  @Rule
+  public TemporaryFolder tempFolder= new TemporaryFolder();
+
+  static public class PrimaryEventListenerTestImpl extends 
MetaStoreEventListener {
+    public PrimaryEventListenerTestImpl(Configuration conf) {
+      super(conf);
+    }
+
+    private static final AtomicInteger countOpenTxn = new AtomicInteger(0);
+    private static final AtomicInteger countCommitTxn = new AtomicInteger (0);
+    private static final AtomicInteger countAbortTxn = new AtomicInteger (0);
+
+    @Override
+    public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn, 
SQLGenerator sqlGenerator) throws MetaException {
+      super.onOpenTxn(openTxnEvent, dbConn, sqlGenerator);
+      countOpenTxn.getAndIncrement();
+    }
+
+    @Override
+    public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, 
SQLGenerator sqlGenerator) throws MetaException {
+      super.onCommitTxn(commitTxnEvent, dbConn, sqlGenerator);
+      countCommitTxn.getAndIncrement();
+    }
+
+    @Override
+    public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn, 
SQLGenerator sqlGenerator) throws MetaException {
+      super.onAbortTxn(abortTxnEvent, dbConn, sqlGenerator);
+      countAbortTxn.getAndIncrement();
+    }
+
+    public static int getCountOpenTxn() {
+      return countOpenTxn.get();
+    }
+
+    public static int getCountCommitTxn() {
+      return countCommitTxn.get();
+    }
+
+    public static int getCountAbortTxn() {
+      return countAbortTxn.get();
+    }
+
+    public static void reset() {
+      countOpenTxn.set(0);
+      countCommitTxn.set(0);
+      countAbortTxn.set(0);
+    }
+  }
+
+  static public class ReplicaEventListenerTestImpl extends 
MetaStoreEventListener {
+    public ReplicaEventListenerTestImpl(Configuration conf) {
+      super(conf);
+    }
+
+    private static final AtomicInteger countOpenTxn = new AtomicInteger(0);
+    private static final AtomicInteger countCommitTxn = new AtomicInteger (0);
+    private static final AtomicInteger countAbortTxn = new AtomicInteger (0);
+
+    private static final Map<Long, Long> txnMapping = new 
ConcurrentHashMap<Long, Long>();
+    
+    @Override
+    public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn, 
SQLGenerator sqlGenerator) throws MetaException {
+      super.onOpenTxn(openTxnEvent, dbConn, sqlGenerator);
+      countOpenTxn.getAndIncrement();
+      // Following code tries to read/save REPL_TXN_MAP, so we can check later 
test that target to source TxnId
+      // mapping was done.
+      // But the select below seems cause the following lock error
+      //     org.apache.hadoop.hive.ql.lockmgr.LockException: Error 
communicating with the metastore
+//      try {

Review Comment:
   I fixed this by doing a dirty read.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 784737)
    Time Spent: 2h 40m  (was: 2.5h)

> REPL DUMP should filter out OpenXacts and unneeded CommitXact/Abort.
> --------------------------------------------------------------------
>
>                 Key: HIVE-26265
>                 URL: https://issues.apache.org/jira/browse/HIVE-26265
>             Project: Hive
>          Issue Type: Improvement
>          Components: HiveServer2
>            Reporter: francis pang
>            Assignee: francis pang
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> REPL DUMP is replicating all OpenXacts, even when they are from other
> non-replicated databases. This wastes space in the dump and ends up opening
> unneeded transactions during REPL LOAD.
>  
> Add a config property for replication that filters out OpenXact events during 
> REPL DUMP. During REPL LOAD, the txns can be implicitly opened when the 
> ALLOC_WRITE_ID is processed. For CommitTxn and AbortTxn, dump only if WRITE 
> ID was allocated.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to