Hi Josh,

The code looks good to me.
This seems to be a bug then.
It's strange that it works for ORC.

Would you mind opening a Jira ticket and, if possible, providing a simple
reproducible code example?

Thank you,
Fabian

On Thu, May 2, 2019 at 6:23 PM Josh Bradt <josh.br...@klaviyo.com> wrote:

> Hi Fabian,
>
> Thanks for your reply. My custom table source does not implement
> ProjectableTableSource. I believe that isFilterPushedDown is implemented
> correctly since it's nearly identical to what's written in the
> OrcTableSource. I pasted a slightly simplified version of the
> implementation below. If you wouldn't mind reading over it, is there
> anything obviously wrong?
>
> public final class CustomerTableSource implements BatchTableSource<Customer>,
>         FilterableTableSource<Customer> {
>
>     // Iterator that gets data from a REST API as POJO instances
>     private final AppResourceIterator<Customer> resourceIterator;
>     private final String tableName;
>     private final Class<Customer> modelClass;
>     private final AppRequestFilter[] filters;
>
>     public CustomerTableSource(
>             AppResourceIterator<Customer> resourceIterator,
>             String tableName,
>             Class<Customer> modelClass) {
>
>         this(resourceIterator, tableName, modelClass, null);
>     }
>
>     protected CustomerTableSource(
>             AppResourceIterator<Customer> resourceIterator,
>             String tableName,
>             Class<Customer> modelClass,
>             AppRequestFilter[] filters) {
>
>         this.resourceIterator = resourceIterator;
>         this.tableName = tableName;
>         this.modelClass = modelClass;
>         this.filters = filters;
>     }
>
>     @Override
>     public TableSource<Customer> applyPredicate(List<Expression> predicates) {
>         List<Expression> acceptedPredicates = new ArrayList<>();
>         List<AppRequestFilter> acceptedFilters = new ArrayList<>();
>
>         for (final Expression predicate : predicates) {
>             buildFilterForPredicate(predicate).ifPresent(filter -> {
>                 acceptedFilters.add(filter);
>                 acceptedPredicates.add(predicate);
>             });
>         }
>
>         predicates.removeAll(acceptedPredicates);
>
>         return new CustomerTableSource(
>                 resourceIterator.withFilters(acceptedFilters),
>                 tableName,
>                 modelClass,
>                 acceptedFilters.toArray(new AppRequestFilter[0])
>         );
>     }
>
>     public Optional<AppRequestFilter> buildFilterForPredicate(Expression predicate) {
>         // Code for translating an Expression into an AppRequestFilter
>         // Returns Optional.empty() for predicates we don't want to / can't apply
>     }
>
>     @Override
>     public boolean isFilterPushedDown() {
>         return filters != null;
>     }
>
>     @Override
>     public DataSet<Customer> getDataSet(ExecutionEnvironment execEnv) {
>         return execEnv.fromCollection(resourceIterator, modelClass);
>     }
>
>     @Override
>     public TypeInformation<Customer> getReturnType() {
>         return TypeInformation.of(modelClass);
>     }
>
>     @Override
>     public TableSchema getTableSchema() {
>         return TableSchema.fromTypeInfo(getReturnType());
>     }
> }
>
>
> Thanks,
>
> Josh
>
> On Thu, May 2, 2019 at 3:42 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Josh,
>>
>> Does your TableSource also implement ProjectableTableSource?
>> If yes, you need to make sure that the filter information is also
>> forwarded if ProjectableTableSource.projectFields() is called after
>> FilterableTableSource.applyPredicate().
>> Also make sure to correctly implement
>> FilterableTableSource.isFilterPushedDown().
>>
>> Hope this helps,
>> Fabian
>>
>> On Tue, Apr 30, 2019 at 10:29 PM Josh Bradt <josh.br...@klaviyo.com> wrote:
>>
>>> Hi all,
>>>
>>> I'm trying to implement filter push-down on a custom BatchTableSource
>>> that retrieves data from a REST API and returns it as POJO instances. I've
>>> implemented FilterableTableSource as described in the docs, returning a new
>>> instance of my table source containing the predicates that I've removed
>>> from the list of predicates passed into applyPredicate. However, when
>>> getDataSet is eventually called, it's called on the instance of the table
>>> source that was originally registered with the table environment, which
>>> does not have any filters in it. I've stepped through the code in a
>>> debugger, and applyPredicate is definitely being called, and it's
>>> definitely returning new instances of my table source, but they don't seem
>>> to be used.
>>>
>>> I also played with the OrcTableSource, which is the only example of a
>>> push-down filter implementation I could find, and it doesn't behave this
>>> way. When I set a breakpoint in getDataSet in that case, it's being called
>>> on one of the new instances of the table source that contains the accepted
>>> filters.
>>>
>>> Are there any other requirements for implementing push-down filters that
>>> aren't listed in the docs? Or does anyone have any tips for this?
>>>
>>> Thanks,
>>>
>>> Josh
>>>
>>> --
>>> Josh Bradt
>>> Software Engineer
>>> 225 Franklin St, Boston, MA 02110
>>> klaviyo.com <https://www.klaviyo.com>
>>> [image: Klaviyo Logo]
>>>
>>
>
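Josh's description in the thread relies on the FilterableTableSource contract: applyPredicate() removes the predicates it accepts from the mutable list, returns a new source instance carrying them, and the planner is then expected to read data from that returned instance rather than the originally registered one. The following is a minimal, framework-free Java model of that contract, not Flink code. FilterableSourceModel, RestSourceModel and PushDownDriver are hypothetical names, predicates are plain strings instead of Flink Expression objects, and the "only equality predicates can be sent to the API" rule is an assumption for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a filterable table source. NOT Flink's API:
// predicates are modeled as plain strings such as "country = 'US'".
interface FilterableSourceModel {
    // Removes the predicates it accepts from the (mutable) list and
    // returns a NEW source instance that carries them.
    FilterableSourceModel applyPredicate(List<String> predicates);

    boolean isFilterPushedDown();

    List<String> pushedFilters();
}

// Hypothetical REST-backed source, loosely mirroring CustomerTableSource.
final class RestSourceModel implements FilterableSourceModel {
    // null means "filter push-down has not been applied to this instance".
    private final List<String> filters;

    RestSourceModel() {
        this(null);
    }

    private RestSourceModel(List<String> filters) {
        this.filters = filters;
    }

    @Override
    public FilterableSourceModel applyPredicate(List<String> predicates) {
        List<String> accepted = new ArrayList<>();
        for (String predicate : predicates) {
            // Assumed rule: only equality predicates can be sent to the API.
            if (predicate.contains(" = ")) {
                accepted.add(predicate);
            }
        }
        // Predicates left in the list must still be evaluated after the scan.
        predicates.removeAll(accepted);
        return new RestSourceModel(accepted);
    }

    @Override
    public boolean isFilterPushedDown() {
        return filters != null;
    }

    @Override
    public List<String> pushedFilters() {
        return filters == null
                ? Collections.emptyList()
                : Collections.unmodifiableList(filters);
    }
}

// Simplified model of the planner side: push filters down once, then read
// from the instance that applyPredicate RETURNED, never the registered one.
final class PushDownDriver {
    static FilterableSourceModel pushDown(
            FilterableSourceModel registered, List<String> predicates) {
        if (registered.isFilterPushedDown()) {
            return registered; // already rewritten, do not push down twice
        }
        return registered.applyPredicate(predicates);
    }
}
```

In this model, returning a new instance even when no predicate is accepted matters: the empty but non-null filter list makes isFilterPushedDown() return true, so the driver does not push down again. This matches the `filters != null` check in Josh's CustomerTableSource, where `acceptedFilters.toArray(new AppRequestFilter[0])` is never null.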
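Fabian's first suggestion in the thread concerns sources that implement both FilterableTableSource and ProjectableTableSource: the copy returned by projectFields() must keep any filters pushed down earlier by applyPredicate(), and the copy returned by applyPredicate() should likewise keep an existing projection. The sketch below is a plain-Java model of that rule, not Flink's API. FilterProjectSourceModel is a hypothetical class, predicates are modeled as strings, and, as a simplifying assumption, it accepts every predicate it is offered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of a source with both filter and projection push-down.
// NOT Flink's API: it only mirrors the shape of applyPredicate/projectFields,
// with predicates modeled as strings.
final class FilterProjectSourceModel {
    private final List<String> filters;  // null: no filter push-down applied yet
    private final int[] projectedFields; // null: all fields are read

    FilterProjectSourceModel() {
        this(null, null);
    }

    private FilterProjectSourceModel(List<String> filters, int[] projectedFields) {
        this.filters = filters;
        this.projectedFields = projectedFields;
    }

    FilterProjectSourceModel applyPredicate(List<String> predicates) {
        // Assumed rule for this sketch: every offered predicate is accepted.
        List<String> accepted = new ArrayList<>(predicates);
        predicates.clear(); // accepted predicates leave the caller's list
        // Keep the projection that may already have been pushed down.
        return new FilterProjectSourceModel(
                Collections.unmodifiableList(accepted), projectedFields);
    }

    FilterProjectSourceModel projectFields(int[] fields) {
        // Forward the filters pushed down earlier. Passing null here instead
        // of `filters` is the pitfall Fabian describes: the filters are lost.
        return new FilterProjectSourceModel(filters, Arrays.copyOf(fields, fields.length));
    }

    boolean isFilterPushedDown() {
        return filters != null;
    }

    List<String> filters() {
        return filters == null ? Collections.emptyList() : filters;
    }

    int[] projectedFields() {
        return projectedFields == null
                ? null
                : Arrays.copyOf(projectedFields, projectedFields.length);
    }
}
```

Because each copy is built from the full current state (filters plus projection), the result is the same whichever push-down the planner applies first.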
