I created a new descriptor and rulestream used it in the second process
function and this works fine.
public static final MapStateDescriptor rulesDescriptor =
new MapStateDescriptor<>(
"rules", BasicTypeInfo.INT_TYPE_INFO,
TypeInformation.of(Rule.class));
public static fin
Hi,
I have no idea what's going on. There is no mechanism in DataStream to
react to deleted records.
Can you reproduce it locally and debug through it?
On Wed, Feb 24, 2021 at 5:21 PM bat man wrote:
> Hi Arvid,
>
> The Flink application was not re-started. I had checked on that.
> By adding
Hi Arvid,
The Flink application was not re-started. I had checked on that.
By adding rules to the state of process function you mean the state which
is local to the keyedprocess function?
>From [1] what is being done here -
final MapState> state = getRuntimeContext().getMapState(
mapStateDesc);
Could you double-check if your Flink application was restarted between
Kafka topic was cleared and the time you saw that the rules have been lost?
I suspect that you deleted the Kafka topic and the Flink application then
failed and restarted. Upon restart it read the empty rule topic.
To solve it
Hi,
This is my code below -
As mentioned earlier the rulesStream us again used in later processing.
Below you can see the rulesStream is again connected with the result stream
of the first process stream. Do you think this is the reason rules
operators state getting overridden when the data in kaf
Hi,
Deletion of messages in Kafka shouldn't affect Flink state in general.
Probably, some operator in your pipeline is re-reading the topic
and overwrites the state, dropping what was deleted by Kafka.
Could you share the code?
Regards,
Roman
On Tue, Feb 23, 2021 at 7:12 AM bat man wrote:
> H
Hi,
I have 2 streams one event data and the other rules. I broadcast the rules
stream and then key the data stream on event type. The connected stream is
processed thereafter.
We faced an issue where the rules data in the topic got deleted because of
Kafka retention policy.
Post this the existing