Hi again,

It seems the temporary workaround I mentioned in my last mail doesn't work
reliably. I am wondering whether every state has to be accessed in
`process_element1`, `process_element2` and `on_timer` (if a timer is
registered)?

Or is there any other suggested workaround?
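
To make the first question concrete, this is roughly what I mean by accessing
every state in every callback. It is only a sketch: `touch_states` and
`FakeMapState` are hypothetical names of my own, not PyFlink APIs, and the only
real call it relies on is `MapState.contains(key)`, the same one used in the
earlier workaround. Whether doing this actually avoids the wrong key-to-state
mapping is exactly what I am unsure about.

```python
# Sketch only: `touch_states` is a hypothetical helper, not part of PyFlink.
# It reads each map state once via contains(), so that every state is
# accessed in the callback where it is called.

def touch_states(*states, probe_key='__probe__'):
    """Call contains() once on every given map state; results are ignored."""
    for state in states:
        state.contains(probe_key)


class FakeMapState:
    """Stand-in for a MapState, so the helper can be tried without a cluster."""

    def __init__(self):
        self._data = {}
        self.reads = 0  # counts how often contains() was called

    def contains(self, key):
        self.reads += 1
        return key in self._data


# Intended use inside MyFunction (assumption: called first in each callback):
#
#     def process_element2(self, rule_value, ctx):
#         touch_states(self.data, self.rules)
#         ...
```

The same call would go at the top of `process_element1` and `on_timer`, so
that no callback leaves a declared state untouched.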

Thanks.

Fei Zhao <emanonc...@gmail.com> wrote on Tue, Jul 13, 2021 at 5:21 PM:

> Hi,
>
> Thanks for your explanation! After adding the line `self.data.contains('xxx')`
> to `process_element2`, everything works. I will use this as my temporary
> solution.
>
> Looking forward to the next release.
>
> Best Regards,
> Fei
>
>
> Xingbo Huang <hxbks...@gmail.com> wrote on Tue, Jul 13, 2021 at 4:18 PM:
>
>> Hi,
>>
>> I have created a JIRA issue [1] to fix this bug; the fix will be included in
>> release-1.13.2. The root cause is a wrong mapping from the state key to the
>> state. This wrong mapping occurs when the key is switched but the
>> state is not used. In your example, the `data` state you declared is
>> not used in `process_element2`.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-23368
>>
>> Best,
>> Xingbo
>>
>> 赵飞 <emanonc...@gmail.com> wrote on Mon, Jul 12, 2021 at 10:00 PM:
>>
>>> Thanks. In addition, I ran the program in local mini-cluster mode; I'm not
>>> sure whether that affects the results.
>>>
>>> Xingbo Huang <hxbks...@gmail.com> wrote on Mon, Jul 12, 2021 at 9:02 PM:
>>>
>>>> Hi,
>>>>
>>>> I think your understanding is correct. The results seem a little weird.
>>>> I'm looking into this and will let you know when there are any findings.
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> 赵飞 <emanonc...@gmail.com> wrote on Mon, Jul 12, 2021 at 4:48 PM:
>>>>
>>>>> Hi all,
>>>>> I'm using PyFlink to develop a module whose main functionality is
>>>>> processing user data based on specific rules. The program involves two
>>>>> datastreams: data and rules. They have different types, so I connect them
>>>>> and use the field 'product_id' as the key for the key_by method. The code
>>>>> is as follows (just demo code, not the actual program):
>>>>>
>>>>> import random
>>>>>
>>>>> from pyflink.common.typeinfo import Types
>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>> from pyflink.datastream.functions import KeyedCoProcessFunction
>>>>> from pyflink.datastream.state import MapStateDescriptor
>>>>> from pyflink.datastream import RuntimeContext
>>>>>
>>>>>
>>>>> def test(data):
>>>>>     product_ids = set()
>>>>>     for key, value in data.items():
>>>>>         product_ids.add(value[0])
>>>>>     return list(product_ids)
>>>>>
>>>>>
>>>>> class MyFunction(KeyedCoProcessFunction):
>>>>>     def open(self, ctx):
>>>>>         data_desc = MapStateDescriptor('data', Types.STRING(), Types.ROW([Types.INT()]))
>>>>>         self.data = ctx.get_map_state(data_desc)
>>>>>
>>>>>         rule_desc = MapStateDescriptor('rule', Types.STRING(), Types.ROW([Types.INT()]))
>>>>>         self.rules = ctx.get_map_state(rule_desc)
>>>>>
>>>>>     def process_element1(self, data_value, ctx):
>>>>>         row_id, others = data_value[0], data_value[1:]
>>>>>         self.data.put(row_id, others)
>>>>>         result = []
>>>>>         for key, value_list in self.rules.items():
>>>>>             product_id, random_0, random_1  = value_list
>>>>>             # Do some calculations
>>>>>             product_ids_of_state_data = test(self.data)
>>>>>             result.append([random_0, random_1, product_id, product_ids_of_state_data])
>>>>>         return result
>>>>>
>>>>>     def process_element2(self, rule_value, ctx):
>>>>>         row_id, others = rule_value[0], rule_value[1:]
>>>>>         self.rules.put(row_id, others)
>>>>>
>>>>> def generate_data1(count):
>>>>>     collection = []
>>>>>     for i in range(count):
>>>>>         collection.append(['row_%d' % i, i % 2 + 1, 'a_%d' % i, i * 2])
>>>>>     return collection
>>>>>
>>>>> def generate_data2(count):
>>>>>     collection = []
>>>>>     for i in range(count):
>>>>>         collection.append(['row_%d' % i, random.choice([1, 2]), 'a_%d' % i, i * 2])
>>>>>     return collection
>>>>>
>>>>>
>>>>> def main():
>>>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>>>     env.set_parallelism(1)
>>>>>
>>>>>     data = env.from_collection(generate_data1(50))
>>>>>     rules = env.from_collection([
>>>>>         ['row_0', 1, 'rule1_value0', 'rule1_value1'],
>>>>>         ['row_1', 2, 'rule2_value0', 'rule2_value1']
>>>>>     ], type_info=Types.ROW([Types.STRING(), Types.INT(), Types.STRING(), Types.STRING()]))
>>>>>     results = data.connect(rules).key_by(lambda x: x[1], lambda y: y[1]).process(MyFunction())
>>>>>     results.print()
>>>>>
>>>>>     env.execute("test_job")
>>>>>
>>>>> if __name__ == "__main__":
>>>>>     main()
>>>>>
>>>>>
>>>>> When processing the first datastream, which contains user data, I
>>>>> access the registered MapState and collect the unique product_id values
>>>>> in it. According to the description on the official site:
>>>>>
>>>>>> Keyed state is maintained in what can be thought of as an embedded
>>>>>> key/value store. The state is partitioned and distributed strictly
>>>>>> together with the streams that are read by the stateful operators.
>>>>>> Hence, access to the key/value state is only possible on *keyed
>>>>>> streams*, i.e. after a keyed/partitioned data exchange, and *is
>>>>>> restricted to the values associated with the current event’s key*
>>>>>
>>>>>
>>>>> I assumed that each access to the MapState would therefore yield only
>>>>> one product_id value. But I get the following output after
>>>>> running the script:
>>>>>
>>>>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>>>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>>>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>>>>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>>>>>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>>>>>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>>>>>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>>>>>>
>>>>>
>>>>> It shows that each user record is indeed processed with the corresponding
>>>>> rules, but the MapState also contains data of other products (see the last
>>>>> element of each row). There are other strange points as well:
>>>>> 1. I have two functions to generate data. The problem is more likely to
>>>>> occur with data generated by generate_data2 than with generate_data1.
>>>>> 2. The problem is more likely to occur when the data size is more than 50.
>>>>> 3. With the same code and the same data, different runs produce different
>>>>> output. Sometimes the output is as expected, sometimes it is not.
>>>>>
>>>>> Does anybody know why? Or is my understanding of keyed state wrong?
>>>>>
>>>>> Thanks.
>>>>>
>>>>>
