[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=563490&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-563490 ]

ASF GitHub Bot logged work on HIVE-24718:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Mar/21 04:33
            Start Date: 10/Mar/21 04:33
    Worklog Time Spent: 10m 
      Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r591024045



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,225 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
+  private String lastReadElement = null;
   private HiveConf conf;
+  private volatile boolean abortOperation = false;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", 
backingFile);
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY)) {
+      writeWithRetry(entry);
     } else {
-      thresholdHit = true;
+      writeEntry(entry);
     }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
+  private synchronized void writeEntry(String entry) throws IOException {
+    // Retry only while creating the file; no retry during writes.
+    if (backingFileWriter == null) {
+      try {
+        Retryable retryable = buildRetryable();
+        retryable.executeCallable((Callable<Void>) () -> {
+          if (this.abortOperation) {
+            return null;
+          }
+          backingFileWriter = getWriterCreateMode();
+          return null;
+        });
+      } catch (Exception e) {
+        this.abortOperation = true;
+      throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
+      }
+    }
+    if (this.abortOperation) {
+      return;
+    }
+    try {
+      backingFileWriter.writeBytes(getEntryWithNewline(entry));
+      LOG.info("Writing entry {} to file list backed by {}", entry, 
backingFile);
+    } catch (IOException e) {
+      this.abortOperation = true;
+      LOG.error("Writing entry {} to file list {} failed.", entry, 
backingFile, e);
+      throw e;
+    }
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+  private synchronized void writeWithRetry(String entry) throws IOException {
+    Retryable retryable = buildRetryable();
+    try {
+      retryable.executeCallable((Callable<Void>) () -> {
+        if (this.abortOperation) {
+          return null;

Review comment:
       Done.
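
For context, the new writeEntry() retries only the file-creation step and then
fails fast on any write error, setting a sticky abort flag. A minimal
self-contained sketch of that pattern, with a hypothetical bounded retry loop
standing in for Hive's Retryable utility (all names here are illustrative, not
from the patch):

    import java.io.IOException;
    import java.io.OutputStream;

    public class RetryOnCreateSketch {
      interface FileOpener {
        OutputStream open() throws IOException;
      }

      private volatile OutputStream out;         // created lazily, with retries
      private volatile boolean aborted = false;  // sticky failure flag

      synchronized void write(byte[] entry, FileOpener opener) throws IOException {
        if (out == null) {
          IOException last = null;
          for (int attempt = 0; attempt < 3 && out == null && !aborted; attempt++) {
            try {
              out = opener.open();               // only the create is retried
            } catch (IOException e) {
              last = e;
            }
          }
          if (out == null) {
            aborted = true;
            throw new IOException("retry exhausted", last);
          }
        }
        if (aborted) {
          return;                                // an earlier failure poisons later writes
        }
        try {
          out.write(entry);                      // no retry here: fail fast and abort
        } catch (IOException e) {
          aborted = true;
          throw e;
        }
      }
    }

The sticky flag plays the role of abortOperation in the patch: once any writer
fails, later add() calls become no-ops rather than risking a corrupt backing file.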

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java
##########
@@ -18,147 +18,266 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
-import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-
 /**
  * Tests the File List implementation.
  */
 
-@RunWith(PowerMockRunner.class)
+@RunWith(MockitoJUnitRunner.class)
 @PrepareForTest({LoggerFactory.class})
 public class TestFileList {
 
-  @Mock
-  private BufferedWriter bufferedWriter;
-
-
-  @Test
-  public void testNoStreaming() throws Exception {
-    Object tuple[] =  setupAndGetTuple(100, false);
-    FileList fileList = (FileList) tuple[0];
-    FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
-    fileList.add("Entry1");
-    fileList.add("Entry2");
-    assertFalse(isStreamingToFile(fileListStreamer));
-  }
+  HiveConf conf = new HiveConf();
+  private FSDataOutputStream outStream;
+  private FSDataOutputStream testFileStream;
+  final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+          File.separator + TestFileList.class.getCanonicalName() + "-" + System.currentTimeMillis()
+  ).getPath().replaceAll("\\\\", "/");
+  private Exception testException = new IOException("test");
 
   @Test
-  public void testAlwaysStreaming() throws Exception {
-    Object tuple[] =  setupAndGetTuple(100, true);
-    FileList fileList = (FileList) tuple[0];
-    FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
-    assertFalse(fileListStreamer.isInitialized());
-    fileList.add("Entry1");
-    waitForStreamingInitialization(fileListStreamer);
-    assertTrue(isStreamingToFile(fileListStreamer));
-    fileList.close();
-    waitForStreamingClosure(fileListStreamer);
-  }
+  public void testConcurrentAdd() throws Exception {
+    FileList fileList = setupFileList();
+    int numOfEntries = 1000;
+    int numOfThreads = 10;
+    ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
 
-  @Test
-  public void testStreaminOnCacheHit() throws Exception {
-    Object tuple[] =  setupAndGetTuple(5, false);
-    FileList fileList = (FileList) tuple[0];
-    FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
-    fileList.add("Entry1");
-    fileList.add("Entry2");
-    fileList.add("Entry3");
-    Thread.sleep(5000L);
-    assertFalse(fileListStreamer.isInitialized());
-    fileList.add("Entry4");
-    fileList.add("Entry5");
-    waitForStreamingInitialization(fileListStreamer);
+    for (int i=1; i<=numOfEntries; i++) {
+      executorService.submit(() -> {
+        try {
+          fileList.add("someEntry");
+        } catch (IOException e) {
+          throw new RuntimeException("Unbale to add to file list.");
+        }
+      });
+    }
+    executorService.awaitTermination(1, TimeUnit.MINUTES);
     fileList.close();
-    waitForStreamingClosure(fileListStreamer);
+    ArgumentCaptor<String> entryArgs = ArgumentCaptor.forClass(String.class);
+    Mockito.verify(testFileStream, Mockito.times(numOfEntries)).writeBytes(entryArgs.capture());
   }
 
   @Test
-  public void testConcurrentAdd() throws Exception {
-    Object tuple[] =  setupAndGetTuple(100, false);
-    FileList fileList = (FileList) tuple[0];
-    FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
+  public void testConcurrentAddWithAbort() throws Exception {
+    FileList fileList = setupFileList(false, false, false);
     int numOfEntries = 1000;
     int numOfThreads = 10;
    ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
+    final String retryExhaustedMsg = ErrorMsg.REPL_RETRY_EXHAUSTED
+            .format(testException.getMessage());
 
     for (int i=1; i<=numOfEntries; i++) {
       executorService.submit(() -> {
         try {
           fileList.add("someEntry");
-        } catch (SemanticException e) {
-          throw new RuntimeException("Unbale to add to file list.");
+        } catch (IOException e) {
+          Assert.assertTrue(e.getMessage().contains(retryExhaustedMsg));
         }
       });
     }
     executorService.awaitTermination(1, TimeUnit.MINUTES);
-    waitForStreamingInitialization(fileListStreamer);
     fileList.close();
-    waitForStreamingClosure(fileListStreamer);
     ArgumentCaptor<String> entryArgs = ArgumentCaptor.forClass(String.class);
-    Mockito.verify(bufferedWriter, Mockito.times(numOfEntries)).write(entryArgs.capture());
+    // Retry exhaustion should be hit by the first thread, so the other threads do not write.
+    Mockito.verify(outStream, Mockito.times(1)).writeBytes(entryArgs.capture());
   }
 
-  private void waitForStreamingInitialization(FileListStreamer fileListStreamer) throws InterruptedException {
-    long sleepTime = 1000L;
-    int iter = 0;
-    while (!fileListStreamer.isInitialized()) {
-      Thread.sleep(sleepTime);
-      iter++;
-      if (iter == 5) {
-        throw new IllegalStateException("File Streamer not initialized till 
5s.");
-      }
+  @Test
+  public void testWriteRetryCreateFailure() throws Exception {
+    String testEntry = "someEntry";
+    boolean retryOnCreate = true;
+    FileList fileList = setupFileList(retryOnCreate);
+    final String retryExhaustedMsg = ErrorMsg.REPL_RETRY_EXHAUSTED
+            .format(testException.getMessage());
+
+    try {
+      fileList.add(testEntry);
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains(retryExhaustedMsg));
+    }
+
+    //the create keeps failing, so create should be called at least twice,
+    //writes and appends do not happen
+    Mockito.verify(fileList, Mockito.atLeast(2)).getWriterCreateMode();
+    Mockito.verify(fileList, Mockito.times(0)).getWriterAppendMode();
+  }
+
+  @Test
+  public void testWriteNoRetry() throws Exception {
+    String testEntry = "someEntry";
+    boolean retryOnCreate = false, retryOnWrite = false;
+    FileList fileList = setupFileList(retryOnCreate, retryOnWrite);
+    final String retryExhaustedMsg = ErrorMsg.REPL_RETRY_EXHAUSTED
+            .format(testException.getMessage());
+
+    try {
+      fileList.add(testEntry);
+    } catch (IOException e) {
+      Assert.assertFalse(e.getMessage().contains(retryExhaustedMsg));
+      Assert.assertTrue(e.getMessage().contains("test"));
     }
+
+    //the first write fails and no retries are made
+    Mockito.verify(fileList, Mockito.times(1)).getWriterCreateMode();
+    Mockito.verify(outStream, Mockito.times(1)).writeBytes(Mockito.anyString());
+    Mockito.verify(fileList, Mockito.times(0)).getWriterAppendMode();
   }
 
-  private void waitForStreamingClosure(FileListStreamer fileListStreamer) throws InterruptedException {
-    long sleepTime = 1000L;
-    int iter = 0;
-    while (!isStreamingClosedProperly(fileListStreamer)) {
-      Thread.sleep(sleepTime);
-      iter++;
-      if (iter == 5) {
-        throw new IllegalStateException("File Streamer not getting closed till 
5s.");
+  @Test
+  public void testReadWithDuplicateEntries() throws Exception {
+    conf = new HiveConf();
+    String testEntry = "someEntry";
+    int numUniqueEntries = 100;
+    Path testFilePath =  new Path(new Path(TEST_DATA_DIR), "testFile");
+    FileList fileList = new FileList(testFilePath, conf);
+
+    for (int i = 1; i <= numUniqueEntries; i++) {
+      String currentUniqueEntry = testEntry + Integer.valueOf(i);

Review comment:
       Made this change.
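
A side note on the harness used by these tests: awaitTermination() on its own
does not stop the pool, it only waits out the timeout; the conventional pairing
is shutdown() followed by awaitTermination(). A sketch of the same driver with
that pairing (FileList is the class from this patch; everything else is
illustrative):

    import java.io.IOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class ConcurrentAddSketch {
      static void drive(FileList fileList) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        for (int i = 1; i <= 1000; i++) {
          pool.submit(() -> {
            try {
              fileList.add("someEntry");
            } catch (IOException e) {
              throw new RuntimeException("Unable to add to file list.", e);
            }
          });
        }
        pool.shutdown();                                    // stop accepting new tasks...
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {  // ...then wait for submitted ones
          throw new IllegalStateException("writers did not finish in time");
        }
        fileList.close();
      }
    }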

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1836,6 +1837,64 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable {
             .verifyResults(new String[]{"2", "3"});
   }
 
+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {
+    List<String> clause = new ArrayList<>();
+    // NS replacement parameters have no effect when data is also copied to staging.
+    clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + 
"'='false'");
+    clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'");
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by 
(load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc 
tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, clause);
+    assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation,

Review comment:
       Done.
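
For readers unfamiliar with the test harness: the clause list built above is
rendered into the WITH clause of the REPL DUMP statement. Assuming
ConfVars.toString() yields the config key, as these tests rely on, the dump
invocation amounts to roughly the following (a sketch continuing the test code
above):

    // Rendered WITH clause, approximately:
    //   REPL DUMP <db> WITH ('hive.repl.run.data.copy.tasks.on.target'='false',
    //                        'hive.repl.copy.iterator.retry'='false')
    String withClause = String.join(", ", clause);
    String replDump = "REPL DUMP " + primaryDbName + " WITH (" + withClause + ")";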

##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -653,6 +649,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
       new TimeValidator(TimeUnit.HOURS),
       "Total allowed retry duration in hours inclusive of all retries. Once 
this is exhausted, " +
         "the policy instance will be marked as failed and will need manual 
intervention to restart."),
+    REPL_COPY_ITERATOR_RETRY("hive.repl.copy.iterator.retry", true,

Review comment:
       Added description and renamed.
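
For reference, a minimal sketch of how the new flag is consumed and overridden,
using the getBoolVar/setBoolVar accessors the patch itself relies on (the reply
above notes the variable was later renamed, so the name below tracks this
revision only):

    HiveConf conf = new HiveConf();
    // Opt out of the retrying file-backed iterator for this session.
    conf.setBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY, false);

    if (conf.getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY)) {
      // FileList.add() takes the retrying path (writeWithRetry)
    } else {
      // plain single-attempt path (writeEntry)
    }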




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 563490)
    Time Spent: 5h 50m  (was: 5h 40m)

> Moving to file based iteration for copying data
> -----------------------------------------------
>
>                 Key: HIVE-24718
>                 URL: https://issues.apache.org/jira/browse/HIVE-24718
>             Project: Hive
>          Issue Type: Bug
>            Reporter: Arko Sharma
>            Assignee: Arko Sharma
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, 
> HIVE-24718.04.patch, HIVE-24718.05.patch, HIVE-24718.06.patch
>
>          Time Spent: 5h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
