Hi Arvid,

The Flink application was not restarted. I had checked on that.
By adding rules to the state of the process function, do you mean the state
that is local to the keyed process function?
From [1], what is being done here -

final MapState<String, List<Item>> state =
        getRuntimeContext().getMapState(mapStateDesc);

state.put(ruleName, stored);


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
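
To make sure I am reading [1] correctly, here is a minimal sketch of my
understanding (Item and Rule are the example types from the docs; the matching
logic is elided):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

public class RuleEvaluator
        extends KeyedBroadcastProcessFunction<String, Item, Rule, String> {

    // Keyed state: scoped to the current key and to this operator,
    // i.e. the state that is "local" to the keyed process function.
    private final MapStateDescriptor<String, List<Item>> mapStateDesc =
            new MapStateDescriptor<>(
                    "items",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    new ListTypeInfo<>(Item.class));

    // Broadcast state: every parallel instance holds the same rules map.
    private final MapStateDescriptor<String, Rule> ruleStateDescriptor =
            new MapStateDescriptor<>(
                    "RulesBroadcastState",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    TypeInformation.of(new TypeHint<Rule>() {}));

    @Override
    public void processElement(
            Item value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        // The snippet above: keyed state, fetched for the current key.
        final MapState<String, List<Item>> state =
                getRuntimeContext().getMapState(mapStateDesc);
        // Broadcast state is read-only on the non-broadcast side.
        for (Map.Entry<String, Rule> entry :
                ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
            // ... match value against entry.getValue() and update state ...
        }
    }

    @Override
    public void processBroadcastElement(
            Rule rule, Context ctx, Collector<String> out) throws Exception {
        // Rules are written to broadcast state here, not to the keyed MapState.
        ctx.getBroadcastState(ruleStateDescriptor).put(rule.name, rule);
    }
}

So state.put(ruleName, stored) would be writing partial matches into the keyed
state, while the rules themselves live in the broadcast state, correct?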

Thanks.


On Wed, Feb 24, 2021 at 7:52 PM Arvid Heise <ar...@apache.org> wrote:

> Could you double-check whether your Flink application was restarted between
> the time the Kafka topic was cleared and the time you saw that the rules had
> been lost?
>
> I suspect that you deleted the Kafka topic, the Flink application then
> failed and restarted, and upon restart it read the (now empty) rules topic.
>
> To solve it, you probably want to add the rules to the state of your
> process function [1]. If you have done that, I'm a bit lost.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
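>
> If checkpointing is enabled, a restart after a failure would restore the
> broadcast state from the last checkpoint instead of only re-reading the
> (now empty) rules topic. A minimal, untested sketch (the interval is just
> an example; imports omitted):
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> // Checkpoint every 60s so operator state (including broadcast state)
> // survives a failure/restart.
> env.enableCheckpointing(60_000);
> // Keep the latest checkpoint even if the job is cancelled.
> env.getCheckpointConfig().enableExternalizedCheckpoints(
>         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);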
>
> On Wed, Feb 24, 2021 at 7:30 AM bat man <tintin0...@gmail.com> wrote:
>
>> Hi,
>>
>> This is my code below -
>> As mentioned earlier, the rulesStream is used again in later processing.
>> Below you can see that rulesStream is connected a second time, with the
>> result stream of the first process function. Do you think this is why the
>> rules operator's state is getting overridden when the data in Kafka is
>> deleted?
>> My question is: if the data is no longer present in Kafka, then nothing is
>> read into the stream, so how would that update the existing state data?
>>
>> public static final MapStateDescriptor<Integer, Rule> rulesDescriptor =
>>         new MapStateDescriptor<>(
>>                 "rules", BasicTypeInfo.INT_TYPE_INFO,
>>                 TypeInformation.of(Rule.class));
>>
>> rawEventKafkaSource = getKafkaSource(config.get(RAW_EVENT_TOPIC));
>> DataStream<RawEvent> rawEventStream =
>>         validateData(getRawEventStream(rawEventKafkaSource, env));
>>
>> rulesKafkaSource = getKafkaSource(config.get(RULES_TOPIC));
>> DataStream<Rule> rulesDataStream = getRulesStream(rulesKafkaSource, env);
>>
>> deviceSource = getKafkaSource(config.get(DEVICE_EVENT_TOPIC));
>> DataStream<Device> deviceDataStream = getDeviceStream(deviceSource, env);
>>
>> BroadcastStream<Rule> rulesStream = rulesDataStream.broadcast(rulesDescriptor);
>>
>> SingleOutputStreamOperator<Keyed<RawEvent, String, Integer>> keyedSingleOutputStream =
>>         rawEventStream
>>                 .connect(rulesStream)
>>                 .process(new DynamicKeyFunction())
>>                 .name("keyed").uid("keyed").setParallelism(5);
>>
>> SingleOutputStreamOperator<RTEvent> rtEventDataStream =
>>         keyedSingleOutputStream
>>                 .keyBy((keyed) -> keyed.getKey())
>>                 .connect(rulesStream)
>>                 .process(new DynamicTransformFunction())
>>                 .name("rt").uid("rt").setParallelism(5);
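>>
>> For reference, the broadcast side of DynamicKeyFunction looks roughly like
>> this (simplified; getRuleId() stands in for the actual accessor). Since
>> rulesStream is connected twice, the "keyed" and "rt" operators each hold
>> their own independent copy of this broadcast state:
>>
>> @Override
>> public void processBroadcastElement(
>>         Rule rule, Context ctx, Collector<Keyed<RawEvent, String, Integer>> out)
>>         throws Exception {
>>     // State is only written when a rule element arrives; an empty topic
>>     // after a restart means this method is simply never invoked again.
>>     ctx.getBroadcastState(rulesDescriptor).put(rule.getRuleId(), rule);
>> }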
>>
>>
>> On Tue, Feb 23, 2021 at 10:32 PM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Deletion of messages in Kafka shouldn't affect Flink state in general.
>>> Probably, some operator in your pipeline is re-reading the topic
>>> and overwriting the state, dropping what was deleted by Kafka.
>>> Could you share the code?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Tue, Feb 23, 2021 at 7:12 AM bat man <tintin0...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have two streams: one carries event data and the other rules. I
>>>> broadcast the rules stream and then key the data stream on event type.
>>>> The connected stream is processed thereafter.
>>>> We faced an issue where the rules data in the topic got deleted because
>>>> of the Kafka retention policy.
>>>> After this, the existing rules data was also dropped from the broadcast
>>>> state and the processing stopped.
>>>>
>>>> As per my understanding, the rules which were present in broadcast state
>>>> should still exist even if the data was deleted in Kafka, as the rules
>>>> data had already been processed and stored in the state map.
>>>>
>>>> PS: I’m reusing the rules stream as a broadcast stream later in the
>>>> processing as well. Could this be an issue?
>>>>
>>>> Thanks,
>>>> Hemant
>>>>
>>>
