Thanks Roman, that's exactly what I needed.
On Wed, Feb 24, 2021 at 2:37 PM Roman Khachatryan <[email protected]> wrote:
>
> Thanks for the clarification.
>
> RocksDB stores whatever value Flink passes to it after serialization.
> The value is passed as an array of bytes, so the minimum is a single byte.
> An Integer would require 4 bytes, an Object 1 or 2 depending on the serializer
> (Pojo or Kryo), and a boolean just 1 byte.
> Besides that, boolean serialization is apparently faster.
>
> Sizes in memory, on disk and of snapshots are all affected proportionally.
>
> You are right that Flink's compression settings will not have any impact
> with incremental checkpoints.
>
> Regards,
> Roman
>
>
> On Wed, Feb 24, 2021 at 11:01 AM Maciej Obuchowski
> <[email protected]> wrote:
>>
>> Hey. Let me send a simplified example, because I don't think this
>> "(given that the actual stored objects (integers) are the same)" is
>> true - I'm just storing an object as a placeholder:
>>
>> import org.apache.flink.api.common.state.ValueState;
>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.runtime.state.FunctionInitializationContext;
>> import org.apache.flink.runtime.state.FunctionSnapshotContext;
>> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>> import org.apache.flink.util.Collector;
>>
>> public class DeduplicationProcessFunction<K, IN>
>>         extends KeyedProcessFunction<K, IN, IN> implements CheckpointedFunction {
>>
>>     // Placeholder marker: only its presence for a key matters.
>>     private transient ValueState<Object> processedState;
>>
>>     public DeduplicationProcessFunction() { }
>>
>>     @Override
>>     public void snapshotState(FunctionSnapshotContext context) throws Exception { }
>>
>>     @Override
>>     public void initializeState(FunctionInitializationContext context) throws Exception {
>>         ValueStateDescriptor<Object> descriptor =
>>                 new ValueStateDescriptor<>("processed", TypeInformation.of(Object.class));
>>         processedState = context.getKeyedStateStore().getState(descriptor);
>>     }
>>
>>     @Override
>>     public void processElement(IN value, Context ctx, Collector<IN> out) throws Exception {
>>         Object processed = processedState.value();
>>         if (processed == null) {
>>             // First element seen for this key: mark it and emit it downstream.
>>             processedState.update(new Object());
>>             out.collect(value);
>>         }
>>     }
>> }
>>
>>
>>
>> Basically, I'm not sure what RocksDB stores in this case. I'm sure
>> that it needs to store the key, which is a 32-byte SHA key in this case.
>> What's the value? Is it the 16 bytes that Java requires in memory? If
>> I change my ValueState to Integer and provide an additional value
>> there, will it require more storage space? Also, to respond to your
>> point about compression, we're using incremental checkpoints, so I
>> don't think anything will change, as per the docs. I'm not only interested
>> in snapshot size, but also in the size of the current in-memory and
>> local-disk state.
>>
>> Thanks,
>> Maciej
>>
>>
>>
>> On Tue, Feb 23, 2021 at 5:53 PM Roman Khachatryan <[email protected]> wrote:
>> >
>> > Hi Maciej,
>> >
>> > If I understand correctly, you're asking whether ValueState parameterized
>> > with Object has the same size as the one with Integer (given that the
>> > actual stored objects (integers) are the same).
>> > With RocksDB, any state object is serialized first and only then is it
>> > stored in the MemTable or in an SST file. So it doesn't matter as long as
>> > the same serializer is used.
>> >
>> > You should probably try enabling compression if you haven't already:
>> > https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression
>> >
>> > Regards,
>> > Roman
>> >
>> >
>> > On Tue, Feb 23, 2021 at 12:40 PM Maciej Obuchowski
>> > <[email protected]> wrote:
>> >>
>> >> Hey.
>> >>
>> >> We have a deduplication job that has a large amount of keyed ValueState.
>> >> We want to decrease state size as much as possible, so we're using
>> >> ValueState<Object> as it's the smallest possible Java non-primitive.
>> >> However, as per https://www.baeldung.com/java-size-of-object (and my
>> >> measurements), a Java Integer has the same memory size as an Object due
>> >> to padding.
>> >> Will this still be true with RocksDB state? Can we put an Integer in state
>> >> without increasing state size?
>> >>
>> >> Thanks, Maciej
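Roman's point that RocksDB only ever sees serialized bytes, so the 16-byte in-memory Java object size does not apply, can be illustrated with a small JDK-only sketch. It assumes that Flink's built-in IntSerializer and BooleanSerializer encode values the way java.io.DataOutput.writeInt and writeBoolean do, and it uses DataOutputStream as a stand-in for Flink's DataOutputView; no Flink classes are used, and the class and method names are hypothetical. It also measures a SHA-256 digest to match the 32-byte key mentioned in the thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, JDK only. Assumption: Flink's IntSerializer and
// BooleanSerializer write values like DataOutput.writeInt / writeBoolean.
public class StateValueSizes {

    // Number of bytes DataOutput.writeInt produces (always a fixed 4 bytes).
    public static int serializedIntSize(int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value);
        } catch (IOException e) {
            // An in-memory stream does not fail, but the API declares IOException.
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Number of bytes DataOutput.writeBoolean produces (a single byte).
    public static int serializedBooleanSize(boolean value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Length of a SHA-256 digest, i.e. the 32-byte key from the thread.
    public static int sha256KeySize(String recordId) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            return digest.digest(recordId.getBytes(StandardCharsets.UTF_8)).length;
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support SHA-256.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Integer value: " + serializedIntSize(42) + " bytes");
        System.out.println("Boolean value: " + serializedBooleanSize(true) + " bytes");
        System.out.println("SHA-256 key:   " + sha256KeySize("some-record-id") + " bytes");
    }
}
```

Counting payload only, a 32-byte key plus a 1-byte Boolean value comes to 33 bytes per entry, against 36 bytes with a 4-byte Integer. Any framing that Flink or RocksDB add around the key and value is not counted here.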
