Hi Arvid,

The Flink application was not restarted. I had checked on that.

By adding rules to the state of the process function, do you mean the state
which is local to the KeyedProcessFunction? From [1], this is what is being
done there:

final MapState<String, List<Item>> state =
        getRuntimeContext().getMapState(mapStateDesc);
state.put(ruleName, stored);
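To check my understanding of that example: the MapState above is keyed state,
per-key bookkeeping local to the function, while the rules themselves are
written to the broadcast state through the context. Below is a rough sketch of
how I read [1]; the class name, key type and the Item/Rule fields are
placeholders taken from my reading of the doc, not our actual code.

import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Item and Rule stand in for the POJOs of the doc's example
// (Rule is assumed to have a public String name field).
public class RuleMatcher
        extends KeyedBroadcastProcessFunction<String, Item, Rule, String> {

    // Keyed state: one map per key, local to this function.
    private final MapStateDescriptor<String, List<Item>> mapStateDesc =
            new MapStateDescriptor<>(
                    "items",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    new ListTypeInfo<>(Item.class));

    // Broadcast state: same descriptor that is passed to broadcast(...).
    private final MapStateDescriptor<String, Rule> ruleStateDescriptor =
            new MapStateDescriptor<>(
                    "RulesBroadcastState",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    TypeInformation.of(Rule.class));

    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<String> out)
            throws Exception {
        // The rules go into broadcast state, which is checkpointed with the
        // operator and is independent of Kafka retention.
        ctx.getBroadcastState(ruleStateDescriptor).put(rule.name, rule);
    }

    @Override
    public void processElement(Item item, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        // This is the keyed MapState quoted above: per-key bookkeeping,
        // not where the rules live.
        final MapState<String, List<Item>> state =
                getRuntimeContext().getMapState(mapStateDesc);

        // The rules are read from the broadcast state here.
        for (Map.Entry<String, Rule> entry :
                ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
            // match item against entry.getValue(), update state, emit results
        }
    }
}

So I want to confirm whether you meant the keyed MapState or the broadcast
state accessed via ctx.getBroadcastState(...).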
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

Thanks.

On Wed, Feb 24, 2021 at 7:52 PM Arvid Heise <ar...@apache.org> wrote:

> Could you double-check if your Flink application was restarted between the
> time the Kafka topic was cleared and the time you saw that the rules had
> been lost?
>
> I suspect that you deleted the Kafka topic and the Flink application then
> failed and restarted. Upon restart it read the empty rule topic.
>
> To solve it, you probably want to add the rules to the state of your
> process function [1]. If you have done that, I'm a bit lost.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>
> On Wed, Feb 24, 2021 at 7:30 AM bat man <tintin0...@gmail.com> wrote:
>
>> Hi,
>>
>> This is my code below. As mentioned earlier, the rulesStream is used
>> again in later processing; below you can see that rulesStream is also
>> connected with the result stream of the first process function. Do you
>> think this is the reason the rules operator state is getting overridden
>> when the data in Kafka is deleted?
>> My question is: if the data is not present in Kafka, no data is read from
>> the stream, so how is the existing state data being updated?
>>
>> public static final MapStateDescriptor<Integer, Rule> rulesDescriptor =
>>         new MapStateDescriptor<>(
>>                 "rules", BasicTypeInfo.INT_TYPE_INFO,
>>                 TypeInformation.of(Rule.class));
>>
>> rawEventKafkaSource = getKafkaSource(config.get(RAW_EVENT_TOPIC));
>> DataStream<RawEvent> rawEventStream =
>>         validateData(getRawEventStream(rawEventKafkaSource, env));
>>
>> rulesKafkaSource = getKafkaSource(config.get(RULES_TOPIC));
>> DataStream<Rule> rulesDataStream = getRulesStream(rulesKafkaSource, env);
>>
>> deviceSource = getKafkaSource(config.get(DEVICE_EVENT_TOPIC));
>> DataStream<Device> deviceDataStream = getDeviceStream(deviceSource, env);
>>
>> BroadcastStream<Rule> rulesStream =
>>         rulesDataStream.broadcast(rulesDescriptor);
>>
>> SingleOutputStreamOperator<Keyed<RawEvent, String, Integer>> keyedSingleOutputStream =
>>         rawEventStream
>>                 .connect(rulesStream)
>>                 .process(new DynamicKeyFunction())
>>                 .name("keyed").uid("keyed").setParallelism(5);
>>
>> SingleOutputStreamOperator<RTEvent> rtEventDataStream =
>>         keyedSingleOutputStream
>>                 .keyBy((keyed) -> keyed.getKey())
>>                 .connect(rulesStream)
>>                 .process(new DynamicTransformFunction())
>>                 .name("rt").uid("rt").setParallelism(5);
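To spell out where the rules end up in this topology: both process functions
above connect to the same rulesStream, so each of the resulting operators
keeps its own copy of the broadcast state registered under rulesDescriptor.
In outline, the non-keyed function looks something like the sketch below; the
body and the getRuleId() accessor are placeholders, not the exact code.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Non-keyed side: raw events connected with the broadcast rules.
public class DynamicKeyFunction
        extends BroadcastProcessFunction<RawEvent, Rule, Keyed<RawEvent, String, Integer>> {

    // Same definition as the static rulesDescriptor in the pipeline above.
    private static final MapStateDescriptor<Integer, Rule> rulesDescriptor =
            new MapStateDescriptor<>(
                    "rules", BasicTypeInfo.INT_TYPE_INFO,
                    TypeInformation.of(Rule.class));

    @Override
    public void processBroadcastElement(Rule rule, Context ctx,
            Collector<Keyed<RawEvent, String, Integer>> out) throws Exception {
        // This operator's own copy of the broadcast state; getRuleId() is a
        // placeholder for however the rule id is exposed.
        ctx.getBroadcastState(rulesDescriptor).put(rule.getRuleId(), rule);
    }

    @Override
    public void processElement(RawEvent event, ReadOnlyContext ctx,
            Collector<Keyed<RawEvent, String, Integer>> out) throws Exception {
        // fan the event out per matching rule and assign the key
    }
}

DynamicTransformFunction is the keyed counterpart (a KeyedBroadcastProcessFunction
over Keyed<RawEvent, String, Integer> and Rule, producing RTEvent) and holds a
second, independent copy of the same broadcast state.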
>>
>> On Tue, Feb 23, 2021 at 10:32 PM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Deletion of messages in Kafka shouldn't affect Flink state in general.
>>> Probably, some operator in your pipeline is re-reading the topic
>>> and overwrites the state, dropping what was deleted by Kafka.
>>> Could you share the code?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Tue, Feb 23, 2021 at 7:12 AM bat man <tintin0...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have 2 streams, one with event data and the other with rules. I
>>>> broadcast the rules stream and then key the data stream on event type.
>>>> The connected stream is processed thereafter.
>>>> We faced an issue where the rules data in the topic got deleted because
>>>> of the Kafka retention policy.
>>>> After this, the existing rules data also got dropped from the broadcast
>>>> state and the processing stopped.
>>>>
>>>> As per my understanding, the rules which were present in the broadcast
>>>> state should still exist even if the data was deleted in Kafka, as the
>>>> rules data was already processed and stored in the state map.
>>>>
>>>> PS: I'm reusing the rules stream as broadcast later in processing as
>>>> well. Could this be an issue?
>>>>
>>>> Thanks,
>>>> Hemant
>>>