Hi,

Thanks for your explanation! After adding a line `self.data.contains('xxx')` in
`process_element2`, everything works. I will take this as my temporary
solution.
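
For anyone hitting the same issue, this is roughly what the patched method
looks like (a sketch based on the demo further down this thread; 'xxx' is
just a dummy key and does not need to exist in the state):

    def process_element2(self, rule_value, ctx):
        # dummy state access; presumably forces the cached state to be
        # re-bound to the current key before any real read (temporary fix)
        self.data.contains('xxx')
        row_id, others = rule_value[0], rule_value[1:]
        self.rules.put(row_id, others)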

Looking forward to the next release.

Best Regards,
Fei


Xingbo Huang <hxbks...@gmail.com> wrote on Tue, Jul 13, 2021 at 4:18 PM:

> Hi,
>
> I have created JIRA [1] to fix this bug; the fix will be included in
> release-1.13.2. The root cause is a wrong mapping of the state key to the
> state: the mapping goes stale when the key is switched but the state is not
> accessed. As in your example, the `data` state you declared is never used in
> `process_element2`.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23368
>
> Best,
> Xingbo
>
赵飞 <emanonc...@gmail.com> wrote on Mon, Jul 12, 2021 at 10:00 PM:
>
>> Thanks. In addition, I run the program in local mini-cluster mode; I'm not
>> sure whether that affects the results.
>>
>> Xingbo Huang <hxbks...@gmail.com> wrote on Mon, Jul 12, 2021 at 9:02 PM:
>>
>>> Hi,
>>>
>>> I think your understanding is correct. The results do seem a little weird.
>>> I'm looking into this and will let you know once I have any findings.
>>>
>>> Best,
>>> Xingbo
>>>
>>> 赵飞 <emanonc...@gmail.com> wrote on Mon, Jul 12, 2021 at 4:48 PM:
>>>
>>>> Hi all,
>>>> I'm using PyFlink to develop a module whose main functionality is
>>>> processing user data based on specific rules. The program involves two
>>>> datastreams, data and rule. They have different types, so I connect them
>>>> and use the field 'product_id' as the key for the key_by method. The code
>>>> is as follows (just demo code, not the actual module):
>>>>
>>>> import random
>>>>
>>>> from pyflink.common.typeinfo import Types
>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>> from pyflink.datastream.functions import KeyedCoProcessFunction
>>>> from pyflink.datastream.state import MapStateDescriptor
>>>> from pyflink.datastream import RuntimeContext
>>>>
>>>>
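>>>> # collect the distinct product_ids among the values currently in the map state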
>>>> def test(data):
>>>>     product_ids = set()
>>>>     for key, value in data.items():
>>>>         product_ids.add(value[0])
>>>>     return list(product_ids)
>>>>
>>>>
>>>> class MyFunction(KeyedCoProcessFunction):
>>>>     def open(self, ctx):
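>>>>         # two per-key map states: 'data' holds user records, 'rules' holds rules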
>>>>         data_desc = MapStateDescriptor('data', Types.STRING(), Types.ROW([Types.INT()]))
>>>>         self.data = ctx.get_map_state(data_desc)
>>>>
>>>>         rule_desc = MapStateDescriptor('rule', Types.STRING(), Types.ROW([Types.INT()]))
>>>>         self.rules = ctx.get_map_state(rule_desc)
>>>>
>>>>     def process_element1(self, data_value, ctx):
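>>>>         # store the incoming user record, then evaluate every known rule against the state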
>>>>         row_id, others = data_value[0], data_value[1:]
>>>>         self.data.put(row_id, others)
>>>>         result = []
>>>>         for key, value_list in self.rules.items():
>>>>             product_id, random_0, random_1 = value_list
>>>>             # Do some calculations
>>>>             product_ids_of_state_data = test(self.data)
>>>>             result.append([random_0, random_1, product_id, product_ids_of_state_data])
>>>>         return result
>>>>
>>>>     def process_element2(self, rule_value, ctx):
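>>>>         # rules are only written here; note that self.data is never accessed in this method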
>>>>         row_id, others = rule_value[0], rule_value[1:]
>>>>         self.rules.put(row_id, others)
>>>>
>>>> def generate_data1(count):
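>>>>     # product_id (the second field) alternates deterministically: 1, 2, 1, 2, ...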
>>>>     collection = []
>>>>     for i in range(count):
>>>>         collection.append(['row_%d' % i, i % 2 + 1, 'a_%d' % i, i * 2])
>>>>     return collection
>>>>
>>>> def generate_data2(count):
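>>>>     # product_id is chosen at random, so the key switches irregularly between records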
>>>>     collection = []
>>>>     for i in range(count):
>>>>         collection.append(['row_%d' % i, random.choice([1, 2]), 'a_%d' % i, i * 2])
>>>>     return collection
>>>>
>>>>
>>>> def main():
>>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>>     env.set_parallelism(1)
>>>>
>>>>     data = env.from_collection(generate_data1(50))
>>>>     rules = env.from_collection([
>>>>         ['row_0', 1, 'rule1_value0', 'rule1_value1'],
>>>>         ['row_1', 2, 'rule2_value0', 'rule2_value1']
>>>>     ], type_info=Types.ROW([Types.STRING(), Types.INT(), Types.STRING(), Types.STRING()]))
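>>>>     # key both streams by the product_id field (index 1)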
>>>>     results = data.connect(rules).key_by(lambda x: x[1], lambda y: y[1]).process(MyFunction())
>>>>     results.print()
>>>>
>>>>     env.execute("test_job")
>>>>
>>>> if __name__ == "__main__":
>>>>     main()
>>>>
>>>>
>>>> When processing the first datastream, which contains the user data, I
>>>> access the registered MapState and collect the unique product_ids in it.
>>>> According to the description on the official site:
>>>>
>>>> Keyed state is maintained in what can be thought of as an embedded
>>>>> key/value store. The state is partitioned and distributed strictly 
>>>>> together
>>>>> with the streams that are read by the stateful operators. Hence, access to
>>>>> the key/value state is only possible on *keyed streams*, i.e. after a
>>>>> keyed/partitioned data exchange, and *is restricted to the values
>>>>> associated with the current event’s key*
>>>>
>>>>
>>>> I assume that each time I access the MapState, I should get only one
>>>> product_id value. But I get the following outputs after running the
>>>> script:
>>>>
>>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>>>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>>>>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>>>>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>>>>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>>>>>
>>>>
>>>> It shows that each user record is indeed processed against the
>>>> corresponding rules, but the MapState also contains other products' data
>>>> (the last element of each row). There are some other strange points:
>>>> 1. I have two functions to generate data. The problem occurs more often
>>>> with data generated by generate_data2 than with generate_data1.
>>>> 2. The problem occurs more often when the data size is more than 50.
>>>> 3. With the same code and the same data, different runs produce different
>>>> outputs. Sometimes the output is as expected, sometimes it is not.
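>>>>
>>>> In case it helps, a check like the following should confirm whether the
>>>> state leaks across keys (a sketch; I am assuming ctx.get_current_key()
>>>> is available on the keyed co-process context in this version):
>>>>
>>>> def process_element1(self, data_value, ctx):
>>>>     row_id, others = data_value[0], data_value[1:]
>>>>     self.data.put(row_id, others)
>>>>     # each stored value's first field is its product_id; with correct
>>>>     # keyed-state isolation it must always equal the current key
>>>>     current_key = ctx.get_current_key()
>>>>     for k, v in self.data.items():
>>>>         if v[0] != current_key:
>>>>             print('leaked entry %s: product_id %s under key %s'
>>>>                   % (k, v[0], current_key))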
>>>>
>>>> Does anybody know why? Or is my understanding of keyed state wrong?
>>>>
>>>> Thanks.
>>>>
>>>>
