[ https://issues.apache.org/jira/browse/HIVE-23069?focusedWorklogId=459148&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-459148 ]

ASF GitHub Bot logged work on HIVE-23069:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Jul/20 08:53
            Start Date: 15/Jul/20 08:53
    Worklog Time Spent: 10m 
      Work Description: aasha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r454888843



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A file-backed list of Strings which stays in memory until the threshold is reached.
+ */
+public class FileList implements AutoCloseable, Iterator<String> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
+  private static int fileListStreamerID = 0;
+  private static final String FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
+
+  private LinkedBlockingQueue<String> cache;
+  private volatile boolean thresholdHit = false;
+  private int thresholdPoint;
+  private float thresholdFactor = 0.9f;
+  private Path backingFile;
+  private FileListStreamer fileListStreamer;
+  private String nextElement;
+  private boolean noMoreElement;
+  private HiveConf conf;
+  private BufferedReader backingFileReader;
+
+
+  public FileList(Path backingFile, int cacheSize, HiveConf conf) throws IOException {
+    this.backingFile = backingFile;
+    if (cacheSize > 0) {
+      // Cache size must be > 0 for this list to be used for the write operation.
+      this.cache = new LinkedBlockingQueue<>(cacheSize);
+      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
+      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
+    } else {
+      thresholdHit = true;
+    }
+    this.conf = conf;
+    thresholdPoint = getThreshold(cacheSize);
+  }
+
+  /**
+   * Only the add operation is safe for concurrent use.
+   */
+  public void add(String entry) throws SemanticException {
+    if (thresholdHit && !fileListStreamer.isAlive()) {
+      throw new SemanticException("List is not getting saved anymore to file " 
+ backingFile.toString());
+    }
+    try {
+      cache.put(entry);
+    } catch (InterruptedException e) {
+      throw new SemanticException(e);
+    }
+    if (!thresholdHit && cache.size() >= thresholdPoint) {
+      initStoreToFile(cache.size());
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (!thresholdHit) {
+      return (cache != null && !cache.isEmpty());
+    }
+    if (nextElement != null) {
+      return true;
+    }
+    if (noMoreElement) {

Review comment:
       Why do you need both noMoreElement and nextElement?
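       A minimal sketch of the single-look-ahead alternative the question hints at (not the patch's code; it relies on readNextLine() continuing to return null once the backing file is exhausted, which BufferedReader.readLine() guarantees):

           @Override
           public boolean hasNext() {
             if (!thresholdHit) {
               return (cache != null && !cache.isEmpty());
             }
             if (nextElement == null) {
               // Cheap even after EOF: readLine() keeps returning null.
               nextElement = readNextLine();
             }
             return nextElement != null;
           }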

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
##########
@@ -207,4 +217,20 @@ public ReplicationMetricCollector getMetricCollector() {
   public void setMetricCollector(ReplicationMetricCollector metricCollector) {
     this.metricCollector = metricCollector;
   }
+
+  public ReplicationSpec getReplicationSpec() {
+    return replicationSpec;
+  }
+
+  public void setReplicationSpec(ReplicationSpec replicationSpec) {
+    this.replicationSpec = replicationSpec;
+  }
+
+  public FileList getFileList(Path backingFile, int cacheSize, HiveConf conf, boolean b) throws IOException {

Review comment:
       If this is used for tests, you can just mock the ReplDumpWork and return expected values.
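       For instance, a sketch (the path and cache size are made up; conf is assumed to be a HiveConf available in the test):

           ReplDumpWork work = Mockito.mock(ReplDumpWork.class);
           FileList expected = new FileList(new Path("/tmp/dump/_file_list"), 100, conf);
           Mockito.when(work.getFileList(Mockito.any(Path.class), Mockito.anyInt(),
                   Mockito.any(HiveConf.class), Mockito.anyBoolean()))
               .thenReturn(expected);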

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
##########
@@ -210,6 +210,66 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable {
     assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation, true);
   }
 
+  @Test
+  public void externalTableReplicationWithDefaultPathsLazyCopy() throws Throwable {
+    List<String> lazyCopyClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true'");
+    //creates external tables with partitions
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create external table t1 (id int)")
+            .run("insert into table t1 values (1)")
+            .run("insert into table t1 values (2)")
+            .run("create external table t2 (place string) partitioned by (country string)")
+            .run("insert into table t2 partition(country='india') values ('bangalore')")
+            .run("insert into table t2 partition(country='us') values ('austin')")
+            .run("insert into table t2 partition(country='france') values ('paris')")
+            .dump(primaryDbName, lazyCopyClause);
+
+    // verify that the external table info is written correctly for bootstrap
+    assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName, false);
+
+    replica.load(replicatedDbName, primaryDbName, lazyCopyClause)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't1'")
+            .verifyResult("t1")
+            .run("show tables like 't2'")
+            .verifyResult("t2")
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("select country from t2 where country = 'us'")
+            .verifyResult("us")
+            .run("select country from t2 where country = 'france'")
+            .verifyResult("france")
+            .run("show partitions t2").verifyResults(new String[] {"country=france", "country=india", "country=us"});
+
+    String hiveDumpLocation = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
+    // Ckpt should be set on bootstrapped db.
+    replica.verifyIfCkptSet(replicatedDbName, hiveDumpLocation);
+
+    assertTablePartitionLocation(primaryDbName + ".t1", replicatedDbName + ".t1");
+    assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
+
+    tuple = primary.run("use " + primaryDbName)
+            .run("create external table t3 (id int)")
+            .run("insert into table t3 values (10)")
+            .run("create external table t4 as select id from t3")

Review comment:
       Add a test for an external table with a different partition location.
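       A hedged sketch of such a case (table name t5 and the partition path are assumptions; the custom location would have to exist on the primary cluster's file system):

           primary.run("use " + primaryDbName)
                   .run("create external table t5 (place string) partitioned by (country string)")
                   .run("alter table t5 add partition (country='india') location '/tmp/t5_custom_part'")
                   .run("insert into table t5 partition(country='india') values ('pune')")
                   .dump(primaryDbName, lazyCopyClause);
           replica.load(replicatedDbName, primaryDbName, lazyCopyClause)
                   .run("use " + replicatedDbName)
                   .run("select place from t5 where country='india'")
                   .verifyResult("pune");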

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A file-backed list of Strings which stays in memory until the threshold is reached.
+ */
+public class FileList implements AutoCloseable, Iterator<String> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
+  private static int fileListStreamerID = 0;
+  private static final String FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
+
+  private LinkedBlockingQueue<String> cache;
+  private volatile boolean thresholdHit = false;
+  private int thresholdPoint;
+  private float thresholdFactor = 0.9f;
+  private Path backingFile;
+  private FileListStreamer fileListStreamer;
+  private String nextElement;
+  private boolean noMoreElement;
+  private HiveConf conf;
+  private BufferedReader backingFileReader;
+
+
+  public FileList(Path backingFile, int cacheSize, HiveConf conf) throws IOException {
+    this.backingFile = backingFile;
+    if (cacheSize > 0) {
+      // Cache size must be > 0 for this list to be used for the write operation.
+      this.cache = new LinkedBlockingQueue<>(cacheSize);
+      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
+      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
+    } else {
+      thresholdHit = true;
+    }
+    this.conf = conf;
+    thresholdPoint = getThreshold(cacheSize);

Review comment:
       If the cache size is 0, this call is not needed.
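       One possible rearrangement, as a sketch (it assumes read-only lists, i.e. cacheSize == 0, never consult thresholdPoint; conf is assigned first because getThreshold() reads it):

           this.conf = conf;
           if (cacheSize > 0) {
             this.cache = new LinkedBlockingQueue<>(cacheSize);
             fileListStreamer = new FileListStreamer(cache, backingFile, conf);
             thresholdPoint = getThreshold(cacheSize);
           } else {
             thresholdHit = true;
           }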

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class FileListStreamer extends Thread implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(FileListStreamer.class);
+  private static BufferedWriter backingFileWriterInTest;
+  private static final long TIMEOUT_IN_SECS = 5L;
+  private volatile boolean signalTostop;
+  private final LinkedBlockingQueue<String> cache;
+  private Path backingFile;
+  private Configuration conf;
+  private BufferedWriter backingFileWriter;
+  private volatile boolean valid = true;
+  private final Object COMPLETION_LOCK = new Object();
+  private volatile boolean completed = false;
+  private volatile boolean initialized = false;
+
+
+
+  public FileListStreamer(LinkedBlockingQueue<String> cache, Path backingFile, Configuration conf) throws IOException {
+    this.cache = cache;
+    this.backingFile = backingFile;
+    this.conf = conf;
+  }
+
+  private void lazyInit() throws IOException {
+    if (backingFileWriterInTest == null) {

Review comment:
       Mock this method instead of changing the production code for tests.
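       Since TestFileList already runs with PowerMockRunner, one hedged option is a partial mock that stubs the private init (a sketch only; FileListStreamer would need to be added to @PrepareForTest, and Whitebox is org.powermock.reflect.Whitebox):

           FileListStreamer streamer =
               PowerMockito.spy(new FileListStreamer(cache, backingFile, conf));
           // Stub the private lazyInit() and inject the mocked writer directly.
           PowerMockito.doNothing().when(streamer, "lazyInit");
           Whitebox.setInternalState(streamer, "backingFileWriter", bufferedWriter);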

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LoggerFactory.class})
+public class TestFileList {
+
+  @Mock
+  private HiveConf hiveConf;
+
+  @Mock
+  BufferedWriter bufferedWriter;
+
+  @Test
+  public void testNoStreaming() throws Exception {
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(false);
+    Path backingFile  = new Path("/tmp/backingFile");
+    FileList fileList = new FileList(backingFile, 100, hiveConf);
+    fileList.add("Entry1");
+    fileList.add("Entry2");
+    assertFalse(fileList.isStreamingToFile());
+  }
+
+  @Test
+  public void testAlwaysStreaming() throws Exception {
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(true);
+    FileListStreamer.setBackingFileWriterInTest(bufferedWriter);
+    Path backingFile  = new Path("/tmp/backingFile");
+    FileList fileList = new FileList(backingFile, 100, hiveConf);
+    assertFalse(fileList.isStreamingInitialized());
+    fileList.add("Entry1");
+    waitForStreamingInitialization(fileList);
+    assertTrue(fileList.isStreamingToFile());
+    fileList.close();
+    waitForStreamingClosure(fileList);
+  }
+
+  @Test
+  public void testStreamingOnCacheHit() throws Exception {
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(false);
+    FileListStreamer.setBackingFileWriterInTest(bufferedWriter);
+    Path backingFile  = new Path("/tmp/backingFile");
+    FileList fileList = new FileList(backingFile, 5, hiveConf);
+    fileList.add("Entry1");
+    fileList.add("Entry2");
+    fileList.add("Entry3");
+    Thread.sleep(5000L);
+    assertFalse(fileList.isStreamingInitialized());
+    fileList.add("Entry4");
+    fileList.add("Entry5");
+    waitForStreamingInitialization(fileList);
+    fileList.close();
+    waitForStreamingClosure(fileList);

Review comment:
       Check that the data is actually written to the file.
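       For example, a sketch (it assumes close() flushes all five entries through BufferedWriter.write(String), and needs java.util.Arrays imported):

           ArgumentCaptor<String> entryArgs = ArgumentCaptor.forClass(String.class);
           Mockito.verify(bufferedWriter, Mockito.times(5)).write(entryArgs.capture());
           assertTrue(entryArgs.getAllValues().containsAll(
               Arrays.asList("Entry1", "Entry2", "Entry3", "Entry4", "Entry5")));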

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class FileListStreamer extends Thread implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(FileListStreamer.class);
+  private static BufferedWriter backingFileWriterInTest;
+  private static final long TIMEOUT_IN_SECS = 5L;
+  private volatile boolean signalTostop;
+  private final LinkedBlockingQueue<String> cache;
+  private Path backingFile;
+  private Configuration conf;
+  private BufferedWriter backingFileWriter;
+  private volatile boolean valid = true;
+  private final Object COMPLETION_LOCK = new Object();
+  private volatile boolean completed = false;
+  private volatile boolean initialized = false;
+
+
+
+  public FileListStreamer(LinkedBlockingQueue<String> cache, Path backingFile, Configuration conf) throws IOException {
+    this.cache = cache;
+    this.backingFile = backingFile;
+    this.conf = conf;
+  }
+
+  private void lazyInit() throws IOException {
+    if (backingFileWriterInTest == null) {
+      FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+      backingFileWriter = new BufferedWriter(new OutputStreamWriter(fs.create(backingFile)));
+    } else {
+      backingFileWriter = backingFileWriterInTest;
+    }
+    initialized = true;
+    LOG.info("Initialized a file based store to save a list at: {}", 
backingFile);
+  }
+
+  public boolean isValid() {
+    return valid;
+  }
+
+  // Blocks for remaining entries to be flushed to file.
+  @Override
+  public void close() throws IOException {
+    signalTostop = true;
+    synchronized (COMPLETION_LOCK) {
+      while (motiveToWait()) {
+        try {
+          COMPLETION_LOCK.wait(TimeUnit.SECONDS.toMillis(TIMEOUT_IN_SECS));
+        } catch (InterruptedException e) {
+          // no-op
+        }
+      }
+    }
+    if (!isValid()) {
+      throw new IOException("File list is not in a valid state:" + 
backingFile);
+    }
+  }
+
+  private boolean motiveToWait() {
+    return !completed && valid;
+  }
+
+  @Override
+  public void run() {
+    try {
+      lazyInit();
+    } catch (IOException e) {
+      valid = false;
+      throw new RuntimeException("Unable to initialize the file list 
streamer", e);
+    }
+    boolean exThrown = false;
+    while (!exThrown && (!signalTostop || !cache.isEmpty())) {
+      try {
+        String nextEntry = cache.poll(TIMEOUT_IN_SECS, TimeUnit.SECONDS);
+        if (nextEntry != null) {
+          backingFileWriter.write(nextEntry);
+          backingFileWriter.newLine();
+          LOG.debug("Writing entry {} to file list backed by {}", nextEntry, 
backingFile);
+        }
+      } catch (Exception iEx) {
+        if (!(iEx instanceof InterruptedException)) {
+          // not draining any more. Inform the producer to avoid OOM.
+          valid = false;
+          LOG.error("Exception while saving the list to file " + backingFile, 
iEx);
+          exThrown = true;
+        }
+      }
+    }
+    try{
+      closeBackingFile();
+      completed = true;
+    } finally {
+      synchronized (COMPLETION_LOCK) {
+        COMPLETION_LOCK.notify();
+      }
+    }
+    LOG.info("Completed the file list streamer backed by: {}", backingFile);
+  }
+
+  private void closeBackingFile() {
+    try {
+      backingFileWriter.close();
+      LOG.debug("Closed the file list backing file: {}", backingFile);
+    } catch (IOException e) {
+      LOG.error("Exception while closing the file list backing file", e);
+      valid = false;
+    }
+  }
+
+  @VisibleForTesting
+  public static void setBackingFileWriterInTest(BufferedWriter bufferedWriter) {

Review comment:
       Mock this instead of having test-specific code.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
##########
@@ -210,6 +210,66 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable {
     assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation, true);
   }
 
+  @Test
+  public void externalTableReplicationWithDefaultPathsLazyCopy() throws Throwable {
+    List<String> lazyCopyClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true'");
+    //creates external tables with partitions
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create external table t1 (id int)")
+            .run("insert into table t1 values (1)")
+            .run("insert into table t1 values (2)")
+            .run("create external table t2 (place string) partitioned by (country string)")
+            .run("insert into table t2 partition(country='india') values ('bangalore')")
+            .run("insert into table t2 partition(country='us') values ('austin')")
+            .run("insert into table t2 partition(country='france') values ('paris')")
+            .dump(primaryDbName, lazyCopyClause);
+
+    // verify that the external table info is written correctly for bootstrap
+    assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName, false);
+
+    replica.load(replicatedDbName, primaryDbName, lazyCopyClause)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't1'")
+            .verifyResult("t1")
+            .run("show tables like 't2'")
+            .verifyResult("t2")
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("select country from t2 where country = 'us'")
+            .verifyResult("us")
+            .run("select country from t2 where country = 'france'")
+            .verifyResult("france")
+            .run("show partitions t2").verifyResults(new String[] {"country=france", "country=india", "country=us"});

Review comment:
       Verify the data in the partitions as well.
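       For instance, extending the same verification chain with the values inserted above (a sketch):

           .run("select place from t2 where country = 'india'")
           .verifyResult("bangalore")
           .run("select place from t2 where country = 'us'")
           .verifyResult("austin")
           .run("select place from t2 where country = 'france'")
           .verifyResult("paris")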

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
##########
@@ -165,4 +178,83 @@ private void validateSrcPathListExists() throws IOException, LoginException {
       throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
     }
   }
+
+  /**
+   * This needs the root data directory to which the data needs to be exported.
+   * The data exported here is a list of files, in a table or partition, that are written to the _files
+   * in the exportRootDataDir provided.
+   */
+   void exportFilesAsList() throws SemanticException, IOException, LoginException {
+    if (dataPathList.isEmpty()) {
+      return;
+    }
+    Retry<Void> retryable = new Retry<Void>(IOException.class) {
+      @Override
+      public Void execute() throws Exception {
+        try (BufferedWriter writer = writer()) {
+          for (Path dataPath : dataPathList) {
+            writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
+          }
+        } catch (IOException e) {
+          if (e instanceof FileNotFoundException) {

Review comment:
       Note that it will retry for FileNotFoundException as well, since that is a subclass of IOException.
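       One hedged way to make it terminal, as a sketch (it assumes Retry re-runs only on the exception class it was constructed with, so a SemanticException escapes the loop; FILE_NOT_FOUND is the message constant already used in this class):

           } catch (FileNotFoundException e) {
             // Not transient: fail immediately instead of looping in Retry.
             throw new SemanticException(FILE_NOT_FOUND.format(e.getMessage()), e);
           } catch (IOException e) {
             // Other I/O problems still go through the retry policy.
             throw e;
           }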

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
##########
@@ -1559,6 +1645,76 @@ public void testIncrementalLoad() throws IOException {
     verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptnData2, 
driverMirror);
   }
 
+  @Test
+  public void testIncrementalLoadLazyCopy() throws IOException {

Review comment:
       Is this test run on a separate mini-HDFS setup?

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LoggerFactory.class})
+public class TestFileList {
+
+  @Mock
+  private HiveConf hiveConf;
+
+  @Mock
+  BufferedWriter bufferedWriter;
+
+  @Test
+  public void testNoStreaming() throws Exception {
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(false);
+    Path backingFile  = new Path("/tmp/backingFile");
+    FileList fileList = new FileList(backingFile, 100, hiveConf);
+    fileList.add("Entry1");
+    fileList.add("Entry2");
+    assertFalse(fileList.isStreamingToFile());
+  }
+
+  @Test
+  public void testAlwaysStreaming() throws Exception {
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(true);
+    FileListStreamer.setBackingFileWriterInTest(bufferedWriter);
+    Path backingFile  = new Path("/tmp/backingFile");
+    FileList fileList = new FileList(backingFile, 100, hiveConf);
+    assertFalse(fileList.isStreamingInitialized());
+    fileList.add("Entry1");
+    waitForStreamingInitialization(fileList);
+    assertTrue(fileList.isStreamingToFile());
+    fileList.close();
+    waitForStreamingClosure(fileList);
+  }
+
+  @Test
+  public void testStreamingOnCacheHit() throws Exception {
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(false);
+    FileListStreamer.setBackingFileWriterInTest(bufferedWriter);
+    Path backingFile  = new Path("/tmp/backingFile");
+    FileList fileList = new FileList(backingFile, 5, hiveConf);
+    fileList.add("Entry1");
+    fileList.add("Entry2");
+    fileList.add("Entry3");
+    Thread.sleep(5000L);
+    assertFalse(fileList.isStreamingInitialized());
+    fileList.add("Entry4");
+    fileList.add("Entry5");
+    waitForStreamingInitialization(fileList);
+    fileList.close();
+    waitForStreamingClosure(fileList);
+  }
+
+  @Test
+  public void testConcurrentAdd() throws Exception {
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(false);
+    FileListStreamer.setBackingFileWriterInTest(bufferedWriter);
+    Path backingFile  = new Path("/tmp/backingFile");
+    FileList fileList = new FileList(backingFile, 100, hiveConf);
+    int numOfEntries = 1000;
+    int numOfThreads = 10;
+    ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
+
+    for (int i=1; i<=numOfEntries; i++) {
+      executorService.submit(() -> {
+        try {
+          fileList.add("someEntry");
+        } catch (SemanticException e) {
+          throw new RuntimeException("Unbale to add to file list.");
+        }
+      });
+    }
+    executorService.awaitTermination(1, TimeUnit.MINUTES);
+    waitForStreamingInitialization(fileList);
+    fileList.close();
+    waitForStreamingClosure(fileList);
+    ArgumentCaptor<String> entryArgs = ArgumentCaptor.forClass(String.class);
+    Mockito.verify(bufferedWriter, Mockito.times(numOfEntries)).write(entryArgs.capture());

Review comment:
       Check for the data in the file as well.
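       For example, building on the captor above (a sketch; it assumes the streamer calls newLine() once per entry):

           Mockito.verify(bufferedWriter, Mockito.times(numOfEntries)).newLine();
           assertTrue(entryArgs.getAllValues().stream()
               .allMatch("someEntry"::equals));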

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A file-backed list of Strings which stays in memory until the threshold is reached.
+ */
+public class FileList implements AutoCloseable, Iterator<String> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
+  private static int fileListStreamerID = 0;
+  private static final String FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
+
+  private LinkedBlockingQueue<String> cache;
+  private volatile boolean thresholdHit = false;
+  private int thresholdPoint;
+  private float thresholdFactor = 0.9f;
+  private Path backingFile;
+  private FileListStreamer fileListStreamer;
+  private String nextElement;
+  private boolean noMoreElement;
+  private HiveConf conf;
+  private BufferedReader backingFileReader;
+
+
+  public FileList(Path backingFile, int cacheSize, HiveConf conf) throws IOException {
+    this.backingFile = backingFile;
+    if (cacheSize > 0) {
+      // Cache size must be > 0 for this list to be used for the write operation.
+      this.cache = new LinkedBlockingQueue<>(cacheSize);
+      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
+      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
+    } else {
+      thresholdHit = true;
+    }
+    this.conf = conf;
+    thresholdPoint = getThreshold(cacheSize);
+  }
+
+  /**
+   * Only the add operation is safe for concurrent use.
+   */
+  public void add(String entry) throws SemanticException {
+    if (thresholdHit && !fileListStreamer.isAlive()) {
+      throw new SemanticException("List is not getting saved anymore to file " 
+ backingFile.toString());
+    }
+    try {
+      cache.put(entry);
+    } catch (InterruptedException e) {
+      throw new SemanticException(e);
+    }
+    if (!thresholdHit && cache.size() >= thresholdPoint) {
+      initStoreToFile(cache.size());
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (!thresholdHit) {
+      return (cache != null && !cache.isEmpty());
+    }
+    if (nextElement != null) {
+      return true;
+    }
+    if (noMoreElement) {
+      return false;
+    }
+    nextElement = readNextLine();
+    if (nextElement == null) {
+      noMoreElement = true;
+    }
+    return !noMoreElement;
+  }
+
+  @Override
+  public String next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException("No more element in the list backed by " + backingFile);
+    }
+    String retVal = nextElement;
+    nextElement = null;
+    return thresholdHit ? retVal : cache.poll();
+  }
+
+  private synchronized void initStoreToFile(int cacheSize) {
+    if (!thresholdHit) {
+      fileListStreamer.setName(getNextID());
+      fileListStreamer.setDaemon(true);
+      fileListStreamer.start();
+      thresholdHit = true;
+      LOG.info("Started streaming the list elements to file: {}, cache size 
{}", backingFile, cacheSize);
+    }
+  }
+
+  private String readNextLine() {
+    String nextElement = null;
+    try {
+      if (backingFileReader == null) {
+        FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+        if (fs.exists(backingFile)) {
+          backingFileReader = new BufferedReader(new InputStreamReader(fs.open(backingFile)));
+        }
+      }
+      nextElement = (backingFileReader == null) ? null : backingFileReader.readLine();
+    } catch (IOException e) {
+      LOG.error("Unable to read list from backing file " + backingFile, e);
+    }
+    return nextElement;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (thresholdHit && fileListStreamer != null) {
+      fileListStreamer.close();
+    }
+    if (backingFileReader != null) {
+      backingFileReader.close();
+    }
+    LOG.info("Completed close for File List backed by:{}, thresholdHit:{} ", 
backingFile, thresholdHit);
+  }
+
+  private static String getNextID() {
+    if (Integer.MAX_VALUE == fileListStreamerID) {
+      //reset the counter
+      fileListStreamerID = 0;
+    }
+    fileListStreamerID++;
+    return FILE_LIST_STREAMER_PREFIX  + fileListStreamerID;
+  }
+
+  public int getThreshold(int cacheSize) {
+    boolean copyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+    return copyAtLoad ? 0 : (int)(cacheSize * thresholdFactor);
+  }
+
+  @VisibleForTesting
+  public boolean isStreamingToFile() {

Review comment:
       For unit tests, you can make these package-private instead.
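       A sketch of the suggested visibility change (the body shown is assumed; TestFileList sits in the same package, org.apache.hadoop.hive.ql.exec.repl.util, so it can still call the method):

           @VisibleForTesting
           boolean isStreamingToFile() {
             return thresholdHit && fileListStreamer.isAlive();
           }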




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 459148)
    Time Spent: 3h 10m  (was: 3h)

> Memory efficient iterator should be used during replication.
> ------------------------------------------------------------
>
>                 Key: HIVE-23069
>                 URL: https://issues.apache.org/jira/browse/HIVE-23069
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: Pravin Sinha
>            Assignee: Pravin Sinha
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-23069.01.patch, HIVE-23069.02.patch
>
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Currently the iterator used while copying table data is memory based. In case of a database with a very large number of tables/partitions, such an iterator may cause the HS2 process to go OOM.
> Also introduces a config option to run the data copy tasks during the repl load operation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
