[ 
https://issues.apache.org/jira/browse/HIVE-26151?focusedWorklogId=759056&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-759056
 ]

ASF GitHub Bot logged work on HIVE-26151:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Apr/22 09:00
            Start Date: 20/Apr/22 09:00
    Worklog Time Spent: 10m 
      Work Description: lcspinter commented on code in PR #3222:
URL: https://github.com/apache/hive/pull/3222#discussion_r853859878


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -207,6 +218,39 @@ public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptCon
     return new IcebergRecordReader<>();
   }
 
+  private static TableScan scanWithTimeRange(Table table, Configuration conf, TableScan scan, long fromTime) {
+    // let's find the corresponding snapshot ID - if the fromTime is before the table creation happened, let's use
+    // the first snapshot of the table
+    long fromSnapshot = IcebergTableUtil.findSnapshotForTimestamp(table, fromTime)
+        .orElseGet(() -> table.history().get(0).snapshotId());
+    if (fromSnapshot == table.currentSnapshot().snapshotId()) {
+      throw new IllegalArgumentException(
+          "Provided FROM timestamp must be earlier than the latest snapshot of the table.");
+    }
+    long toTime = conf.getLong(InputFormatConfig.TO_TIMESTAMP, -1);
+    if (toTime != -1) {
+      if (fromTime >= toTime) {

Review Comment:
   I think we can move this check to the beginning of the method, so that an 
invalid range is rejected before the snapshot lookup runs.
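
   A minimal sketch of the reordering suggested here, assuming the TO timestamp 
is already available when the method starts. `TimeRangeCheckSketch`, 
`resolveFromSnapshot`, and the `LongSupplier` standing in for 
`IcebergTableUtil.findSnapshotForTimestamp` are hypothetical names used only 
for illustration; this is not the PR's code.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the time-range branch; not the PR's code. */
final class TimeRangeCheckSketch {

  /** Sentinel used in the diff when no TO timestamp is configured. */
  static final long NOT_SET = -1L;

  /**
   * Validates the FROM/TO range first and only then resolves the FROM snapshot.
   * snapshotLookup is an assumed stand-in for the real snapshot lookup.
   */
  static long resolveFromSnapshot(long fromTime, long toTime, LongSupplier snapshotLookup) {
    // Cheap check first: no table metadata is touched if the range is invalid.
    if (toTime != NOT_SET && fromTime >= toTime) {
      throw new IllegalArgumentException(
          "Provided FROM timestamp must precede the provided TO timestamp.");
    }
    // Only a valid (or open-ended) range reaches the lookup.
    return snapshotLookup.getAsLong();
  }
}
```

   With this ordering, a call such as `resolveFromSnapshot(10L, 5L, lookup)` 
throws before `lookup` is ever invoked.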



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java:
##########
@@ -163,4 +165,32 @@ public static void updateSpec(Configuration configuration, Table table) {
   public static boolean isBucketed(Table table) {
     return table.spec().fields().stream().anyMatch(f -> f.transform().toString().startsWith("bucket["));
   }
+
+  /**
+   * Returns the snapshot ID which is immediately before (or exactly at) the timestamp provided in millis.
+   * If the timestamp provided is before the first snapshot of the table, we return an empty optional.
+   * If the timestamp provided is in the future compared to the latest snapshot, we return the latest snapshot ID.
+   *
+   * E.g.: if we have snapshots S1, S2, S3 committed at times T3, T6, T9 respectively (T0 = start of epoch), then:
+   * - from T0 to T2 -> returns empty
+   * - from T3 to T5 -> returns S1
+   * - from T6 to T8 -> returns S2
+   * - from T9 to T∞ -> returns S3
+   *
+   * @param table the table whose snapshot ID we are trying to find
+   * @param time the timestamp provided in milliseconds
+   * @return the snapshot ID corresponding to the time
+   */
+  public static Optional<Long> findSnapshotForTimestamp(Table table, long time) {
+    if (table.history().get(0).timestampMillis() > time) {
+      return Optional.empty();
+    }
+
+    for (Snapshot snapshot : table.snapshots()) {

Review Comment:
   Are we certain that table.snapshots() returns the snapshots sorted by 
snapshot time?
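
   One way to sidestep the ordering question is a lookup that does not depend 
on iteration order at all. The sketch below is an assumption-laden 
illustration, not the PR's implementation: `SnapshotLookupSketch` and `Snap` 
are hypothetical stand-ins for the Iceberg types, carrying only an ID and a 
commit time. It reproduces the S1/S2/S3 example from the Javadoc above.

```java
import java.util.Optional;

final class SnapshotLookupSketch {

  /** Hypothetical stand-in for an Iceberg snapshot: just an ID and a commit time. */
  static final class Snap {
    final long id;
    final long timestampMillis;

    Snap(long id, long timestampMillis) {
      this.id = id;
      this.timestampMillis = timestampMillis;
    }
  }

  /**
   * Returns the ID of the snapshot with the greatest commit time that is at or
   * before the given time, or empty if every snapshot is later than it.
   * Scans all snapshots, so the result is the same for any iteration order.
   */
  static Optional<Long> findSnapshotForTimestamp(Iterable<Snap> snapshots, long time) {
    Snap best = null;
    for (Snap s : snapshots) {
      boolean notAfter = s.timestampMillis <= time;
      if (notAfter && (best == null || s.timestampMillis > best.timestampMillis)) {
        best = s;
      }
    }
    return best == null ? Optional.<Long>empty() : Optional.of(best.id);
  }
}
```

   The trade-off is a full pass over the snapshots instead of an early exit, 
which matters only if the snapshot list is very long.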



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -207,6 +218,39 @@ public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptCon
     return new IcebergRecordReader<>();
   }
 
+  private static TableScan scanWithTimeRange(Table table, Configuration conf, TableScan scan, long fromTime) {
+    // let's find the corresponding snapshot ID - if the fromTime is before the table creation happened, let's use
+    // the first snapshot of the table
+    long fromSnapshot = IcebergTableUtil.findSnapshotForTimestamp(table, fromTime)
+        .orElseGet(() -> table.history().get(0).snapshotId());
+    if (fromSnapshot == table.currentSnapshot().snapshotId()) {
+      throw new IllegalArgumentException(
+          "Provided FROM timestamp must be earlier than the latest snapshot of the table.");
+    }
+    long toTime = conf.getLong(InputFormatConfig.TO_TIMESTAMP, -1);

Review Comment:
   nit: Can we pass toTime in as a method parameter instead of reading it from 
conf here?



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -207,6 +218,39 @@ public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptCon
     return new IcebergRecordReader<>();
   }
 
+  private static TableScan scanWithTimeRange(Table table, Configuration conf, TableScan scan, long fromTime) {
+    // let's find the corresponding snapshot ID - if the fromTime is before the table creation happened, let's use
+    // the first snapshot of the table
+    long fromSnapshot = IcebergTableUtil.findSnapshotForTimestamp(table, fromTime)
+        .orElseGet(() -> table.history().get(0).snapshotId());
+    if (fromSnapshot == table.currentSnapshot().snapshotId()) {
+      throw new IllegalArgumentException(
+          "Provided FROM timestamp must be earlier than the latest snapshot of the table.");
+    }
+    long toTime = conf.getLong(InputFormatConfig.TO_TIMESTAMP, -1);
+    if (toTime != -1) {
+      if (fromTime >= toTime) {
+        throw new IllegalArgumentException(
+            "Provided FROM timestamp must precede the provided TO timestamp.");
+      }
+      long toSnapshot = IcebergTableUtil.findSnapshotForTimestamp(table, toTime)
+          .orElseThrow(() -> new IllegalArgumentException(
+              "Provided TO timestamp must be after the first snapshot of the table."));
+      return scan.appendsBetween(fromSnapshot, toSnapshot);
+    } else {
+      return scan.appendsAfter(fromSnapshot);
+    }
+  }
+
+  private static TableScan scanWithVersionRange(Configuration conf, TableScan scan, long fromSnapshot) {
+    long toSnapshot = conf.getLong(InputFormatConfig.TO_VERSION, -1);

Review Comment:
   Nit: pass toSnapshot in as a method parameter.
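
   A rough sketch of what this nit (and the matching toTime one) could look 
like: the caller reads the optional upper bound from the configuration once, 
and the range method receives both bounds as plain parameters. Everything 
named here is hypothetical: `RangeParamSketch`, the `Map` standing in for 
Hadoop's `Configuration`, the `sketch.to.version` key (the real value of 
`InputFormatConfig.TO_VERSION` is not shown in this thread), and the returned 
strings, which only describe a scan call. That the version range mirrors the 
`appendsBetween`/`appendsAfter` branching of the time range is also an 
assumption, since the rest of `scanWithVersionRange` is not quoted.

```java
import java.util.Map;

final class RangeParamSketch {

  /** Sentinel used in the diff when no upper bound is configured. */
  static final long NOT_SET = -1L;

  /** Hypothetical key; the real one is InputFormatConfig.TO_VERSION. */
  static final String TO_VERSION_KEY = "sketch.to.version";

  /** Caller-side helper: reads a long from the stand-in config, with a default. */
  static long readLong(Map<String, String> conf, String key, long defaultValue) {
    String raw = conf.get(key);
    return raw == null ? defaultValue : Long.parseLong(raw);
  }

  /** Range logic takes both bounds as parameters and never sees the config. */
  static String describeVersionRange(long fromSnapshot, long toSnapshot) {
    return toSnapshot == NOT_SET
        ? "appendsAfter(" + fromSnapshot + ")"
        : "appendsBetween(" + fromSnapshot + ", " + toSnapshot + ")";
  }
}
```

   Keeping the config lookup at the call site also makes the range method 
testable without constructing a configuration object.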





Issue Time Tracking
-------------------

    Worklog Id:     (was: 759056)
    Time Spent: 20m  (was: 10m)

> Support range-based time travel queries for Iceberg
> ---------------------------------------------------
>
>                 Key: HIVE-26151
>                 URL: https://issues.apache.org/jira/browse/HIVE-26151
>             Project: Hive
>          Issue Type: New Feature
>            Reporter: Marton Bod
>            Assignee: Marton Bod
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Allow querying which records have been inserted during a certain time window 
> for Iceberg tables. The Iceberg TableScan API provides an implementation for 
> that, so most of the work would go into adding syntax support and 
> transporting the startTime and endTime parameters to the Iceberg input format.
> Proposed new syntax: 
> SELECT * FROM table FOR SYSTEM_TIME FROM '<startTime>' TO '<endTime>'
> SELECT * FROM table FOR SYSTEM_VERSION FROM <startVersion> TO <endVersion>
> (the TO clause is optional in both cases)



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
