hubgeter commented on code in PR #44001:
URL: https://github.com/apache/doris/pull/44001#discussion_r1879279169


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java:
##########
@@ -0,0 +1,429 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue;
+import org.apache.doris.fs.remote.RemoteFile;
+import org.apache.doris.fs.remote.RemoteFileSystem;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.ToString;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class AcidUtil {
+    private static final Logger LOG = LogManager.getLogger(AcidUtil.class);
+
+    public static final String VALID_TXNS_KEY = "hive.txn.valid.txns";
+    public static final String VALID_WRITEIDS_KEY = "hive.txn.valid.writeids";
+
+    private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = 
"bucket_";
+
+    // An `_orc_acid_version` file is written to each base/delta/delete_delta 
dir written by a full acid write
+    // or compaction.  This is the primary mechanism for versioning acid data.
+    // Each individual ORC file written stores the current version as a user 
property in ORC footer. All data files
+    // produced by Acid write should have this (starting with Hive 3.0), 
including those written by compactor.This is
+    // more for sanity checking in case someone moved the files around or 
something like that.
+    // In hive, methods for getting/reading the version from files were moved 
to test which is the only place they are
+    // used (after HIVE-23506), in order to keep devs out of temptation, since 
they access the FileSystem which
+    // is expensive.
+    // After `HIVE-23825: Create a flag to turn off _orc_acid_version file 
creation`, introduce variables to
+    // control whether to generate `_orc_acid_version` file. So don't need 
check this file exist.
+    private static final String HIVE_ORC_ACID_VERSION_FILE = 
"_orc_acid_version";
+
+    @Getter
+    @ToString
+    @EqualsAndHashCode
+    private static class ParsedBase {
+        private final long writeId;
+        private final long visibilityId;
+
+        public ParsedBase(long writeId, long visibilityId) {
+            this.writeId = writeId;
+            this.visibilityId = visibilityId;
+        }
+    }
+
+    private static ParsedBase parseBase(String name) {
+        //format1 : base_writeId
+        //format2 : base_writeId_visibilityId  detail: 
https://issues.apache.org/jira/browse/HIVE-20823
+        name = name.substring("base_".length());
+        int index = name.indexOf("_v");
+        if (index == -1) {
+            return new ParsedBase(Long.parseLong(name), 0);
+        }
+        return new ParsedBase(
+                Long.parseLong(name.substring(0, index)),
+                Long.parseLong(name.substring(index + 2)));
+    }
+
+    @Getter
+    @ToString
+    @EqualsAndHashCode
+    private static class ParsedDelta implements Comparable<ParsedDelta> {
+        private final long min;
+        private final long max;
+        private final String path;
+        private final int statementId;
+        private final boolean deleteDelta;
+        private final long visibilityId;
+
+        public ParsedDelta(long min, long max, @NonNull String path, int 
statementId,
+                boolean deleteDelta, long visibilityId) {
+            this.min = min;
+            this.max = max;
+            this.path = path;
+            this.statementId = statementId;
+            this.deleteDelta = deleteDelta;
+            this.visibilityId = visibilityId;
+        }
+
+        /*
+         * Smaller minWID orders first;
+         * If minWID is the same, larger maxWID orders first;
+         * Otherwise, sort by stmtID; files w/o stmtID orders first.
+         *
+         * Compactions (Major/Minor) merge deltas/bases but delete of old files
+         * happens in a different process; thus it's possible to have 
bases/deltas with
+         * overlapping writeId boundaries.  The sort order helps figure out 
the "best" set of files
+         * to use to get data.
+         * This sorts "wider" delta before "narrower" i.e. delta_5_20 sorts 
before delta_5_10 (and delta_11_20)
+         */
+        @Override
+        public int compareTo(ParsedDelta other) {
+            return min != other.min ? Long.compare(min, other.min) :
+                    other.max != max ? Long.compare(other.max, max) :
+                            statementId != other.statementId
+                                    ? Integer.compare(statementId, 
other.statementId) :
+                                    path.compareTo(other.path);
+        }
+    }
+
+
+    private static boolean isValidMetaDataFile(RemoteFileSystem fileSystem, 
String baseDir)
+            throws IOException {
+        String fileLocation = baseDir + "_metadata_acid";
+        Status status = fileSystem.exists(fileLocation);
+        if (status != Status.OK) {
+            return false;
+        }
+        //In order to save the cost of reading the file content, we only check 
whether the file exists.
+        // File Contents: {"thisFileVersion":"0","dataFormat":"compacted"}
+        //
+        // Map<String, String> metadata;
+        // try (var in = read(fileLocation)) {
+        //     metadata = new ObjectMapper().readValue(in, new 
TypeReference<>() {});
+        // }
+        // catch (IOException e) {
+        //     throw new IOException(String.format("Failed to read %s: %s", 
fileLocation, e.getMessage()), e);
+        // }
+        //
+        // String version = metadata.get("thisFileVersion");
+        // if (!"0".equals(version)) {
+        //     throw new IOException("Unexpected ACID metadata version: " + 
version);
+        // }
+        //
+        // String format = metadata.get("dataFormat");
+        // if (!"compacted".equals(format)) {
+        //     throw new IOException("Unexpected value for ACID dataFormat: " 
+ format);
+        // }
+        return true;
+    }
+
+    private static boolean isValidBase(RemoteFileSystem remoteFileSystem, 
String baseDir,
+            ParsedBase base, ValidWriteIdList writeIdList) throws IOException {
+        if (base.writeId == Long.MIN_VALUE) {
+            //Ref: https://issues.apache.org/jira/browse/HIVE-13369
+            //such base is created by 1st compaction in case of non-acid to 
acid table conversion.(you
+            //will get dir: `base_-9223372036854775808`)
+            //By definition there are no open txns with id < 1.
+            //After this: https://issues.apache.org/jira/browse/HIVE-18192, 
txns(global transaction ID) => writeId.
+            return true;
+        }
+
+        // hive 4 : just check "_v" suffix, before hive 4 : check 
`_metadata_acid` file in baseDir.
+        if ((base.visibilityId > 0) || isValidMetaDataFile(remoteFileSystem, 
baseDir)) {
+            return writeIdList.isValidBase(base.writeId);
+        }
+
+        // if here, it's a result of IOW
+        return writeIdList.isWriteIdValid(base.writeId);
+    }
+
+    private static ParsedDelta parseDelta(String fileName, String deltaPrefix, 
String path) {
+        // format1: delta_min_max_statementId_visibilityId, 
delete_delta_min_max_statementId_visibilityId
+        //     _visibilityId maybe not exists.
+        //     detail: https://issues.apache.org/jira/browse/HIVE-20823
+        // format2: delta_min_max_visibilityId, delete_delta_min_visibilityId
+        //     when minor compaction runs, we collapse per statement delta 
files inside a single
+        //     transaction so we no longer need a statementId in the file name
+
+        // String fileName = fileName.substring(name.lastIndexOf('/') + 1);
+        // checkArgument(fileName.startsWith(deltaPrefix), "File does not 
start with '%s': %s", deltaPrefix, path);
+
+        long visibilityId = 0;
+        int visibilityIdx = fileName.indexOf("_v");
+        if (visibilityIdx != -1) {
+            visibilityId = Long.parseLong(fileName.substring(visibilityIdx + 
2));
+            fileName = fileName.substring(0, visibilityIdx);
+        }
+
+        boolean deleteDelta = deltaPrefix.equals("delete_delta_");
+
+        String rest = fileName.substring(deltaPrefix.length());
+        int split = rest.indexOf('_');
+        int split2 = rest.indexOf('_', split + 1);
+        long min = Long.parseLong(rest.substring(0, split));
+
+        if (split2 == -1) {
+            long max = Long.parseLong(rest.substring(split + 1));
+            return new ParsedDelta(min, max, fileName, -1, deleteDelta, 
visibilityId);
+        }
+
+        long max = Long.parseLong(rest.substring(split + 1, split2));
+        int statementId = Integer.parseInt(rest.substring(split2 + 1));
+        return new ParsedDelta(min, max, path, statementId, deleteDelta, 
visibilityId);
+    }
+
+    //Since the hive3 library cannot read the hive4 transaction table 
normally, and there are many problems
+    // when using the Hive 4 library directly, this method is implemented.
+    //Ref: 
hive/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java#getAcidState
+    public static FileCacheValue getAcidState(RemoteFileSystem fileSystem, 
HivePartition partition,

Review Comment:
   notice:  [fix](multi-catalog)fix hive table with cosn location issue (#23409)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to