Hi Fabian, Thanks for your reply. My custom table source does not implement ProjectableTableSource. I believe that isFilterPushedDown is implemented correctly since it's nearly identical to what's written in the OrcTableSource. I pasted a slightly simplified version of the implementation below. If you wouldn't mind reading over it, is there anything obviously wrong?
public final class CustomerTableSource implements BatchTableSource<Customer>, FilterableTableSource<Customer> { // Iterator that gets data from a REST API as POJO instances private final AppResourceIterator<Customer> resourceIterator; private final String tableName; private final Class<Customer> modelClass; private final AppRequestFilter[] filters; public CustomerTableSource( AppResourceIterator<Customer> resourceIterator, String tableName, Class<Customer> modelClass) { this(resourceIterator, tableName, modelClass, null); } protected CustomerTableSource( AppResourceIterator<Customer> resourceIterator, String tableName, Class<Customer> modelClass, AppRequestFilter[] filters) { this.resourceIterator = resourceIterator; this.tableName = tableName; this.modelClass = modelClass; this.filters = filters; } @Override public TableSource<Customer> applyPredicate(List<Expression> predicates) { List<Expression> acceptedPredicates = new ArrayList<>(); List<AppRequestFilter> acceptedFilters = new ArrayList<>(); for (final Expression predicate : predicates) { buildFilterForPredicate(predicate).ifPresent(filter -> { acceptedFilters.add(filter); acceptedPredicates.add(predicate); }); } predicates.removeAll(acceptedPredicates); return new CustomerTableSource( resourceIterator.withFilters(acceptedFilters), tableName, modelClass, acceptedFilters.toArray(new AppRequestFilter[0]) ); } public Optional<AppRequestFilter> buildFilterForPredicate(Expression predicate) { // Code for translating an Expression into an AppRequestFilter // Returns Optional.empty() for predicates we don't want to / can't apply } @Override public boolean isFilterPushedDown() { return filters != null; } @Override public DataSet<Customer> getDataSet(ExecutionEnvironment execEnv) { return execEnv.fromCollection(resourceIterator, modelClass); } @Override public TypeInformation<Customer> getReturnType() { return TypeInformation.of(modelClass); } @Override public TableSchema getTableSchema() { return TableSchema.fromTypeInfo(getReturnType()); } } Thanks, Josh On Thu, May 2, 2019 at 3:42 AM Fabian Hueske <fhue...@gmail.com> wrote: > Hi Josh, > > Does your TableSource also implement ProjectableTableSource? > If yes, you need to make sure that the filter information is also > forwarded if ProjectableTableSource.projectFields() is called after > FilterableTableSource.applyPredicate(). > Also make sure to correctly implement > FilterableTableSource.isFilterPushedDown(). > > Hope this helps, > Fabian > > Am Di., 30. Apr. 2019 um 22:29 Uhr schrieb Josh Bradt < > josh.br...@klaviyo.com>: > >> Hi all, >> >> I'm trying to implement filter push-down on a custom BatchTableSource >> that retrieves data from a REST API and returns it as POJO instances. I've >> implemented FilterableTableSource as described in the docs, returning a new >> instance of my table source containing the predicates that I've removed >> from the list of predicates passed into applyPredicate. However, when >> getDataSet is eventually called, it's called on the instance of the table >> source that was originally registered with the table environment, which >> does not have any filters in it. I've stepped through the code in a >> debugger, and applyPredicates is definitely being called, and it's >> definitely returning new instances of my table source, but they don't seem >> to be being used. >> >> I also played with the OrcTableSource, which is the only example of a >> push-down filter implementation I could find, and it doesn't behave this >> way. When I set a breakpoint in getDataSet in that case, it's being called >> on one of the new instances of the table source that contains the accepted >> filters. >> >> Are there any other requirements for implementing push-down filters that >> aren't listed in the docs? Or does anyone have any tips for this? >> >> Thanks, >> >> Josh >> >> -- >> Josh Bradt >> Software Engineer >> 225 Franklin St, Boston, MA 02110 >> klaviyo.com <https://www.klaviyo.com> >> [image: Klaviyo Logo] >> > -- Josh Bradt Software Engineer 225 Franklin St, Boston, MA 02110 klaviyo.com <https://www.klaviyo.com> [image: Klaviyo Logo]