[ https://issues.apache.org/jira/browse/HIVE-26265?focusedWorklogId=784737&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-784737 ]
ASF GitHub Bot logged work on HIVE-26265:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Jun/22 22:47
            Start Date: 24/Jun/22 22:47
    Worklog Time Spent: 10m
      Work Description: cmunkey commented on code in PR #3365:
URL: https://github.com/apache/hive/pull/3365#discussion_r906460588


##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationFilterTransactions.java:
##########
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.*;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
+import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
+import org.apache.hadoop.hive.shims.Utils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
+
+/**
+ * TestReplicationFilterTransactions - Test transaction replication.
+ */
+public class TestReplicationFilterTransactions {
+  static final private Logger LOG = LoggerFactory.getLogger(TestReplicationFilterTransactions.class);
+
+  private final static String tid =
+      TestReplicationFilterTransactions.class.getCanonicalName().toLowerCase().replace('.','_') + "_" + System.currentTimeMillis();
+  private final static String TEST_PATH =
+      System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + tid;
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  static public class PrimaryEventListenerTestImpl extends MetaStoreEventListener {
+    public PrimaryEventListenerTestImpl(Configuration conf) {
+      super(conf);
+    }
+
+    private static final AtomicInteger countOpenTxn = new AtomicInteger(0);
+    private static final AtomicInteger countCommitTxn = new AtomicInteger(0);
+    private static final AtomicInteger countAbortTxn = new AtomicInteger(0);
+
+    @Override
+    public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) throws MetaException {
+      super.onOpenTxn(openTxnEvent, dbConn, sqlGenerator);
+      countOpenTxn.getAndIncrement();
+    }
+
+    @Override
+    public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) throws MetaException {
+      super.onCommitTxn(commitTxnEvent, dbConn, sqlGenerator);
+      countCommitTxn.getAndIncrement();
+    }
+
+    @Override
+    public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) throws MetaException {
+      super.onAbortTxn(abortTxnEvent, dbConn, sqlGenerator);
+      countAbortTxn.getAndIncrement();
+    }
+
+    public static int getCountOpenTxn() {
+      return countOpenTxn.get();
+    }
+
+    public static int getCountCommitTxn() {
+      return countCommitTxn.get();
+    }
+
+    public static int getCountAbortTxn() {
+      return countAbortTxn.get();
+    }
+
+    public static void reset() {
+      countOpenTxn.set(0);
+      countCommitTxn.set(0);
+      countAbortTxn.set(0);
+    }
+  }
+
+  static public class ReplicaEventListenerTestImpl extends MetaStoreEventListener {
+    public ReplicaEventListenerTestImpl(Configuration conf) {
+      super(conf);
+    }
+
+    private static final AtomicInteger countOpenTxn = new AtomicInteger(0);
+    private static final AtomicInteger countCommitTxn = new AtomicInteger(0);
+    private static final AtomicInteger countAbortTxn = new AtomicInteger(0);
+
+    private static final Map<Long, Long> txnMapping = new ConcurrentHashMap<Long, Long>();
+
+    @Override
+    public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) throws MetaException {
+      super.onOpenTxn(openTxnEvent, dbConn, sqlGenerator);
+      countOpenTxn.getAndIncrement();
+      // The following code tries to read and save REPL_TXN_MAP, so a later check can verify
+      // that the target-to-source TxnId mapping was done.
+      // But the select below seems to cause the following lock error:
+      // org.apache.hadoop.hive.ql.lockmgr.LockException: Error communicating with the metastore
+//      try {

Review Comment:
   I fixed this by doing a dirty read.
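For context, a rough sketch of what such a dirty read could look like inside ReplicaEventListenerTestImpl.onOpenTxn(), using the Connection the listener already receives. This is an illustration only, not the code merged in PR #3365: the REPL_TXN_MAP column names (RTM_SRC_TXN_ID, RTM_TARGET_TXN_ID) and the use of READ_UNCOMMITTED isolation are assumptions, and java.sql.Statement / ResultSet / SQLException imports are assumed alongside the existing java.sql.Connection import.

    @Override
    public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) throws MetaException {
      super.onOpenTxn(openTxnEvent, dbConn, sqlGenerator);
      countOpenTxn.getAndIncrement();
      try {
        // Dirty read: lower the isolation level so the SELECT does not block on, or conflict
        // with, the metastore work still in flight for this open-transaction event.
        int previousIsolation = dbConn.getTransactionIsolation();
        dbConn.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
        try (Statement stmt = dbConn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SELECT RTM_SRC_TXN_ID, RTM_TARGET_TXN_ID FROM REPL_TXN_MAP")) {
          while (rs.next()) {
            // Record source -> target txn id so a later assertion can verify the mapping.
            txnMapping.put(rs.getLong(1), rs.getLong(2));
          }
        } finally {
          dbConn.setTransactionIsolation(previousIsolation);
        }
      } catch (SQLException e) {
        throw new MetaException("Could not read REPL_TXN_MAP: " + e.getMessage());
      }
    }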
Issue Time Tracking
-------------------

    Worklog Id:     (was: 784737)
    Time Spent: 2h 40m  (was: 2.5h)

> REPL DUMP should filter out OpenXacts and unneeded CommitXact/Abort.
> --------------------------------------------------------------------
>
>                 Key: HIVE-26265
>                 URL: https://issues.apache.org/jira/browse/HIVE-26265
>             Project: Hive
>          Issue Type: Improvement
>          Components: HiveServer2
>            Reporter: francis pang
>            Assignee: francis pang
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> REPL DUMP is replicating all OpenXacts, even when they come from other, non-replicated
> databases. This wastes space in the dump and ends up opening unneeded transactions
> during REPL LOAD.
>
> Add a config property for replication that filters out OpenXact events during
> REPL DUMP. During REPL LOAD, the txns can be implicitly opened when the
> ALLOC_WRITE_ID is processed. For CommitTxn and AbortTxn, dump only if a WRITE
> ID was allocated.
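The filtering rule in the description boils down to a small decision per transaction event. The sketch below is purely illustrative and not taken from the Hive codebase; the helper class, the enum, and the idea of a dedicated replication config property are assumptions used only to make the rule concrete.

// Sketch only: condensed form of the dump-time filtering rule described above.
// The class, enum, and config-driven flag are assumptions, not Hive's actual API.
public final class TxnEventDumpFilter {

  /** Event kinds relevant to the rule; Hive uses its own notification event types. */
  public enum TxnEventType { OPEN_TXN, COMMIT_TXN, ABORT_TXN, ALLOC_WRITE_ID, OTHER }

  private final boolean filterEnabled;   // e.g. driven by the new replication config property

  public TxnEventDumpFilter(boolean filterEnabled) {
    this.filterEnabled = filterEnabled;
  }

  /**
   * Decide whether a transaction-related event is written to the dump.
   *
   * @param type             kind of notification event
   * @param writeIdAllocated true if the txn allocated a write id for a table in the
   *                         database being replicated
   */
  public boolean shouldDump(TxnEventType type, boolean writeIdAllocated) {
    if (!filterEnabled) {
      return true;                 // legacy behaviour: dump every txn event
    }
    switch (type) {
      case OPEN_TXN:
        // Never dumped: REPL LOAD can open the txn implicitly when it replays ALLOC_WRITE_ID.
        return false;
      case COMMIT_TXN:
      case ABORT_TXN:
        // Only meaningful if the txn actually wrote to the replicated database.
        return writeIdAllocated;
      default:
        return true;
    }
  }
}

With the filter enabled, an OpenTxn from an unrelated database never reaches the dump, and a CommitTxn or AbortTxn is skipped unless a write id was allocated for the replicated database, which is exactly the saving the description calls out.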