This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 775f73f03 IMPALA-14462: Fix tie-breaking for sorting scan ranges oldest to newest
775f73f03 is described below

commit 775f73f03ea59401ca2752383182185599b9777d
Author: Joe McDonnell <[email protected]>
AuthorDate: Wed Sep 24 12:40:37 2025 -0700

    IMPALA-14462: Fix tie-breaking for sorting scan ranges oldest to newest
    
    TestTupleCacheFullCluster.test_scan_range_distributed is flaky on s3
    builds. The addition of a single file is changing scheduling significantly
    even with scan ranges sorted oldest to newest. This is because modification
    times on S3 have a granularity of one second. Multiple files have the
    same modification time, and the fix for IMPALA-13548 did not properly
    break ties for sorting.
    
    This adds logic to break ties for files with the same modification
    time. It compares the path (absolute path or relative path + partition)
    as well as the offset within the file. These should be enough to break
    all conceivable ties, as it is not possible to have two scan ranges with
    the same file at the same offset. In debug builds, this does additional
    validation to make sure that when a != b, comp(a, b) != comp(b, a).
    
    The test requires that adding a single file to the table changes exactly
    one cache key. If that final file has the same modification time as
    an existing file, scheduling may still mix up the files and change more
    than one cache key, even with tie-breaking. This adds a sleep just before
    generating the final file to guarantee that it gets a newer modification
    time.
    
    Testing:
     - Ran TestTupleCacheFullCluster.test_scan_range_distributed for 15
       iterations on S3
    
    Change-Id: I3f2e40d3f975ee370c659939da0374675a28cd38
    Reviewed-on: http://gerrit.cloudera.org:8080/23458
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
    Reviewed-by: Riza Suminto <[email protected]>
---
 be/src/scheduling/scheduler.cc           | 71 ++++++++++++++++++++++++++++----
 tests/custom_cluster/test_tuple_cache.py |  6 +++
 2 files changed, 68 insertions(+), 9 deletions(-)

diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index facf4a602..5033b692f 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -183,6 +183,60 @@ Status Scheduler::GenerateScanRanges(const 
vector<TFileSplitGeneratorSpec>& spec
   return Status::OK();
 }
 
+// Comparison function for sorting scan ranges oldest to newest. This needs to return true
+// if scanRange1 is less than scanRange2 and false otherwise.
+static bool ScanRangeOldestToNewestComparator(
+    const TScanRangeLocationList& scanRange1, const TScanRangeLocationList& 
scanRange2) {
+  DCHECK(scanRange1.scan_range.__isset.hdfs_file_split);
+  const THdfsFileSplit& split1 = scanRange1.scan_range.hdfs_file_split;
+  DCHECK(scanRange2.scan_range.__isset.hdfs_file_split);
+  const THdfsFileSplit& split2 = scanRange2.scan_range.hdfs_file_split;
+  // Multiple files (or multiple splits from the same file) can have the same
+  // modification time, so we need tie-breaking when they are equal.
+  if (split1.mtime != split2.mtime) return split1.mtime < split2.mtime;
+  if (!split1.__isset.absolute_path && !split2.__isset.absolute_path) {
+    // If neither has an absolute path set (the common case), compare the
+    // partition hash and relative path
+    if (split1.partition_path_hash != split2.partition_path_hash) {
+      return split1.partition_path_hash < split2.partition_path_hash;
+    }
+    if (split1.relative_path != split2.relative_path) {
+      return split1.relative_path < split2.relative_path;
+    }
+  } else {
+    // If only one has an absolute path, sort absolute paths ahead of relative 
paths.
+    if (split1.__isset.absolute_path && !split2.__isset.absolute_path) return 
true;
+    if (!split1.__isset.absolute_path && split2.__isset.absolute_path) return 
false;
+    // Both are absolute, so compare them
+    if (split1.absolute_path != split2.absolute_path) {
+      return split1.absolute_path < split2.absolute_path;
+    }
+  }
+  if (split1.offset != split2.offset) return split1.offset < split2.offset;
+
+  // If we get here, something is wrong. There can't be two scan ranges with 
the same
+  // filename and offset.
+  DCHECK(false) << "Duplicate scan range when sorting. Split 1: " << split1
+                << " Split 2: " << split2;
+  return false;
+}
+
+#ifndef NDEBUG
+// For debug builds, do additional validation of the ordering produced by the
+// comparator. Specifically, for different a and b, comp(a, b) != comp(b, a).
+// For the scheduling use case, we know that a and b are different.
+static bool ScanRangeOldestToNewestComparatorWithValidation(
+    const TScanRangeLocationList& scanRange1, const TScanRangeLocationList& 
scanRange2) {
+  bool forwards_result = ScanRangeOldestToNewestComparator(scanRange1, 
scanRange2);
+  bool backwards_result = ScanRangeOldestToNewestComparator(scanRange2, 
scanRange1);
+  DCHECK_NE(forwards_result, backwards_result)
+    << "Comparator violates ordering requirements:"
+    << " Comp(" << scanRange1 << ", " << scanRange2 << ") = " << 
forwards_result
+    << " Comp(" << scanRange2 << ", " << scanRange1 << ") = " << 
backwards_result;
+  return forwards_result;
+}
+#endif
+
 Status Scheduler::ComputeScanRangeAssignment(
     const ExecutorConfig& executor_config, ScheduleState* state) {
   RuntimeProfile::Counter* total_assignment_timer =
@@ -234,16 +288,15 @@ Status Scheduler::ComputeScanRangeAssignment(
             entry.second.concrete_ranges.size() + 
entry.second.split_specs.size());
         // This only makes sense for HDFS scan nodes
         DCHECK(node.__isset.hdfs_scan_node);
-        // Sort the scan ranges by modification time ascending
+        // Sort the scan ranges by modification time ascending. In debug mode, 
do
+        // additional validation of the ordering.
+#ifndef NDEBUG
         std::sort(expanded_locations.begin(), expanded_locations.end(),
-            [](const TScanRangeLocationList& scanRange1,
-               const TScanRangeLocationList& scanRange2) {
-              DCHECK(scanRange1.scan_range.__isset.hdfs_file_split);
-              const THdfsFileSplit& split1 = 
scanRange1.scan_range.hdfs_file_split;
-              DCHECK(scanRange2.scan_range.__isset.hdfs_file_split);
-              const THdfsFileSplit& split2 = 
scanRange2.scan_range.hdfs_file_split;
-              return split1.mtime < split2.mtime;
-            });
+            ScanRangeOldestToNewestComparatorWithValidation);
+#else
+        std::sort(expanded_locations.begin(), expanded_locations.end(),
+            ScanRangeOldestToNewestComparator);
+#endif
       }
       DCHECK(locations != nullptr);
       RETURN_IF_ERROR(
diff --git a/tests/custom_cluster/test_tuple_cache.py 
b/tests/custom_cluster/test_tuple_cache.py
index f3f76acb6..54690c605 100644
--- a/tests/custom_cluster/test_tuple_cache.py
+++ b/tests/custom_cluster/test_tuple_cache.py
@@ -879,6 +879,12 @@ class TestTupleCacheFullCluster(TestTupleCacheBase):
       impalad: self.get_tuple_cache_metric(impalad.service, "entries-in-use")
       for impalad in self.cluster.impalads}
 
+    # Some modification times have coarse granularity (e.g. a second). If the insert runs
+    # too quickly, the new file could have the same modification time as an existing
+    # file. In that case, the sort may not place it last, causing unexpected changes to
+    # the cache keys. Sleep a bit to guarantee a newer modification time.
+    time.sleep(3)
+
     # Insert another row, which creates a file / scan range
     # This uses a very large seed for table_value() to get a unique row that 
isn't
     # already in the table.

Reply via email to