Fascinating, do you have an estimate of what qualifies as a lot of data and therefore when this should be used?
Thanks

On Mon, Jan 18, 2021 at 12:35 AM Timo Walther <twal...@apache.org> wrote:

> Hi Rex,
>
> ListView and MapView have been part of Flink for years. However, they
> were considered as an internal feature and therefore not well
> documented. MapView is used internally to make distinct aggregates work.
>
> Because we reworked the type inference of aggregate functions, we also
> added basic documentation for power users.
>
> By default an accumulator will be deserialized from state on every
> access. ListView and MapView are not deserialized entirely on access but
> delegate directly to a state backend. Thus, only the key that is
> accessed is deserialized. So if an accumulator stores a lot of data, it
> might be beneficial to use the mentioned abstractions.
>
> Regards,
> Timo
>
>
> On 16.01.21 20:09, Rex Fenley wrote:
> > Hello,
> >
> > In the recent version of Flink docs I read the following [1]:
> > > If an accumulator needs to store large amounts of data,
> > > org.apache.flink.table.api.dataview.ListView and
> > > org.apache.flink.table.api.dataview.MapView provide advanced features
> > > for leveraging Flink's state backends in unbounded data scenarios.
> > > Please see the docs of the corresponding classes for more information
> > > about this advanced feature.
> >
> > Our job has unbounded state from Debezium/Kafka, uses RocksDB, and we
> > have a number of Aggregators like the following, which group a set of
> > ids by some foreign key "group_id". The sets are usually 10-100 ids in
> > size, but at the largest the sets could at some theoretical point get
> > into the tens of thousands of ids (right now largest sets are ~2000 ids).
> >
> > table.groupBy($"group_id")
> >   .aggregate(
> >     new IDsAgg()(
> >       $"member_id"
> >     ) as ("member_ids")
> >   )
> >   .select($"group_id", $"member_ids")
> >
> > case class IDsAcc(
> >   var IDs: mutable.Set[Long]
> > )
> > class IDsAgg extends AggregateFunction[Row, IDsAcc] {
> >
> >   override def createAccumulator(): IDsAcc =
> >     IDsAcc(mutable.Set())
> >
> >   def accumulate(
> >     acc: IDsAcc,
> >     ID: Long
> >   ): Unit = {
> >     acc.IDs.add(ID)
> >   }
> >
> >   def retract(acc: IDsAcc, ID: Long): Unit = {
> >     acc.IDs.remove(ID)
> >   }
> >
> >   def resetAccumulator(acc: IDsAcc): Unit = {
> >     acc.IDs = mutable.Set()
> >   }
> >
> >   override def getValue(acc: IDsAcc): Row = {
> >     Row.of(acc.IDs.toArray)
> >   }
> >
> >   override def getResultType: TypeInformation[Row] = {
> >     new RowTypeInfo(
> >       createTypeInformation[Array[Long]]
> >     )
> >   }
> > }
> >
> > I read the docs [2] but I don't see it really say anything about why
> > ListView is better than just using a Set or Array.
> > If we were to move from a Set to a ListView what advantages might we see
> > in these Aggregates?
> >
> > I also noticed that ListView existed in 1.11 (we're on 1.11.2), did we
> > simply miss this feature? Does it work for 1.11.x too?
> >
> > Thanks!
> >
> > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods
> > [2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html
> >
> > --
> >
> > Rex Fenley | Software Engineer - Mobile and Backend
> >
> > Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> |
> > FOLLOW US <https://twitter.com/remindhq> | LIKE US
> > <https://www.facebook.com/remindhq>
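
As a rough illustration of the access-cost difference Timo describes, here is a toy model in plain Scala. It is not Flink code: `BlobState`, `ViewState` and `AccessCostSketch` are hypothetical names invented for this sketch, and the "deserialization" is only a counter. It assumes the whole accumulator is read back on every access in the first case and that exactly one entry is touched per access in the second. It ignores caching, serializer overhead and heap-based state backends, so it shows the shape of the cost and not a threshold for "a lot of data".

```scala
import scala.collection.mutable

object AccessCostSketch {

  // Whole-value accumulator: the full id set is stored as one blob,
  // so every access reads all elements back before changing one.
  final class BlobState {
    private var blob: Array[Long] = Array.empty[Long] // stand-in for serialized bytes
    var elementsDeserialized: Long = 0L

    def add(id: Long): Unit = {
      val ids = mutable.Set.empty[Long]
      ids ++= blob                        // "deserialize" the entire set
      elementsDeserialized += blob.length
      ids += id
      blob = ids.toArray                  // "serialize" the entire set again
    }
  }

  // View-style accumulator: each id is its own entry in the backend,
  // so an access touches only the entry for that id.
  final class ViewState {
    private val entries = mutable.Map.empty[Long, Boolean]
    var elementsDeserialized: Long = 0L

    def add(id: Long): Unit = {
      elementsDeserialized += 1           // only the accessed key is read
      entries.put(id, true)
    }
  }

  // Adds ids 1..n to both models and returns (blob cost, view cost).
  def run(n: Int): (Long, Long) = {
    val blob = new BlobState
    val view = new ViewState
    (1 to n).foreach { i =>
      blob.add(i.toLong)
      view.add(i.toLong)
    }
    (blob.elementsDeserialized, view.elementsDeserialized)
  }
}
```

Under these assumptions, building a 2000-id set (the largest size mentioned in the thread) gives `AccessCostSketch.run(2000) == (1999000L, 2000L)`: the blob model reads n(n-1)/2 elements in total, the view model reads n. For sets of 10-100 ids the gap is much smaller, which is consistent with the advice that the views mainly pay off for large accumulators.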