Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5043#discussion_r152577385 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala --- @@ -62,7 +62,19 @@ class FlinkLogicalTableSourceScan( override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { val rowCnt = metadata.getRowCount(this) - planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType)) + + val adjustedCnt: Double = tableSource match { + case f: FilterableTableSource[_] if f.isFilterPushedDown => + // ensure we prefer FilterableTableSources with pushed-down filters. + rowCnt - 1.0 --- End diff -- Doesn't really make a difference IMO. It's all about relative costs. We only need to make sure that a `FilterableTableSource` with pushed down predicates appears to be less expensive than the same `TableSource` without pushed predicates. The problem has not occurred before, because the `OrcTableSource` does not guarantee that all emitted rows match the pushed predicates. Therefore, all predicates are also applied by a following `Calc` operator such that the cost of that operator is not decreased.
---