[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928397#comment-15928397 ]
ASF GitHub Bot commented on FLINK-3849:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106445598

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala ---
    @@ -36,36 +43,34 @@ trait PushFilterIntoTableSourceScanRuleBase {
           filterableSource: FilterableTableSource[_],
           description: String): Unit = {

    -    if (filterableSource.isFilterPushedDown) {
    -      // The rule can get triggered again due to the transformed "scan => filter"
    -      // sequence created by the earlier execution of this rule when we could not
    -      // push all the conditions into the scan
    -      return
    -    }
    +    Preconditions.checkArgument(!filterableSource.isFilterPushedDown)

         val program = calc.getProgram
    +    val functionCatalog = FunctionCatalog.withBuiltIns
         val (predicates, unconvertedRexNodes) =
           RexProgramExtractor.extractConjunctiveConditions(
             program,
             call.builder().getRexBuilder,
    -        tableSourceTable.tableEnv.getFunctionCatalog)
    +        functionCatalog)
         if (predicates.isEmpty) {
           // no condition can be translated to expression
           return
         }

    -    val (newTableSource, remainingPredicates) = filterableSource.applyPredicate(predicates)
    -    // trying to apply filter push down, set the flag to true no matter whether
    -    // we actually push any filters down.
    -    newTableSource.setFilterPushedDown(true)
    +    val remainingPredicates = new util.LinkedList[Expression]()
    +    predicates.foreach(e => remainingPredicates.add(e))
    +
    +    val newTableSource = filterableSource.applyPredicate(remainingPredicates)
    --- End diff --

    Add a check that `remainingPredicates` is a subset of `predicates`? The table source should not touch those predicates that it cannot evaluate or add new predicates.
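The subset check suggested above could look roughly like the following. This is only a sketch under stated assumptions, not the code that went into the pull request: `Expr` is a hypothetical stand-in for Flink's `Expression` (assumed to compare structurally, like a case class), and `PredicateChecks`, `unexpectedPredicates` and `checkSubset` are illustrative names that do not exist in Flink.

```scala
import java.util.{List => JList}
import scala.collection.JavaConverters._

// Hypothetical stand-in for org.apache.flink.table.expressions.Expression,
// so the sketch compiles without Flink on the classpath.
final case class Expr(text: String)

// Illustrative helper object; not part of Flink.
object PredicateChecks {

  /** Returns the entries of `remaining` that were never offered in `predicates`. */
  def unexpectedPredicates(predicates: Seq[Expr], remaining: JList[Expr]): Seq[Expr] = {
    val offered = predicates.toSet
    remaining.asScala.filterNot(offered.contains).toSeq
  }

  /** Throws IllegalArgumentException if the source added or rewrote a predicate. */
  def checkSubset(predicates: Seq[Expr], remaining: JList[Expr]): Unit = {
    val unexpected = unexpectedPredicates(predicates, remaining)
    require(
      unexpected.isEmpty,
      s"Table source returned predicates it was not given: ${unexpected.mkString(", ")}")
  }
}
```

In the rule, such a check would presumably run right after `filterableSource.applyPredicate(remainingPredicates)`, comparing the mutated list against the original `predicates`. It relies on value equality of the expressions; if a source returns logically equivalent but structurally different expressions, the check would reject them.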
> Add FilterableTableSource interface and translation rule
> --------------------------------------------------------
>
>                 Key: FLINK-3849
>                 URL: https://issues.apache.org/jira/browse/FLINK-3849
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations which support filter push-down.
> The interface could look as follows:
> {code}
> trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a TableScan that refers to a {{FilterableTableSource}}. We might need to tweak the cost model as well to push the optimizer in the right direction.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)