Hi Rex,

ListView and MapView have been part of Flink for years. However, they were considered as an internal feature and therefore not well documented. MapView is used internally to make distinct aggregates work.

Because we reworked the type inference of aggregate functions, we also added basic documentation for power users.

By default an accumulator will be deserialized from state on every access. ListView and MapView are not deserialized entirely on access but delegate directly to a state backend. Thus, only the key that is accessed is deserialized. So if an accumulator stores a lot of data, it might be beneficial to use the mentioned abstractions.

Regards,
Timo


On 16.01.21 20:09, Rex Fenley wrote:
Hello,

In the recent version of Flink docs I read the following [1]:
> If an accumulator needs to store large amounts of data, |org.apache.flink.table.api.dataview.ListView| and |org.apache.flink.table.api.dataview.MapView| provide advanced features for leveraging Flink’s state backends in unbounded data scenarios. Please see the docs of the corresponding classes for more information about this advanced feature.

Our job has unbounded state from Debezium/Kafka, uses RocksDB, and we have a number of Aggregators like the following, which group a set of ids by some foreign key "group_id". The sets are usually 10-100 ids in size, but at the largest the sets could at some theoretical point get into the tens of thousands of ids (right now largest sets are ~2000 ids).

table.groupBy($"group_id")
.aggregate(
newIDsAgg()(
$"member_id"
) as ("member_ids")
)
.select($"group_id", $"member_ids")

caseclassIDsAcc(
varIDs: mutable.Set[Long]
)
classIDsAgg extendsAggregateFunction[Row, IDsAcc] {

overridedefcreateAccumulator(): IDsAcc =
IDsAcc(mutable.Set())

defaccumulate(
acc: IDsAcc,
ID: Long
): Unit = {
acc.IDs.add(ID)
}

defretract(acc: IDsAcc, ID: Long): Unit = {
acc.IDs.remove(ID)
}

defresetAccumulator(acc: IDsAcc): Unit = {
acc.IDs = mutable.Set()
}

overridedefgetValue(acc: IDsAcc): Row = {
Row.of(acc.IDs.toArray)
}

overridedefgetResultType: TypeInformation[Row] = {
newRowTypeInfo(
createTypeInformation[Array[Long]]
)
}
}

I read the docs [2] but I don't see it really say anything about why ListView is better than just using a Set or Array. If we were to move from a Set to a ListView what advantages might we see in these Aggregates?

I also noticed that ListView existed in 1.11 (we're on 1.11.2), did we simply miss this feature? Does it work for 1.11.x too?

Thanks!

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods> [2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html>

--

Rex Fenley|Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>


Reply via email to