Hi Dawid,

Thanks for following up on this.

Let me know if you'd like me to update the documentation; that seems pretty
straightforward :)

Adding support for compressing broadcast state feels more challenging, but I
could take a swing at it if you'd like.

Regards,

-- Ken

PS - re the key serializer, I was looking at a hack where I keep around the
previous record so I could do delta encoding, but that's also fragile.

> On Nov 21, 2022, at 1:36 AM, Dawid Wysakowicz wrote:
>
> And yes, I read "Compression works on the granularity of key-groups in keyed
> state" as meaning "When compressing keyed state, it's done per key-group" and
> not "Compression only works on keyed state" :)
>
> Totally agree. Docs could definitely be more straightforward about that. I
> created FLINK-30112[1] to improve that.
>
> I agree that "KeyedState should be preferred in majority of cases".
> Unfortunately for a broadcast stream there's no option to use keyed state,
> right?
>
> Absolutely, I mentioned this just as an excuse for why we did not invest
> much in OperatorState.
>
> So assuming that's the current situation, and I'm not in a position to have
> my client deploy a patched version of Flink, which of the following (sketchy)
> ideas has any potential here...
>
> I am afraid it won't be easy to hack the compression in.
>
> Your suggestion 2. won't help imo, as the compression would compress
> separately for each entry. I believe that's not what you're looking for.
>
> The option 1. has more potential, as it would apply compression for the
> entire state. As you said though, this would require class overloading which
> is always fragile.
>
> To be honest, I can't think of a better way atm.
>
> Btw, I believe being able to apply compression for operator state is a valid
> request so I created FLINK-30113[2]
>
> [1] https://issues.apache.org/jira/browse/FLINK-30112
> [2] https://issues.apache.org/jira/browse/FLINK-30113
>
> On 18/11/2022 02:13, Ken Krugler wrote:
>> Hi Dawid,
>>
>> Thanks for getting back to me.
>>
>> And yes, I read "Compression works on the granularity of key-groups in keyed
>> state" as meaning "When compressing keyed state, it's done per key-group"
>> and not "Compression only works on keyed state" :)
>>
>> I agree that "KeyedState should be preferred in majority of cases".
>> Unfortunately for a broadcast stream there's no option to use keyed state,
>> right?
>>
>> So assuming that's the current situation, and I'm not in a position to have
>> my client deploy a patched version of Flink, which of the following
>> (sketchy) ideas has any potential here...
>>
>> 1. Implement a version of HeapBroadcastState that compresses the state, and
>> rely on Flink's classloader finding it in my jar first.
>>
>> 2. Register a custom compressing serializer for my state's key class,
>> assuming that will get picked up by the call to
>> stateMetaInfo.getKeySerializer().
>>
>> Or something else?
>>
>> Thanks!
>>
>> -- Ken
>>
>>> On Nov 17, 2022, at 12:06 AM, Dawid Wysakowicz wrote:
>>>
>>> Cross posting answer from SO:
>>>
>>> BroadcastState is an operator state not a KeyedState. The referenced docs
>>> refer to a KeyedState:
>>>
>>> Compression works on the granularity of key-groups in keyed state,
>>>
>>> Probably docs could be more explicit about this behaviour.
>>>
>>> Unfortunately as far as I know there is no compression for OperatorState.
>>> I am not 100% sure, but I believe it has just never been implemented,
>>> because we did not want to invest in it, as KeyedState should be preferred
>>> in the majority of cases.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 16/11/2022 23:27, Ken Krugler wrote:
>>>> Hi all,
>>>>
>>>> Just posted this question on SO: How to enable compression for Flink
>>>> broadcast state checkpoints
>>>> <https://stackoverflow.com/q/74466988/231762?sem=2>
>>>>
>>>> Basically it doesn't look like broadcast state respects the compressed
>>>> state (checkpoints/savepoints) setting, but I might be reading the code
>>>> wrong.
>>>>
>>>> Hoping someone (like Dawid Wysakowicz) can chime in here, thanks!
>>>>
>>>> -- Ken
>>>>
>>>> --------------------------
>>>> Ken Krugler
>>>> http://www.scaleunlimited.com
>>>> Custom big data solutions
>>>> Flink, Pinot, Solr, Elasticsearch
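
As a rough illustration of the point made in the thread that suggestion 2 would compress each entry separately while option 1 would compress the entire state, here is a minimal sketch that compares the two granularities. Everything in it is an assumption for illustration: the class name, the helper methods, and the sample records are hypothetical, and java.util.zip deflate is used only because it ships with the JDK. It does not touch any Flink API and is not claimed to match what Flink does internally.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.DeflaterOutputStream;

public class CompressionGranularity {

    // Deflate a byte array and return the compressed bytes.
    static byte[] deflate(byte[] input) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DeflaterOutputStream dos = new DeflaterOutputStream(out)) {
            dos.write(input);
        }
        return out.toByteArray();
    }

    // Hypothetical stand-in for already-serialized broadcast state entries:
    // short records that share most of their content with each other.
    static List<byte[]> sampleEntries(int count) {
        List<byte[]> entries = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String record = "customer-" + i + ",region=us-west,plan=standard,active=true";
            entries.add(record.getBytes(StandardCharsets.UTF_8));
        }
        return entries;
    }

    // Compress every entry on its own, as a per-entry serializer would.
    static int perEntryCompressedSize(List<byte[]> entries) throws IOException {
        int total = 0;
        for (byte[] entry : entries) {
            total += deflate(entry).length;
        }
        return total;
    }

    // Concatenate all entries and compress them as one stream.
    static int wholeStateCompressedSize(List<byte[]> entries) throws IOException {
        ByteArrayOutputStream all = new ByteArrayOutputStream();
        for (byte[] entry : entries) {
            all.write(entry);
        }
        return deflate(all.toByteArray()).length;
    }

    public static void main(String[] args) throws IOException {
        List<byte[]> entries = sampleEntries(1000);
        System.out.println("per entry:   " + perEntryCompressedSize(entries) + " bytes");
        System.out.println("whole state: " + wholeStateCompressedSize(entries) + " bytes");
    }
}
```

With short, similar records like these, the per-entry total should come out noticeably larger, since each entry pays the compressor's fixed overhead and cannot share redundancy with its neighbours. Real state may behave differently depending on entry size and content.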
