Hi Josh,

The code looks good to me, so this seems to be a bug. It's strange that it works for ORC.
Would you mind opening a Jira ticket and, if possible, providing a simple reproducible code example?

Thank you,
Fabian

On Thu, May 2, 2019 at 18:23, Josh Bradt <josh.br...@klaviyo.com> wrote:

> Hi Fabian,
>
> Thanks for your reply. My custom table source does not implement
> ProjectableTableSource. I believe that isFilterPushedDown is implemented
> correctly, since it's nearly identical to what's written in the
> OrcTableSource. I pasted a slightly simplified version of the
> implementation below. Would you mind reading over it and telling me if
> anything is obviously wrong?
>
> public final class CustomerTableSource implements BatchTableSource<Customer>,
>         FilterableTableSource<Customer> {
>
>     // Iterator that gets data from a REST API as POJO instances
>     private final AppResourceIterator<Customer> resourceIterator;
>     private final String tableName;
>     private final Class<Customer> modelClass;
>     private final AppRequestFilter[] filters;
>
>     public CustomerTableSource(
>             AppResourceIterator<Customer> resourceIterator,
>             String tableName,
>             Class<Customer> modelClass) {
>
>         this(resourceIterator, tableName, modelClass, null);
>     }
>
>     protected CustomerTableSource(
>             AppResourceIterator<Customer> resourceIterator,
>             String tableName,
>             Class<Customer> modelClass,
>             AppRequestFilter[] filters) {
>
>         this.resourceIterator = resourceIterator;
>         this.tableName = tableName;
>         this.modelClass = modelClass;
>         this.filters = filters;
>     }
>
>     @Override
>     public TableSource<Customer> applyPredicate(List<Expression> predicates) {
>         List<Expression> acceptedPredicates = new ArrayList<>();
>         List<AppRequestFilter> acceptedFilters = new ArrayList<>();
>
>         for (final Expression predicate : predicates) {
>             buildFilterForPredicate(predicate).ifPresent(filter -> {
>                 acceptedFilters.add(filter);
>                 acceptedPredicates.add(predicate);
>             });
>         }
>
>         predicates.removeAll(acceptedPredicates);
>
>         return new CustomerTableSource(
>                 resourceIterator.withFilters(acceptedFilters),
>                 tableName,
>                 modelClass,
>                 acceptedFilters.toArray(new AppRequestFilter[0])
>         );
>     }
>
>     public Optional<AppRequestFilter> buildFilterForPredicate(Expression predicate) {
>         // Code for translating an Expression into an AppRequestFilter
>         // Returns Optional.empty() for predicates we don't want to / can't apply
>     }
>
>     @Override
>     public boolean isFilterPushedDown() {
>         return filters != null;
>     }
>
>     @Override
>     public DataSet<Customer> getDataSet(ExecutionEnvironment execEnv) {
>         return execEnv.fromCollection(resourceIterator, modelClass);
>     }
>
>     @Override
>     public TypeInformation<Customer> getReturnType() {
>         return TypeInformation.of(modelClass);
>     }
>
>     @Override
>     public TableSchema getTableSchema() {
>         return TableSchema.fromTypeInfo(getReturnType());
>     }
> }
>
> Thanks,
>
> Josh
>
> On Thu, May 2, 2019 at 3:42 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Josh,
>>
>> Does your TableSource also implement ProjectableTableSource?
>> If yes, you need to make sure that the filter information is also
>> forwarded if ProjectableTableSource.projectFields() is called after
>> FilterableTableSource.applyPredicate().
>> Also make sure to correctly implement
>> FilterableTableSource.isFilterPushedDown().
>>
>> Hope this helps,
>> Fabian
>>
>> On Tue, Apr 30, 2019 at 22:29, Josh Bradt <josh.br...@klaviyo.com> wrote:
>>
>>> Hi all,
>>>
>>> I'm trying to implement filter push-down on a custom BatchTableSource
>>> that retrieves data from a REST API and returns it as POJO instances. I've
>>> implemented FilterableTableSource as described in the docs, returning a new
>>> instance of my table source containing the predicates that I've removed
>>> from the list of predicates passed into applyPredicate. However, when
>>> getDataSet is eventually called, it's called on the instance of the table
>>> source that was originally registered with the table environment, which
>>> does not have any filters in it. I've stepped through the code in a
>>> debugger: applyPredicate is definitely being called, and it's definitely
>>> returning new instances of my table source, but they don't seem to be
>>> used.
>>>
>>> I also experimented with the OrcTableSource, which is the only example of
>>> a push-down filter implementation I could find, and it doesn't behave this
>>> way. When I set a breakpoint in getDataSet in that case, it's called on one
>>> of the new instances of the table source that contains the accepted
>>> filters.
>>>
>>> Are there any other requirements for implementing push-down filters that
>>> aren't listed in the docs? Or does anyone have any tips for this?
>>>
>>> Thanks,
>>>
>>> Josh
>>>
>>> --
>>> Josh Bradt
>>> Software Engineer
>>> 225 Franklin St, Boston, MA 02110
>>> klaviyo.com <https://www.klaviyo.com>
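Fabian's first suggestion (forward the pushed-down filters when projectFields() is called after applyPredicate(), and report isFilterPushedDown() correctly) can be illustrated with a framework-free sketch. SketchSource and its methods are hypothetical, simplified stand-ins modeled on FilterableTableSource and ProjectableTableSource; they are not Flink's interfaces, predicates are plain strings rather than Expression objects, and the "UPPER" rule for what can be pushed down is an arbitrary assumption for the demo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

public class PushDownSketch {

    // Hypothetical, simplified stand-in for a source that supports both filter
    // and projection push-down. NOT Flink's API: predicates are plain strings.
    static final class SketchSource {
        private final List<String> filters;   // null until applyPredicate() has run
        private final int[] projectedFields;  // null means "all fields"

        SketchSource() {
            this(null, null);
        }

        private SketchSource(List<String> filters, int[] projectedFields) {
            this.filters = filters;
            this.projectedFields = projectedFields;
        }

        // Like applyPredicate(): remove accepted predicates from the caller's list
        // and return a NEW instance that remembers them (and keeps any projection).
        SketchSource applyPredicate(List<String> predicates, Predicate<String> canPushDown) {
            List<String> accepted = new ArrayList<>();
            for (String p : predicates) {
                if (canPushDown.test(p)) {
                    accepted.add(p);
                }
            }
            predicates.removeAll(accepted);
            return new SketchSource(accepted, projectedFields);
        }

        // Like projectFields(): the projected copy must forward the filters that
        // were already pushed down, otherwise they are silently lost.
        SketchSource projectFields(int[] fields) {
            return new SketchSource(filters, fields.clone());
        }

        // True on every instance returned by applyPredicate(), even if no
        // predicate was accepted, so callers can tell push-down was already tried.
        boolean isFilterPushedDown() {
            return filters != null;
        }

        List<String> getFilters() {
            return filters == null
                    ? Collections.<String>emptyList()
                    : Collections.unmodifiableList(filters);
        }

        int[] getProjectedFields() {
            return projectedFields == null ? null : projectedFields.clone();
        }
    }

    public static void main(String[] args) {
        List<String> predicates =
                new ArrayList<>(Arrays.asList("country = 'US'", "UPPER(name) = 'X'"));

        // Assumption for the demo: only predicates without UPPER() can be pushed down.
        SketchSource filtered = new SketchSource()
                .applyPredicate(predicates, p -> !p.contains("UPPER"));
        SketchSource projected = filtered.projectFields(new int[] {0, 2});

        System.out.println(projected.isFilterPushedDown());
        System.out.println(projected.getFilters());
        System.out.println(predicates);
    }
}
```

Running main prints `true`, then `[country = 'US']`, then `[UPPER(name) = 'X']`: the projected copy still carries the accepted filter, and only the predicate that could not be pushed down is left for the caller to evaluate. Every "apply" step returns a fresh immutable copy that carries all previously pushed-down state, which is the property Fabian asks to preserve.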