Hi,

I have created a JIRA issue [1] to fix this bug; the fix will be included in
release-1.13.2. The root cause is a wrong mapping from the state key to the
state. This wrong mapping occurs when the key is switched but the
state is not used. In your example, the `data` state you declared is
not used in `process_element2`.
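
For reference, the intended keyed-state semantics can be modeled with a toy
store. This is only a sketch in plain Python: `ToyKeyedMapState`,
`set_current_key` and `product_ids_seen` are hypothetical names for
illustration, not PyFlink APIs, and the sketch does not reproduce the bug or
PyFlink's internals. It only shows what each element should see once the
key-to-state mapping is correct.

```python
from collections import defaultdict


class ToyKeyedMapState:
    """Toy model of keyed MapState: one private map per key (not PyFlink)."""

    def __init__(self):
        self._maps = defaultdict(dict)  # current key -> that key's own map
        self._current_key = None

    def set_current_key(self, key):
        # Stand-in for the runtime switching the key before each element.
        self._current_key = key

    def put(self, map_key, value):
        self._maps[self._current_key][map_key] = value

    def items(self):
        # Only entries stored under the current key are visible.
        return list(self._maps[self._current_key].items())


def product_ids_seen(state):
    # Same idea as test() in the demo: unique product_ids in the state.
    return sorted({value[0] for _, value in state.items()})


data = ToyKeyedMapState()
seen_per_element = []
for i in range(6):
    row = ['row_%d' % i, i % 2 + 1, 'a_%d' % i, i * 2]
    data.set_current_key(row[1])  # mirrors key_by(lambda x: x[1])
    data.put(row[0], row[1:])
    seen_per_element.append(product_ids_seen(data))

print(seen_per_element)  # [[1], [2], [1], [2], [1], [2]]
```

With correct scoping, every list holds exactly one product_id, so entries such
as `[1, 2]` in the reported output should not appear.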

[1] https://issues.apache.org/jira/browse/FLINK-23368

Best,
Xingbo

赵飞 <emanonc...@gmail.com> wrote on Mon, Jul 12, 2021 at 10:00 PM:

> Thanks. In addition, I am running the program in local mini-cluster mode; I am
> not sure whether that affects the results.
>
> Xingbo Huang <hxbks...@gmail.com> wrote on Mon, Jul 12, 2021 at 9:02 PM:
>
>> Hi,
>>
>> I think your understanding is correct. The results seem a little weird.
>> I'm looking into this and will let you know when there are any findings.
>>
>> Best,
>> Xingbo
>>
>> 赵飞 <emanonc...@gmail.com> wrote on Mon, Jul 12, 2021 at 4:48 PM:
>>
>>> Hi all,
>>> I'm using PyFlink to develop a module whose main functionality is
>>> processing user data based on specific rules. The program involves two
>>> datastreams: data and rule. They have different types, so I connect them
>>> and use the field 'product_id' as the key for the key_by method. The code
>>> is as follows (just demo code, not the actual one):
>>>
>>> import random
>>>
>>> from pyflink.common.typeinfo import Types
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> from pyflink.datastream.functions import KeyedCoProcessFunction
>>> from pyflink.datastream.state import MapStateDescriptor
>>> from pyflink.datastream import RuntimeContext
>>>
>>>
>>> def test(data):
>>>     product_ids = set()
>>>     for key, value in data.items():
>>>         product_ids.add(value[0])
>>>     return list(product_ids)
>>>
>>>
>>> class MyFunction(KeyedCoProcessFunction):
>>>     def open(self, ctx):
>>>         data_desc = MapStateDescriptor('data', Types.STRING(), Types.ROW([Types.INT()]))
>>>         self.data = ctx.get_map_state(data_desc)
>>>
>>>         rule_desc = MapStateDescriptor('rule', Types.STRING(), Types.ROW([Types.INT()]))
>>>         self.rules = ctx.get_map_state(rule_desc)
>>>
>>>     def process_element1(self, data_value, ctx):
>>>         row_id, others = data_value[0], data_value[1:]
>>>         self.data.put(row_id, others)
>>>         result = []
>>>         for key, value_list in self.rules.items():
>>>             product_id, random_0, random_1 = value_list
>>>             # Do some calculations
>>>             product_ids_of_state_data = test(self.data)
>>>             result.append([random_0, random_1, product_id, product_ids_of_state_data])
>>>         return result
>>>
>>>     def process_element2(self, rule_value, ctx):
>>>         row_id, others = rule_value[0], rule_value[1:]
>>>         self.rules.put(row_id, others)
>>>
>>> def generate_data1(count):
>>>     collection = []
>>>     for i in range(count):
>>>         collection.append(['row_%d' % i, i % 2 + 1, 'a_%d' % i, i * 2])
>>>     return collection
>>>
>>> def generate_data2(count):
>>>     collection = []
>>>     for i in range(count):
>>>         collection.append(['row_%d' % i, random.choice([1, 2]), 'a_%d' % i, i * 2])
>>>     return collection
>>>
>>>
>>> def main():
>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>     env.set_parallelism(1)
>>>
>>>     data = env.from_collection(generate_data1(50))
>>>     rules = env.from_collection([
>>>         ['row_0', 1, 'rule1_value0', 'rule1_value1'],
>>>         ['row_1', 2, 'rule2_value0', 'rule2_value1']
>>>     ], type_info=Types.ROW([Types.STRING(), Types.INT(), Types.STRING(), Types.STRING()]))
>>>     results = data.connect(rules).key_by(lambda x: x[1], lambda y: y[1]).process(MyFunction())
>>>     results.print()
>>>
>>>     env.execute("test_job")
>>>
>>> if __name__ == "__main__":
>>>     main()
>>>
>>>
>>> When processing the first datastream, which contains user data, I
>>> access the registered MapState and collect the unique product_id values
>>> in it. According to the description on the official site:
>>>
>>>> Keyed state is maintained in what can be thought of as an embedded
>>>> key/value store. The state is partitioned and distributed strictly together
>>>> with the streams that are read by the stateful operators. Hence, access to
>>>> the key/value state is only possible on *keyed streams*, i.e. after a
>>>> keyed/partitioned data exchange, and *is restricted to the values
>>>> associated with the current event’s key*
>>>
>>>
>>> I assume that each access to the MapState should yield only one
>>> product_id value. But I get the following output after
>>> running the script:
>>>
>>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>>>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>>>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>>>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>>>>
>>>
>>> It shows that each user data record is indeed processed according to the
>>> matching rule, but the MapState also contains other products' data (see
>>> the last element of each row). There are other strange points:
>>> 1. I have two functions to generate data. The problem is more likely to
>>> occur with data generated by generate_data2 than with data generated by
>>> generate_data1.
>>> 2. The problem is more likely to occur when the data size is more than 50.
>>> 3. With the same code and the same data, the output differs across
>>> runs. Sometimes the output is as expected, sometimes it is not.
>>>
>>> Does anybody know why? Or is my understanding of keyed state wrong?
>>>
>>> Thanks.
>>>
>>>
