Hey. Let me send a simplified example, because I don't think this
"(given that the actual stored objects (integers) are the same)" is
true: I'm just storing an Object as a placeholder:

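import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits only the first element seen for each key; later duplicates are dropped.
public class DeduplicationProcessFunction<K, IN>
        extends KeyedProcessFunction<K, IN, IN>
        implements CheckpointedFunction {

    // Per-key marker: non-null once an element for the current key was emitted.
    private transient ValueState<Object> processedState;

    public DeduplicationProcessFunction() { }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Nothing to do: keyed state is snapshotted by the state backend.
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Object.class is treated as a generic type (GenericTypeInfo), serialized with Kryo.
        ValueStateDescriptor<Object> descriptor =
                new ValueStateDescriptor<>("processed", TypeInformation.of(Object.class));
        processedState = context.getKeyedStateStore().getState(descriptor);
    }

    @Override
    public void processElement(IN value, Context ctx, Collector<IN> out) throws Exception {
        Object processed = processedState.value();
        if (processed == null) {
            // First element for this key: store a placeholder and emit the element.
            processedState.update(new Object());
            out.collect(value);
        }
    }
}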



Basically, I'm not sure what RocksDB stores in this case. I'm sure
that it needs to store the key, which here is a 32-byte SHA key.
What's the value? Is it the 16 bytes that Java requires in memory? If
I change my ValueState to Integer and put an actual value in it,
will it require more storage space? Also, to respond to your point
about compression: we're using incremental checkpoints, so as per the
docs I don't think anything will change. I'm interested not only in
the snapshot size, but also in the size of the current state, both in
memory and on local disk.
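A minimal, stdlib-only sketch of the question above, under stated assumptions: RocksDB stores opaque byte arrays, so what counts is the number of bytes the state serializer writes, not the JVM's in-memory object size. The class StoredEntrySizeSketch, the ValueWriter interface and the "some-record-id" input are hypothetical. Encoding an Integer with DataOutput.writeInt is assumed to mirror how Flink's IntSerializer writes an int (4 bytes), and the 1-byte flag stands in for a boolean-style marker. The sketch deliberately ignores Flink's key-group and namespace bytes in the RocksDB key, RocksDB's own per-entry overhead and compression, and the Kryo encoding used for the Object placeholder in the code above.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class StoredEntrySizeSketch {

    // Hypothetical stand-in for a state serializer: writes one value to a stream.
    interface ValueWriter {
        void write(DataOutputStream out) throws IOException;
    }

    // Returns how many bytes the writer produces, i.e. what a byte-oriented
    // store such as RocksDB would actually receive for the value.
    static int serializedSize(ValueWriter writer) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            writer.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    // 32-byte SHA-256 digest, standing in for the deduplication key.
    static byte[] sha256Key(String recordId) {
        try {
            return MessageDigest.getInstance("SHA-256")
                    .digest(recordId.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    public static void main(String[] args) {
        byte[] key = sha256Key("some-record-id");
        // Assumption: an Integer is encoded as 4 big-endian bytes (DataOutput.writeInt).
        int intBytes = serializedSize(out -> out.writeInt(42));
        // Hypothetical 1-byte marker, e.g. a boolean flag.
        int flagBytes = serializedSize(out -> out.writeBoolean(true));

        System.out.println("key bytes:           " + key.length);
        System.out.println("Integer value bytes: " + intBytes);
        System.out.println("marker value bytes:  " + flagBytes);
        // Payload only: ignores Flink's key-group/namespace bytes and RocksDB overhead.
        System.out.println("key + Integer:       " + (key.length + intBytes));
    }
}
```

Running it prints 32 key bytes, 4 bytes for the Integer value, 1 byte for the marker and a 36-byte key-plus-value payload. Serializing a sample value with the actual TypeSerializer that Flink creates for the state descriptor would be a more reliable check than this model.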

Thanks,
Maciej



On Tue, 23 Feb 2021 at 17:53, Roman Khachatryan <ro...@apache.org> wrote:
>
> Hi Maciej,
>
> If I understand correctly, you're asking whether ValueState parameterized 
> with Object has the same size as the one with Integer (given that the actual 
> stored objects (integers) are the same).
> With RocksDB, any state object is serialized first, and only then is it stored
> in the MemTable or in an SST file. So it doesn't matter, as long as the same
> serializer is used.
>
> You should probably try enabling compression if you haven't already: 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression
>
> Regards,
> Roman
>
>
> On Tue, Feb 23, 2021 at 12:40 PM Maciej Obuchowski 
> <obuchowski.mac...@gmail.com> wrote:
>>
>> Hey.
>>
>> We have a deduplication job with a large amount of keyed ValueState. We 
>> want to decrease the state size as much as possible, so we're using 
>> ValueState<Object>, as Object is the smallest possible Java non-primitive. However, as 
>> per https://www.baeldung.com/java-size-of-object (and my measurements), a Java 
>> Integer has the same memory size as an Object due to padding.
>> Will this still be true with RocksDB state? Can we put an Integer in state 
>> without increasing the state size?
>>
>> Thanks, Maciej
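The padding argument in the original question can be checked with simple arithmetic. This is a sketch assuming a 64-bit HotSpot JVM with the default 8-byte object alignment; the header sizes (12 bytes with compressed class pointers, 16 without) are typical values rather than something measured here, and the class name JvmPaddingSketch is hypothetical. It only describes heap layout, which is not what RocksDB stores.

```java
class JvmPaddingSketch {

    // Assumption: 64-bit HotSpot with the default 8-byte object alignment.
    static final int ALIGNMENT = 8;

    // Rounds a raw size up to the next multiple of ALIGNMENT.
    static int align(int rawSize) {
        return (rawSize + ALIGNMENT - 1) / ALIGNMENT * ALIGNMENT;
    }

    // Shallow heap size of an object: header plus instance fields, padded.
    static int shallowSize(int headerBytes, int fieldBytes) {
        return align(headerBytes + fieldBytes);
    }

    public static void main(String[] args) {
        int compressedHeader = 12; // 8-byte mark word + 4-byte compressed class pointer
        int fullHeader = 16;       // 8-byte mark word + 8-byte class pointer
        int intField = 4;          // Integer's single int field

        System.out.println("Object,  compressed:   " + shallowSize(compressedHeader, 0));
        System.out.println("Integer, compressed:   " + shallowSize(compressedHeader, intField));
        System.out.println("Object,  uncompressed: " + shallowSize(fullHeader, 0));
        System.out.println("Integer, uncompressed: " + shallowSize(fullHeader, intField));
    }
}
```

With compressed class pointers both Object and Integer round up to 16 bytes, which matches the measurement in the question; without them an Integer would take 24 bytes while an Object stays at 16.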
