Hi Fabian,

Thanks for your reply. My custom table source does not implement
ProjectableTableSource. I believe that isFilterPushedDown is implemented
correctly since it's nearly identical to what's written in the
OrcTableSource. I pasted a slightly simplified version of the
implementation below. Would you mind reading over it to see whether
anything is obviously wrong?

public final class CustomerTableSource implements BatchTableSource<Customer>,
        FilterableTableSource<Customer> {

    // Iterator that gets data from a REST API as POJO instances
    private final AppResourceIterator<Customer> resourceIterator;
    private final String tableName;
    private final Class<Customer> modelClass;
    private final AppRequestFilter[] filters;

    public CustomerTableSource(
            AppResourceIterator<Customer> resourceIterator,
            String tableName,
            Class<Customer> modelClass) {

        this(resourceIterator, tableName, modelClass, null);
    }

    protected CustomerTableSource(
            AppResourceIterator<Customer> resourceIterator,
            String tableName,
            Class<Customer> modelClass,
            AppRequestFilter[] filters) {

        this.resourceIterator = resourceIterator;
        this.tableName = tableName;
        this.modelClass = modelClass;
        this.filters = filters;
    }

    @Override
    public TableSource<Customer> applyPredicate(List<Expression> predicates) {
        List<Expression> acceptedPredicates = new ArrayList<>();
        List<AppRequestFilter> acceptedFilters = new ArrayList<>();

        for (final Expression predicate : predicates) {
            buildFilterForPredicate(predicate).ifPresent(filter -> {
                acceptedFilters.add(filter);
                acceptedPredicates.add(predicate);
            });
        }

        predicates.removeAll(acceptedPredicates);

        return new CustomerTableSource(
                resourceIterator.withFilters(acceptedFilters),
                tableName,
                modelClass,
                acceptedFilters.toArray(new AppRequestFilter[0])
        );
    }

    public Optional<AppRequestFilter> buildFilterForPredicate(Expression predicate) {
        // Code for translating an Expression into an AppRequestFilter
        // Returns Optional.empty() for predicates we don't want to / can't apply
    }

    @Override
    public boolean isFilterPushedDown() {
        return filters != null;
    }

    @Override
    public DataSet<Customer> getDataSet(ExecutionEnvironment execEnv) {
        return execEnv.fromCollection(resourceIterator, modelClass);
    }

    @Override
    public TypeInformation<Customer> getReturnType() {
        return TypeInformation.of(modelClass);
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.fromTypeInfo(getReturnType());
    }
}
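
A possibility worth checking (an assumption, not confirmed anywhere in this
thread): the class above does not override explainSource(), so the originally
registered source and the copy returned by applyPredicate would presumably
describe themselves to the planner with the same string. The sketch below uses
a hypothetical stand-in class, SketchSource, with plain strings in place of
AppRequestFilter and no Flink types. It only illustrates a description string
that changes once filters are attached.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a filterable table source; not a Flink class.
final class SketchSource {
    private final String tableName;
    // Plain strings stand in for the AppRequestFilter instances.
    private final List<String> filters;

    SketchSource(String tableName, List<String> filters) {
        this.tableName = tableName;
        this.filters = Collections.unmodifiableList(new ArrayList<>(filters));
    }

    // Mirrors applyPredicate: returns a new instance carrying the accepted filters.
    SketchSource withFilters(List<String> accepted) {
        return new SketchSource(tableName, accepted);
    }

    // Mirrors the role of explainSource(): the filter state is part of the
    // description, so a filtered copy no longer looks like the original.
    String explainSource() {
        return "SketchSource[table=" + tableName + ", filters=" + filters + "]";
    }
}
```

In the real class this would presumably be an @Override of explainSource()
that appends the contents of the filters array to the description.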


Thanks,

Josh

On Thu, May 2, 2019 at 3:42 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Josh,
>
> Does your TableSource also implement ProjectableTableSource?
> If yes, you need to make sure that the filter information is also
> forwarded if ProjectableTableSource.projectFields() is called after
> FilterableTableSource.applyPredicate().
> Also make sure to correctly implement
> FilterableTableSource.isFilterPushedDown().
>
> Hope this helps,
> Fabian
>
> On Tue, Apr 30, 2019 at 10:29 PM Josh Bradt <
> josh.br...@klaviyo.com> wrote:
>
>> Hi all,
>>
>> I'm trying to implement filter push-down on a custom BatchTableSource
>> that retrieves data from a REST API and returns it as POJO instances. I've
>> implemented FilterableTableSource as described in the docs, returning a new
>> instance of my table source containing the predicates that I've removed
>> from the list of predicates passed into applyPredicate. However, when
>> getDataSet is eventually called, it's called on the instance of the table
>> source that was originally registered with the table environment, which
>> does not have any filters in it. I've stepped through the code in a
>> debugger, and applyPredicate is definitely being called, and it's
>> definitely returning new instances of my table source, but they don't seem
>> to be used.
>>
>> I also played with the OrcTableSource, which is the only example of a
>> push-down filter implementation I could find, and it doesn't behave this
>> way. When I set a breakpoint in getDataSet in that case, it's being called
>> on one of the new instances of the table source that contains the accepted
>> filters.
>>
>> Are there any other requirements for implementing push-down filters that
>> aren't listed in the docs? Or does anyone have any tips for this?
>>
>> Thanks,
>>
>> Josh
>>
>> --
>> Josh Bradt
>> Software Engineer
>> 225 Franklin St, Boston, MA 02110
>> klaviyo.com <https://www.klaviyo.com>
>>
>

-- 
Josh Bradt
Software Engineer
225 Franklin St, Boston, MA 02110
klaviyo.com <https://www.klaviyo.com>