Fascinating. Do you have an estimate of what qualifies as a lot of data,
and therefore when ListView and MapView should be used?

Thanks

On Mon, Jan 18, 2021 at 12:35 AM Timo Walther <twal...@apache.org> wrote:

> Hi Rex,
>
> ListView and MapView have been part of Flink for years. However, they
> were considered an internal feature and were therefore not well
> documented. MapView is used internally to make distinct aggregates work.
>
> Because we reworked the type inference of aggregate functions, we also
> added basic documentation for power users.
>
> By default an accumulator will be deserialized from state on every
> access. ListView and MapView are not deserialized entirely on access but
> delegate directly to a state backend. Thus, only the key that is
> accessed is deserialized. So if an accumulator stores a lot of data, it
> might be beneficial to use the mentioned abstractions.
>
> Regards,
> Timo
>
>
> On 16.01.21 20:09, Rex Fenley wrote:
> > Hello,
> >
> > In the latest version of the Flink docs I read the following [1]:
> >  > If an accumulator needs to store large amounts of data,
> >  > org.apache.flink.table.api.dataview.ListView and
> >  > org.apache.flink.table.api.dataview.MapView provide advanced features
> >  > for leveraging Flink’s state backends in unbounded data scenarios.
> >  > Please see the docs of the corresponding classes for more information
> >  > about this advanced feature.
> >
> > Our job has unbounded state from Debezium/Kafka, uses RocksDB, and we
> > have a number of Aggregators like the following, which group a set of
> > ids by some foreign key "group_id". The sets are usually 10-100 ids in
> > size, but at the largest the sets could at some theoretical point get
> > into the tens of thousands of ids (right now largest sets are ~2000 ids).
> >
> > table.groupBy($"group_id")
> >   .aggregate(
> >     new IDsAgg()(
> >       $"member_id"
> >     ) as ("member_ids")
> >   )
> >   .select($"group_id", $"member_ids")
> >
> > case class IDsAcc(
> >   var IDs: mutable.Set[Long]
> > )
> >
> > class IDsAgg extends AggregateFunction[Row, IDsAcc] {
> >
> >   override def createAccumulator(): IDsAcc =
> >     IDsAcc(mutable.Set())
> >
> >   def accumulate(
> >     acc: IDsAcc,
> >     ID: Long
> >   ): Unit = {
> >     acc.IDs.add(ID)
> >   }
> >
> >   def retract(acc: IDsAcc, ID: Long): Unit = {
> >     acc.IDs.remove(ID)
> >   }
> >
> >   def resetAccumulator(acc: IDsAcc): Unit = {
> >     acc.IDs = mutable.Set()
> >   }
> >
> >   override def getValue(acc: IDsAcc): Row = {
> >     Row.of(acc.IDs.toArray)
> >   }
> >
> >   override def getResultType: TypeInformation[Row] = {
> >     new RowTypeInfo(
> >       createTypeInformation[Array[Long]]
> >     )
> >   }
> > }
> >
> > I read the docs [2], but they don't really say why ListView is
> > better than just using a Set or Array.
> > If we were to move from a Set to a ListView what advantages might we see
> > in these Aggregates?
> >
> > I also noticed that ListView existed in 1.11 (we're on 1.11.2), did we
> > simply miss this feature? Does it work for 1.11.x too?
> >
> > Thanks!
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods
> > [2]
> > https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html
> >
> > --
> >
> > Rex Fenley|Software Engineer - Mobile and Backend
> >
> >
> > Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> |
> > FOLLOW US <https://twitter.com/remindhq> | LIKE US
> > <https://www.facebook.com/remindhq>
> >
>
>
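
For reference, here is a rough, untested sketch of how the IDsAgg from the
original message might look if the id set were kept in a MapView, so that
accumulate and retract touch only the key being added or removed, as Timo
describes. The names IDsViewAcc and IDsViewAgg are made up for this
illustration. Two assumptions to verify: the accumulator is written as a plain
class with a mutable field (mirroring the Java example in the MapView
Javadoc), and the result and accumulator type declarations are left out.
Whether the planner detects the view on a Scala accumulator, and which type
declarations it needs, may differ between 1.11.x and 1.12.

```scala
import java.lang.{Boolean => JBoolean, Long => JLong}

import scala.collection.JavaConverters._

import org.apache.flink.table.api.dataview.MapView
import org.apache.flink.table.functions.AggregateFunction

// Hypothetical accumulator: the id set is stored as the keys of a MapView.
// The Boolean values are dummy flags; only the keys matter.
class IDsViewAcc {
  var ids: MapView[JLong, JBoolean] = new MapView[JLong, JBoolean]()
}

// Hypothetical rewrite of IDsAgg. Type declarations (result/accumulator
// types) are omitted here and may be required depending on the Flink version.
class IDsViewAgg extends AggregateFunction[Array[JLong], IDsViewAcc] {

  override def createAccumulator(): IDsViewAcc = new IDsViewAcc

  // Writes a single key; the rest of the set is not deserialized.
  def accumulate(acc: IDsViewAcc, id: JLong): Unit =
    acc.ids.put(id, JBoolean.TRUE)

  // Removes a single key.
  def retract(acc: IDsViewAcc, id: JLong): Unit =
    acc.ids.remove(id)

  def resetAccumulator(acc: IDsViewAcc): Unit =
    acc.ids.clear()

  // Emitting the whole set still iterates over every key.
  override def getValue(acc: IDsViewAcc): Array[JLong] =
    acc.ids.keys().asScala.toArray
}
```

Note that getValue still walks every key in order to emit the full set, so
with this shape of aggregate any saving would come mainly from accumulate and
retract, not from producing the result.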

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend