Hello, I am running a Flink stateful job, where the checkpoint size increases continuously over time (200+ MB). The actual State size should be in < 1 MB. The source is a Kafka Topic. The number of keys in the Topic is < 1000 (confirmed by inspecting the Topic). Each Key needs to store a set of values that changes continuously (via addition and deletion of entries to the set). The number of values to be stored for each key is always less than 5. I am using State type - ListState to store the values for a key. I feel the reason for the continuous increase of the checkpoint size is due to one of the following reasons:* The old entries (when I do an `update()`) is not getting purged from the state and retained. OR* Somehow there are a lot of keys being added to the store. (Not sure how this is even possible.). Any ideas or pointer to debug this issue would be greatly appreciated.Related question, do I need to set some sort of TTL for the ListState?
Following is snippet of the code: // Main method DataStream<Message> messageStream = env.addSource(kafkaFactory.newConsumer(parameters.getRequired(INPUT_TOPIC), schema)) .uid("kafka-source") SinkFunction<Message> kafkaSink = kafkaFactory.newProducer( parameters.getRequired(OUTPUT_TOPIC), schema); DataStreamUtils.reinterpretAsKeyedStream(messageStream, FabricEnvelope::getAssetId) .process(new TripMapper(parameters)) .uid("trip") .addSink(kafkaSink) .uid("kafka-sink"); // Mapper class public class TripMapper extends KeyedProcessFunction<String, Message, Message> { private transient ListState<Trip> trips; @Override public void open(Configuration parameters) { ListStateDescriptor<Trip> tripDescriptor = new ListStateDescriptor<>("trips", Trip.class); trips = getRuntimeContext().getListState(tripDescriptor); ... } @Override public void processElement(Message message, Context ctx, Collector<Message> out) throws Exception { List<Trip> tripList = new ArrayList<>(); trips.get().forEach(tripList::add); // Depending on the message content, one of the following actions are taken on the state: // - Add new entry to tripList // - Remove existing entry from tripList // Update state trips.update(tripList); } Thank you in advance,Ahmed.