Hi,

Consider the following statements:
1)

scala> val df = spark.read.format("com.shubham.MyDataSource").load
scala> df.show
+---+---+
|  i|  j|
+---+---+
|  0|  0|
|  1| -1|
|  2| -2|
|  3| -3|
|  4| -4|
+---+---+

2)

scala> val df1 = df.filter("i < 3")
scala> df1.show
+---+---+
|  i|  j|
+---+---+
|  0|  0|
|  1| -1|
|  2| -2|
+---+---+

3)

scala> df.join(df1, Seq("i"), "left_outer").show
+---+---+---+
|  i|  j|  j|
+---+---+---+
|  1| -1| -1|
|  2| -2| -2|
|  0|  0|  0|
+---+---+---+

Statement 3) is not producing the right result for the left_outer join: I would expect all 5 rows of df to come back, with null in df1's j column for i = 3 and 4, but only the 3 filtered rows appear. Here is the minimal code for the data source:

-------------------------------------------------------------------
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Lists;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.LessThan;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.types.StructType;

public class MyDataSourceReader implements DataSourceReader, SupportsPushDownFilters {

  private Filter[] pushedFilters = new Filter[0];
  private boolean hasFilters = false;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
  }

  @Override
  public StructType readSchema() {
    return (new StructType())
        .add("i", "int")
        .add("j", "int");
  }

  @Override
  public Filter[] pushFilters(Filter[] filters) {
    System.out.println("MyDataSourceReader.pushFilters: " + Arrays.toString(filters));
    hasFilters = true;
    pushedFilters = filters;
    // Return the filters that could not be pushed down (none in this case).
    return new Filter[0];
  }

  @Override
  public Filter[] pushedFilters() {
    return pushedFilters;
  }

  @Override
  public List<DataReaderFactory<Row>> createDataReaderFactories() {
    System.out.println("=======MyDataSourceReader.createBatchDataReaderFactories=======");
    int ltFilter = Integer.MAX_VALUE;
    if (hasFilters) {
      ltFilter = getLTFilter("i");
    }
    hasFilters = false;
    return Lists.newArrayList(new SimpleDataReaderFactory(0, 5, ltFilter));
  }

  // Extracts the value of a pushed "attributeName < value" filter, if present.
  private int getLTFilter(String attributeName) {
    int filterValue = Integer.MAX_VALUE;
    for (Filter pushedFilter : pushedFilters) {
      if (pushedFilter instanceof LessThan) {
        LessThan lt = (LessThan) pushedFilter;
        if (lt.attribute().equals(attributeName)) {
          filterValue = (int) lt.value();
        }
      }
    }
    return filterValue;
  }
}
------------------------------------------------------------
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;

public class SimpleDataReaderFactory implements DataReaderFactory<Row> {

  private final int start;
  private final int end;
  private final int iLTFilter;

  public SimpleDataReaderFactory(int start, int end, int iLTFilter) {
    this.start = start;
    this.end = end;
    this.iLTFilter = iLTFilter;
  }

  @Override
  public DataReader<Row> createDataReader() {
    return new SimpleDataReader(start, end, iLTFilter);
  }

  public static class SimpleDataReader implements DataReader<Row> {
    private final int start;
    private final int end;
    private int current;
    private final int iLTFilter;

    public SimpleDataReader(int start, int end, int iLTFilter) {
      this.start = start;
      this.end = end;
      this.current = start - 1;
      this.iLTFilter = iLTFilter;
    }

    @Override
    public boolean next() {
      current++;
      // The pushed-down "i < x" filter is applied here, while scanning.
      return current < end && current < iLTFilter;
    }

    @Override
    public Row get() {
      return new GenericRow(new Object[]{current, -current});
    }

    @Override
    public void close() {
    }
  }
}
------------------------------------------------------------
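For comparison, here is the same pattern run against a plain in-memory DataFrame instead of my data source (just a sketch to show the result I expect; base and filtered are throwaway names). It returns all five rows, with nulls on the right for i = 3 and 4:

scala> val base = spark.range(0, 5).selectExpr("cast(id as int) as i", "cast(-id as int) as j")
scala> val filtered = base.filter("i < 3")
scala> base.join(filtered, Seq("i"), "left_outer").orderBy("i").show
+---+---+----+
|  i|  j|   j|
+---+---+----+
|  0|  0|   0|
|  1| -1|  -1|
|  2| -2|  -2|
|  3| -3|null|
|  4| -4|null|
+---+---+----+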
It seems that Spark is somehow applying the filter (i < 3) after the left_outer join as well, which is why we see the incorrect result in 3). However, I don't see any Filter node after the join in the physical plan:

== Physical Plan ==
*(5) Project [i#136, j#137, j#228]
+- SortMergeJoin [i#136], [i#227], LeftOuter
   :- *(2) Sort [i#136 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(i#136, 200)
   :     +- *(1) DataSourceV2Scan [i#136, j#137], com.shubham.reader.MyDataSourceReader@714bd7ad
   +- *(4) Sort [i#227 ASC NULLS FIRST], false, 0
      +- ReusedExchange [i#227, j#228], Exchange hashpartitioning(i#136, 200)

Any ideas what might be going wrong?

Thanks,
Shubham