2010YOUY01 commented on PR #14411:
URL: https://github.com/apache/datafusion/pull/14411#issuecomment-2646119135

   Impressive work! I got a suggestion and a high-level question:
   
   ### Suggestion
   I think to justify this change, we have to make sure:
   - No performance regression (benchmarks already showed)
   - Reduce memory footprint, for queries which batch can accumulate in 
`RepartitionExec` (as the origin issue said)
   
   I tried to check the memory usage for `tpch-sf10` and `clickbench`, there is 
no noticeable change for those queries.
   Perhaps we should construct queries with this anti-pattern, and demonstrate 
memory usage can actually reduced by this on-demand repartition executor?
   
   Here is a script for checking memory usage in benchmark queries
   <details>
   
   ```python
   # This script should be placed under benchmarks/
   #
   # Supported benchmarks are 'tpch' and 'clickbench'
   #
   # Example usage:
   # Run TPCH benchmark and save results:
   #   python3 membench.py run --benchmark tpch --result tpch_main.csv
   #   python3 membench.py run --benchmark tpch --result tpch_optimized.csv
   #
   # Compare results:
   #   python3 membench.py compare tpch_main.csv tpch_optimized.csv
   
   import subprocess
   import re
   import csv
   import argparse
   
   def human_readable_size(size):
       units = ["B", "K", "M", "G", "T"]
       index = 0
       while size >= 1024 and index < len(units) - 1:
           size /= 1024.0
           index += 1
       return f"{size:.2f}{units[index]}"
   
   def run_tpch_queries(label, result_file):
       results = []
       for query in range(1, 23):
           cmd = [
               "/usr/bin/time", "-l", "cargo", "run", "--release", "--bin", 
"dfbench", 
               "tpch", "--format", "parquet", "--path", "./data/tpch_sf10", 
               "--query", str(query), "--iterations", "1"
           ]
           
           process = subprocess.run(cmd, capture_output=True, text=True, 
shell=False)
           stderr_output = process.stderr
           
           match = re.search(r"(\d+)\s+maximum resident set size", 
stderr_output)
           max_rss = human_readable_size(int(match.group(1))) if match else 
"N/A"
           results.append((query, max_rss))
       
       with open(result_file, "w", newline='') as f:
           writer = csv.writer(f)
           writer.writerow(["Query", "Memory"])
           writer.writerows(results)
       
       print(f"Results saved to {result_file}")
   
   def run_clickbench_queries(label, result_file):
       results = []
       for query in range(0, 43):
           cmd = [
               "/usr/bin/time", "-l", "cargo", "run", "--release", "--bin", 
"dfbench", 
               "clickbench", "--path", "./data/hits.parquet", 
               "--queries-path", "./queries/clickbench/queries.sql", 
               "--query", str(query), "--iterations", "1"
           ]
           
           process = subprocess.run(cmd, capture_output=True, text=True, 
shell=False)
           stderr_output = process.stderr
           
           match = re.search(r"(\d+)\s+maximum resident set size", 
stderr_output)
           max_rss = human_readable_size(int(match.group(1))) if match else 
"N/A"
           results.append((query, max_rss))
       
       with open(result_file, "w", newline='') as f:
           writer = csv.writer(f)
           writer.writerow(["Query", "Memory"])
           writer.writerows(results)
       
       print(f"Results saved to {result_file}")
   
   def compare_results(file1, file2):
       results1, results2 = {}, {}
       
       with open(file1, "r") as f1, open(file2, "r") as f2:
           reader1, reader2 = csv.reader(f1), csv.reader(f2)
           next(reader1)  # Skip header
           next(reader2)  # Skip header
           
           for row in reader1:
               results1[row[0]] = row[1]
           for row in reader2:
               results2[row[0]] = row[1]
       
       print(f"{'Query':<10}{'Branch1':<10}{'Branch2':<10}{'Change'}")
       for query in results1:
           mem1 = results1[query]
           mem2 = results2.get(query, "N/A")
           
           if mem1 != "N/A" and mem2 != "N/A":
               size1 = float(mem1[:-1])
               size2 = float(mem2[:-1])
               ratio = size2 / size1 if size1 > 0 else 1.0
               change = f"{ratio:.2f}X" if abs(ratio - 1) > 0.05 else "No 
Change"
           else:
               change = "N/A"
           
           print(f"{query:<10}{mem1:<10}{mem2:<10}{change}")
   
   def main():
       parser = argparse.ArgumentParser()
       parser.add_argument("mode", choices=["run", "compare"], help="Run 
benchmarks or compare results")
       parser.add_argument("--result", help="Output result file for 
benchmarking")
       parser.add_argument("--benchmark", choices=["tpch", "clickbench"], 
help="Specify which benchmark to run")
       parser.add_argument("file1", nargs="?", help="First result file for 
comparison")
       parser.add_argument("file2", nargs="?", help="Second result file for 
comparison")
       args = parser.parse_args()
       
       if args.mode == "run" and args.result and args.benchmark:
           if args.benchmark == "tpch":
               run_tpch_queries("run", args.result)
           elif args.benchmark == "clickbench":
               run_clickbench_queries("run", args.result)
       elif args.mode == "compare" and args.file1 and args.file2:
           compare_results(args.file1, args.file2)
       else:
           print("Invalid arguments. Use --help for usage information.")
   
   if __name__ == "__main__":
       main()
   
   ```
   
   </details>
   
   Results:
   ```
   TPCH:
   ----
   Query     Branch1   Branch2   Change
   1         464.05M   460.78M   No Change
   2         397.00M   412.77M   No Change
   3         714.56M   630.64M   0.88X
   4         408.53M   418.78M   No Change
   5         741.30M   769.73M   No Change
   6         390.02M   398.72M   No Change
   7         3.41G     3.45G     No Change
   8         1.08G     1.05G     No Change
   9         2.37G     2.31G     No Change
   10        1.11G     1.16G     No Change
   11        260.78M   267.41M   No Change
   12        429.95M   449.06M   No Change
   13        675.67M   668.22M   No Change
   14        666.56M   700.22M   No Change
   15        673.66M   656.70M   No Change
   16        485.81M   474.59M   No Change
   17        605.38M   631.92M   No Change
   18        3.26G     3.29G     No Change
   19        500.77M   577.95M   1.15X
   20        1.07G     1.05G     No Change
   21        982.59M   978.69M   No Change
   22        303.86M   302.14M   No Change
   
   Clickbench:
   ...(no change)
   ```
   
   ### Question
   In my understanding the new repartition executor is a wrapper on 
`RepartitionExec`, to enable lazy evaluation, it should support both 
`RoundRobin` and `Hash` repartition right? This PR only swapped `RoundRobin`, 
do you also plan to add on-demand hash repartition in the future?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to