[ https://issues.apache.org/jira/browse/HIVE-23069?focusedWorklogId=457108&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-457108 ]
ASF GitHub Bot logged work on HIVE-23069:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Jul/20 12:12
            Start Date: 10/Jul/20 12:12
    Worklog Time Spent: 10m
      Work Description: aasha commented on a change in pull request #1225:
URL: https://github.com/apache/hive/pull/1225#discussion_r452798321


##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########

@@ -591,14 +590,25 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
         }
       }
       dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf);
-      extTableCopyWorks = dirLocationsToCopy(extTableLocations);
     }
-    work.setDirCopyIterator(extTableCopyWorks.iterator());
-    work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator());
+    setDataCopyIterators(extTableFileList, managedTblList);
     work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, lastReplId);
     return lastReplId;
   }

+  private void setDataCopyIterators(FileList extTableFileList, FileList managedTableFileList) throws IOException {
+    boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+    extTableFileList.close();

Review comment:
   Is this serving the purpose of flush? It is not clear why close is called before setting the iterator. Needs to be simplified.


##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########

+/**
+ * A file backed list of Strings which is in-memory till the threshold.
+ */
+public class FileList implements Closeable, Iterator<String> {

Review comment:
   Add UTs.
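To make the request concrete, here is a minimal sketch of such a unit test, assuming JUnit 4, a local temp directory, and the FileList constructors plus the close-then-iterate usage shown in this diff; the class name TestFileList, the cache size, and the sample entries are placeholders, not part of the patch.

```java
package org.apache.hadoop.hive.ql.exec.repl.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

import java.io.File;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.junit.Test;

public class TestFileList {

  @Test
  public void addedEntriesComeBackInOrder() throws Exception {
    File dumpDir = Files.createTempDirectory("file-list-ut").toFile();
    Path backingFile = new Path(new File(dumpDir, "_file_list").toURI());
    HiveConf conf = new HiveConf();
    List<String> expected = Arrays.asList("db.tbl1", "db.tbl2", "db.tbl3");

    // Write phase: add entries, then close() to switch the list into read mode.
    FileList fileList = new FileList(backingFile, 5, conf, true);
    for (String entry : expected) {
      fileList.add(entry);
    }
    fileList.close();

    // Read phase: the same instance should replay exactly what was added.
    List<String> actual = new ArrayList<>();
    while (fileList.hasNext()) {
      actual.add(fileList.next());
    }

    assertEquals(expected, actual);
    assertFalse(fileList.hasNext());
  }
}
```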
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java
##########

+  @Override
+  public void close() throws IOException {
+    if (!asyncMode) {
+      closeBackingFile();
+      return;
+    }
+    stop = true;
+    synchronized (COMPLETION_LOCK) {
+      while (!completed && isValid()) {
+        try {
+          COMPLETION_LOCK.wait(TimeUnit.SECONDS.toMillis(TIMEOUT_IN_SECS));
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+    if (!isValid()) {

Review comment:
   When will this be false? Only in closeBackingFile or anywhere else also? If yes then this check can be moved up or let closeBackingFile handle throwing the exception.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java
##########

+public class FileListStreamer extends Thread implements Closeable {

Review comment:
   Just implementing Runnable interface is fine. No need to extend Thread.
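For illustration, here is a minimal, self-contained sketch of that shape. It is a simplified stand-in, not the actual FileListStreamer: the class only implements Runnable and Closeable and drains a queue to a writer, and whoever owns it decides how it is scheduled, for example a named daemon Thread or an ExecutorService.

```java
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified stand-in for FileListStreamer: a Runnable that drains a queue to a writer.
class QueueStreamer implements Runnable, Closeable {
  private final LinkedBlockingQueue<String> cache;
  private final BufferedWriter writer;
  private volatile boolean stop;

  QueueStreamer(LinkedBlockingQueue<String> cache, BufferedWriter writer) {
    this.cache = cache;
    this.writer = writer;
  }

  void requestStop() {
    stop = true;
  }

  @Override
  public void run() {
    try {
      // Keep draining until a stop has been requested and the queue is empty.
      while (!stop || !cache.isEmpty()) {
        String entry = cache.poll(5, TimeUnit.SECONDS);
        if (entry != null) {
          writer.write(entry);
          writer.newLine();
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();   // preserve interrupt status and stop draining
    } catch (IOException e) {
      throw new UncheckedIOException(e);    // surface write failures to the thread's handler
    }
  }

  @Override
  public void close() throws IOException {
    writer.close();
  }
}
```

The caller would then start it with something like `new Thread(streamer, "file-list-streamer-1")` marked as a daemon (or submit it to an ExecutorService), which keeps thread naming and lifecycle decisions in one place, mirroring what FileList#initStoreToFile already does.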
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########

+  private enum FileListOpMode {
+    READ,
+    WRITE,
+    CLOSING

Review comment:
   Rename to CLOSE or make it READING, WRITING for consistency.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########

+  /**
+   * To be used only for READ mode;
+   */
+  public FileList(Path backingFile, HiveConf conf) {

Review comment:
   Better to remove this mode and the add and getNext/hasNext interfaces can take care of it.
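One possible direction for this comment (and for the later questions about how the READ and WRITE modes are enforced) is to hide the mode behind named factory methods, so the intent is explicit at every call site. The forRead/forWrite names below are illustrative only and not part of the patch; the snippet shows how the two methods would sit inside FileList, with the existing constructors made private.

```java
  // Hypothetical API shape inside FileList: constructors become private and the
  // intended mode is explicit wherever an instance is created.
  public static FileList forRead(Path backingFile, HiveConf conf) {
    return new FileList(backingFile, conf);                        // read-only view over the backing file
  }

  public static FileList forWrite(Path backingFile, int cacheSize, HiveConf conf, boolean asyncMode)
      throws IOException {
    return new FileList(backingFile, cacheSize, conf, asyncMode);  // writable, file-backed past the threshold
  }
```

With that split, the add() and hasNext()/next() paths could enforce their own preconditions locally, which is in line with the suggestion to drop the explicit FileListOpMode tracking.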
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java
##########

+  private void closeBackingFile() {
+    try {
+      backingFileWriter.close();
+      LOG.debug("Closed the file list backing file: {}", backingFile);
+    } catch (IOException e) {
+      LOG.error("Exception while closing the file list backing file", e);
+      valid = false;

Review comment:
   You can directly throw the exception from here instead of maintaining another state in the valid boolean.
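A minimal sketch of that alternative (not what the patch currently does): closeBackingFile() propagates the IOException and close() simply rethrows it, so the valid flag is not needed on this path. The run() method would still need its own handling for the checked exception (for example wrapping it in an UncheckedIOException), and waitForStreamerToFinish() below is a hypothetical helper standing in for the existing stop/notify wait loop.

```java
  private void closeBackingFile() throws IOException {
    // Propagate the failure instead of recording it in a flag.
    backingFileWriter.close();
    LOG.debug("Closed the file list backing file: {}", backingFile);
  }

  @Override
  public void close() throws IOException {
    if (asyncMode) {
      waitForStreamerToFinish();   // hypothetical helper wrapping the existing wait loop
    }
    closeBackingFile();            // any IOException now reaches the caller directly
  }
```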
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########

+  private void setDataCopyIterators(FileList extTableFileList, FileList managedTableFileList) throws IOException {
+    boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+    extTableFileList.close();

Review comment:
   Or better to close it where you are creating the list using a try-with-resources.
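For illustration, the calling code could then take roughly the following shape. This is only a sketch: extTableBackingFile, managedTblBackingFile, and cacheSize are assumed stand-ins for whatever the dump flow actually uses, and only setDataCopyIterators comes from the diff above.

```java
    FileList extTableFileList = new FileList(extTableBackingFile, cacheSize, conf, true);
    FileList managedTblList = new FileList(managedTblBackingFile, cacheSize, conf, true);
    try (FileList ext = extTableFileList; FileList managed = managedTblList) {
      // ... the incremental dump populates both lists via ext.add(...) / managed.add(...) ...
    }
    // Both lists are closed (and therefore flushed) by the time we get here, so
    // setDataCopyIterators no longer has to call close() itself.
    setDataCopyIterators(extTableFileList, managedTblList);
```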
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########

+/**
+ * A file backed list of Strings which is in-memory till the threshold.
+ */
+public class FileList implements Closeable, Iterator<String> {

Review comment:
   Also add concurrency tests.
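Continuing the hypothetical TestFileList sketch from earlier (JUnit 4, plus imports for ExecutorService, Executors, TimeUnit, Set, HashSet, and assertTrue), a concurrency test could hammer add() from several threads and verify that nothing is dropped once the list is closed and read back. Thread and entry counts are arbitrary.

```java
  @Test
  public void concurrentAddsAreNotLost() throws Exception {
    File dumpDir = Files.createTempDirectory("file-list-concurrency").toFile();
    Path backingFile = new Path(new File(dumpDir, "_file_list").toURI());
    HiveConf conf = new HiveConf();
    int numThreads = 4;
    int entriesPerThread = 1000;

    // Small cache so the file-backed path is exercised, with async streaming enabled.
    FileList fileList = new FileList(backingFile, 10, conf, true);
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    for (int t = 0; t < numThreads; t++) {
      final int threadId = t;
      pool.submit(() -> {
        for (int i = 0; i < entriesPerThread; i++) {
          fileList.add("entry-" + threadId + "-" + i);   // add() is the documented thread-safe operation
        }
        return null;
      });
    }
    pool.shutdown();
    assertTrue(pool.awaitTermination(1, TimeUnit.MINUTES));
    fileList.close();

    Set<String> seen = new HashSet<>();
    while (fileList.hasNext()) {
      seen.add(fileList.next());
    }
    assertEquals(numThreads * entriesPerThread, seen.size());
  }
```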
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########

+  /**
+   * To be used only for READ mode;
+   */
+  public FileList(Path backingFile, HiveConf conf) {

Review comment:
   How do you mandate that it will be used only in READ mode?


##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########

+  /**
+   * To be used only for WRITE mode;
+   */
+  public FileList(Path backingFile, int cacheSize, HiveConf conf, boolean asyncMode) throws IOException {

Review comment:
   How do you mandate that it will be used only in WRITE mode?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 457108)
            Time Spent: 0.5h  (was: 20m)

> Memory efficient iterator should be used during replication.
> -------------------------------------------------------------
>
>                 Key: HIVE-23069
>                 URL: https://issues.apache.org/jira/browse/HIVE-23069
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: Pravin Sinha
>            Assignee: Pravin Sinha
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-23069.01.patch
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Currently the iterator used while copying table data is memory based. In the case of a database with a very large number of tables/partitions, such an iterator may cause the HS2 process to go OOM.
> Also introduces a config option to run data copy tasks during repl load operation.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)