Hello,
I am running a stateful Flink job whose checkpoint size increases 
continuously over time (200+ MB). The actual state size should be < 1 MB.
The source is a Kafka Topic.  The number of keys in the Topic is < 1000 
(confirmed by inspecting the Topic).  Each Key needs to store a set of values 
that changes continuously (via addition and deletion of entries to the set).  
The number of values to be stored for each key is always less than 5.
I am using ListState as the state type to store the values for a key.

I suspect the continuous increase of the checkpoint size is due to one of 
the following reasons:
* The old entries are not purged from the state when I do an `update()`, 
  and are retained instead. OR
* Somehow a lot of keys are being added to the store. (Not sure how this is 
  even possible.)

Any ideas or pointers for debugging this issue would be greatly appreciated.

Related question: do I need to set some sort of TTL for the ListState?

The following is a snippet of the code:

// Main method
    DataStream<Message> messageStream =
            env.addSource(kafkaFactory.newConsumer(parameters.getRequired(INPUT_TOPIC), schema))
                    .uid("kafka-source");

    SinkFunction<Message> kafkaSink = kafkaFactory.newProducer(
            parameters.getRequired(OUTPUT_TOPIC), schema);

    DataStreamUtils.reinterpretAsKeyedStream(messageStream, FabricEnvelope::getAssetId)
            .process(new TripMapper(parameters))
            .uid("trip")
            .addSink(kafkaSink)
            .uid("kafka-sink");

// Mapper class
    public class TripMapper extends KeyedProcessFunction<String, Message, Message> {
        private transient ListState<Trip> trips;

        @Override
        public void open(Configuration parameters) {
            ListStateDescriptor<Trip> tripDescriptor =
                    new ListStateDescriptor<>("trips", Trip.class);
            trips = getRuntimeContext().getListState(tripDescriptor);
            ...
        }

        @Override
        public void processElement(Message message, Context ctx,
                                   Collector<Message> out) throws Exception {
            // Copy the current state for this key into a local list
            List<Trip> tripList = new ArrayList<>();
            trips.get().forEach(tripList::add);

            // Depending on the message content, one of the following actions
            // is taken on the state:
            // - Add new entry to tripList
            // - Remove existing entry from tripList

            // Update state
            trips.update(tripList);
        }
    }
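To make the two suspicions above measurable, here is a minimal plain-Java sketch of the same read-modify-write pattern. It is not Flink code: a HashMap stands in for the keyed ListState, trips are reduced to String ids, and the names TripStateModel, apply, keyCount, totalEntries and maxEntriesPerKey are all hypothetical. It also assumes that writing back an empty list leaves no value for that key. If the pattern behaves as modelled here, the key count and the entries per key should stay within the bounds described above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the per-key read-modify-write pattern (hypothetical, not Flink). */
final class TripStateModel {
    // Stand-in for keyed ListState: one list of trip ids per key.
    private final Map<String, List<String>> stateByKey = new HashMap<>();

    /** Copies the current list for the key, adds or removes one entry, writes the copy back. */
    void apply(String key, String tripId, boolean add) {
        List<String> tripList =
                new ArrayList<>(stateByKey.getOrDefault(key, Collections.emptyList()));
        if (add) {
            if (!tripList.contains(tripId)) {
                tripList.add(tripId);
            }
        } else {
            tripList.remove(tripId); // remove(Object): drops the matching id if present
        }
        // Model of trips.update(tripList): the stored list is replaced, not appended to.
        // Assumption: an empty list leaves no value for the key.
        if (tripList.isEmpty()) {
            stateByKey.remove(key);
        } else {
            stateByKey.put(key, tripList);
        }
    }

    /** Number of keys that currently hold a value (second suspicion: too many keys). */
    int keyCount() {
        return stateByKey.size();
    }

    /** Total number of stored entries across all keys. */
    int totalEntries() {
        int total = 0;
        for (List<String> list : stateByKey.values()) {
            total += list.size();
        }
        return total;
    }

    /** Largest list held by any key (first suspicion: old entries not purged). */
    int maxEntriesPerKey() {
        int max = 0;
        for (List<String> list : stateByKey.values()) {
            max = Math.max(max, list.size());
        }
        return max;
    }
}
```

Logging the equivalent two numbers from the real job (distinct keys seen per subtask and list size per key before each `update()`) would show which of the two suspicions, if either, matches what is actually happening.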

Thank you in advance,
Ahmed.
