Hi Fabian,
Thanks for your reply. My custom table source does not implement
ProjectableTableSource. I believe that isFilterPushedDown is implemented
correctly since it's nearly identical to what's written in the
OrcTableSource. I've pasted a slightly simplified version of the
implementation below. Would you mind reading over it to see whether
anything is obviously wrong?

public final class CustomerTableSource
        implements BatchTableSource<Customer>, FilterableTableSource<Customer> {

    // Iterator that gets data from a REST API as POJO instances
    private final AppResourceIterator<Customer> resourceIterator;
    private final String tableName;
    private final Class<Customer> modelClass;
    // null until applyPredicate() has produced a filtered copy
    private final AppRequestFilter[] filters;

    public CustomerTableSource(
            AppResourceIterator<Customer> resourceIterator,
            String tableName,
            Class<Customer> modelClass) {
        this(resourceIterator, tableName, modelClass, null);
    }

    protected CustomerTableSource(
            AppResourceIterator<Customer> resourceIterator,
            String tableName,
            Class<Customer> modelClass,
            AppRequestFilter[] filters) {
        this.resourceIterator = resourceIterator;
        this.tableName = tableName;
        this.modelClass = modelClass;
        this.filters = filters;
    }

    @Override
    public TableSource<Customer> applyPredicate(List<Expression> predicates) {
        List<Expression> acceptedPredicates = new ArrayList<>();
        List<AppRequestFilter> acceptedFilters = new ArrayList<>();

        for (final Expression predicate : predicates) {
            buildFilterForPredicate(predicate).ifPresent(filter -> {
                acceptedFilters.add(filter);
                acceptedPredicates.add(predicate);
            });
        }

        // Remove the predicates this source will handle itself
        predicates.removeAll(acceptedPredicates);

        return new CustomerTableSource(
                resourceIterator.withFilters(acceptedFilters),
                tableName,
                modelClass,
                acceptedFilters.toArray(new AppRequestFilter[0])
        );
    }

    public Optional<AppRequestFilter> buildFilterForPredicate(Expression predicate) {
        // Code for translating an Expression into an AppRequestFilter
        // Returns Optional.empty() for predicates we don't want to / can't apply
    }

    @Override
    public boolean isFilterPushedDown() {
        return filters != null;
    }

    @Override
    public DataSet<Customer> getDataSet(ExecutionEnvironment execEnv) {
        return execEnv.fromCollection(resourceIterator, modelClass);
    }

    @Override
    public TypeInformation<Customer> getReturnType() {
        return TypeInformation.of(modelClass);
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.fromTypeInfo(getReturnType());
    }
}
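
To spell out the contract the class above is meant to follow, here is a minimal standalone sketch. It does not use any Flink classes: SketchSource is a hypothetical stand-in, predicates are plain strings, and the rule "only predicates starting with id are supported" is an assumption made purely for illustration. It only shows the intended behavior, namely that accepted predicates are removed from the caller's list and that the returned copy (not the original) reports the filter as pushed down.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a filterable source; not a Flink class.
final class SketchSource {
    // null means applyPredicate() was never called on the way to this instance
    private final List<String> filters;

    SketchSource() {
        this(null);
    }

    private SketchSource(List<String> filters) {
        this.filters = filters;
    }

    // Removes the predicates this source can handle from the caller's list
    // and returns a new copy that carries them.
    SketchSource applyPredicate(List<String> predicates) {
        List<String> accepted = new ArrayList<>();
        for (String predicate : predicates) {
            // Assumption for illustration: only "id" predicates are supported
            if (predicate.startsWith("id")) {
                accepted.add(predicate);
            }
        }
        predicates.removeAll(accepted);
        return new SketchSource(accepted);
    }

    boolean isFilterPushedDown() {
        return filters != null;
    }

    List<String> getFilters() {
        return filters == null ? Collections.<String>emptyList() : filters;
    }

    public static void main(String[] args) {
        List<String> predicates =
                new ArrayList<>(Arrays.asList("id = 5", "name LIKE 'a%'"));
        SketchSource original = new SketchSource();
        SketchSource filtered = original.applyPredicate(predicates);

        System.out.println(original.isFilterPushedDown()); // false
        System.out.println(filtered.isFilterPushedDown()); // true
        System.out.println(filtered.getFilters());         // [id = 5]
        System.out.println(predicates);                    // [name LIKE 'a%']
    }
}
```

As in the real class, a copy returned from applyPredicate reports the filter as pushed down even when no predicate was accepted, because its filter list is empty rather than null.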
Thanks,
Josh
On Thu, May 2, 2019 at 3:42 AM Fabian Hueske <[email protected]> wrote:
> Hi Josh,
>
> Does your TableSource also implement ProjectableTableSource?
> If yes, you need to make sure that the filter information is also
> forwarded if ProjectableTableSource.projectFields() is called after
> FilterableTableSource.applyPredicate().
> Also make sure to correctly implement
> FilterableTableSource.isFilterPushedDown().
>
> Hope this helps,
> Fabian
>
> On Tue, Apr 30, 2019 at 10:29 PM Josh Bradt <
> [email protected]> wrote:
>
>> Hi all,
>>
>> I'm trying to implement filter push-down on a custom BatchTableSource
>> that retrieves data from a REST API and returns it as POJO instances. I've
>> implemented FilterableTableSource as described in the docs, returning a new
>> instance of my table source containing the predicates that I've removed
>> from the list of predicates passed into applyPredicate. However, when
>> getDataSet is eventually called, it's called on the instance of the table
>> source that was originally registered with the table environment, which
>> does not have any filters in it. I've stepped through the code in a
>> debugger, and applyPredicate is definitely being called, and it's
>> definitely returning new instances of my table source, but they don't seem
>> to be used.
>>
>> I also played with the OrcTableSource, which is the only example of a
>> push-down filter implementation I could find, and it doesn't behave this
>> way. When I set a breakpoint in getDataSet in that case, it's being called
>> on one of the new instances of the table source that contains the accepted
>> filters.
>>
>> Are there any other requirements for implementing push-down filters that
>> aren't listed in the docs? Or does anyone have any tips for this?
>>
>> Thanks,
>>
>> Josh
>>
>> --
>> Josh Bradt
>> Software Engineer
>> 225 Franklin St, Boston, MA 02110
>> klaviyo.com <https://www.klaviyo.com>
>>
>