Hi,

Thanks for your explanation! Adding a line `self.data.contains('xxx')` in `process_element2` makes everything work. I will take this as my temporary solution.
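(Editorial note: a minimal, standalone sketch of that workaround. `StubMapState` and `MyFunctionWithWorkaround` are hypothetical stand-ins so the snippet runs outside a Flink cluster; in the real job the extra `contains` call goes into `MyFunction.process_element2` from the code below.)

```python
class StubMapState:
    """Minimal dict-backed stand-in for PyFlink's MapState (illustrative only)."""

    def __init__(self):
        self._store = {}

    def put(self, key, value):
        self._store[key] = value

    def contains(self, key):
        return key in self._store

    def items(self):
        return self._store.items()


class MyFunctionWithWorkaround:
    """Sketch of the workaround: touch the otherwise-unused `data` state in
    process_element2 so the state is accessed on every key switch."""

    def __init__(self):
        self.data = StubMapState()
        self.rules = StubMapState()

    def process_element2(self, rule_value, ctx=None):
        # Workaround for FLINK-23368: read from `self.data` (even a no-op
        # membership test) so the key switch does not leave an unused,
        # stale state-key mapping behind.
        self.data.contains('xxx')
        row_id, others = rule_value[0], rule_value[1:]
        self.rules.put(row_id, others)
```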
Looking forward to the next release.

Best regards,
Fei

Xingbo Huang <hxbks...@gmail.com> wrote on Tue, Jul 13, 2021 at 4:18 PM:

> Hi,
>
> I have created the JIRA ticket [1] to fix this bug; the fix will be
> included in release 1.13.2. The root cause is a wrong mapping of the
> state key to the state. This wrong mapping occurs when the key is
> switched but the state is not accessed. As in your example, the `data`
> state you declared is not used in `process_element2`.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23368
>
> Best,
> Xingbo
>
> 赵飞 <emanonc...@gmail.com> wrote on Mon, Jul 12, 2021 at 10:00 PM:
>
>> Thanks. In addition, I run the program in local mini-cluster mode; I am
>> not sure whether that affects the results.
>>
>> Xingbo Huang <hxbks...@gmail.com> wrote on Mon, Jul 12, 2021 at 9:02 PM:
>>
>>> Hi,
>>>
>>> I think your understanding is correct. The results do seem a little
>>> weird. I'm looking into this and will let you know when there are any
>>> findings.
>>>
>>> Best,
>>> Xingbo
>>>
>>> 赵飞 <emanonc...@gmail.com> wrote on Mon, Jul 12, 2021 at 4:48 PM:
>>>
>>>> Hi all,
>>>>
>>>> I'm using PyFlink to develop a module whose main functionality is
>>>> processing user data based on specific rules. The program involves two
>>>> datastreams: data and rule. They have different types, so I connect
>>>> them and use a field 'product_id' as the key for the key_by method.
>>>> The code is as follows (just demo code, not the actual one):
>>>>
>>>> import random
>>>>
>>>> from pyflink.common.typeinfo import Types
>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>> from pyflink.datastream.functions import KeyedCoProcessFunction
>>>> from pyflink.datastream.state import MapStateDescriptor
>>>> from pyflink.datastream import RuntimeContext
>>>>
>>>>
>>>> def test(data):
>>>>     product_ids = set()
>>>>     for key, value in data.items():
>>>>         product_ids.add(value[0])
>>>>     return list(product_ids)
>>>>
>>>>
>>>> class MyFunction(KeyedCoProcessFunction):
>>>>     def open(self, ctx):
>>>>         data_desc = MapStateDescriptor('data', Types.STRING(),
>>>>                                        Types.ROW([Types.INT()]))
>>>>         self.data = ctx.get_map_state(data_desc)
>>>>
>>>>         rule_desc = MapStateDescriptor('rule', Types.STRING(),
>>>>                                        Types.ROW([Types.INT()]))
>>>>         self.rules = ctx.get_map_state(rule_desc)
>>>>
>>>>     def process_element1(self, data_value, ctx):
>>>>         row_id, others = data_value[0], data_value[1:]
>>>>         self.data.put(row_id, others)
>>>>         result = []
>>>>         for key, value_list in self.rules.items():
>>>>             product_id, random_0, random_1 = value_list
>>>>             # Do some calculations
>>>>             product_ids_of_state_data = test(self.data)
>>>>             result.append([random_0, random_1, product_id,
>>>>                            product_ids_of_state_data])
>>>>         return result
>>>>
>>>>     def process_element2(self, rule_value, ctx):
>>>>         row_id, others = rule_value[0], rule_value[1:]
>>>>         self.rules.put(row_id, others)
>>>>
>>>>
>>>> def generate_data1(count):
>>>>     collection = []
>>>>     for i in range(count):
>>>>         collection.append(['row_%d' % i, i % 2 + 1, 'a_%d' % i, i * 2])
>>>>     return collection
>>>>
>>>>
>>>> def generate_data2(count):
>>>>     collection = []
>>>>     for i in range(count):
>>>>         collection.append(['row_%d' % i, random.choice([1, 2]),
>>>>                            'a_%d' % i, i * 2])
>>>>     return collection
>>>>
>>>>
>>>> def main():
>>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>>     env.set_parallelism(1)
>>>>
>>>>     data = env.from_collection(generate_data1(50))
>>>>     rules = env.from_collection([
>>>>         ['row_0', 1, 'rule1_value0', 'rule1_value1'],
>>>>         ['row_1', 2, 'rule2_value0', 'rule2_value1']
>>>>     ], type_info=Types.ROW([Types.STRING(), Types.INT(),
>>>>                             Types.STRING(), Types.STRING()]))
>>>>     results = data.connect(rules).key_by(
>>>>         lambda x: x[1], lambda y: y[1]).process(MyFunction())
>>>>     results.print()
>>>>
>>>>     env.execute("test_job")
>>>>
>>>>
>>>> if __name__ == "__main__":
>>>>     main()
>>>>
>>>> When processing the first datastream, which contains the user data, I
>>>> access the registered MapState and collect the unique product_ids in
>>>> it. According to the description on the official site:
>>>>
>>>>> Keyed state is maintained in what can be thought of as an embedded
>>>>> key/value store. The state is partitioned and distributed strictly
>>>>> together with the streams that are read by the stateful operators.
>>>>> Hence, access to the key/value state is only possible on *keyed
>>>>> streams*, i.e. after a keyed/partitioned data exchange, and *is
>>>>> restricted to the values associated with the current event's key*.
>>>>
>>>> I assume that each time the MapState is accessed, only one product_id
>>>> value should be found in it. But I get the following outputs after
>>>> running the script:
>>>>
>>>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>>>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>>>>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>>>>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>>>>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>>>>
>>>> This shows that each piece of user data is indeed processed with the
>>>> matching rule, but the MapState also contains other products' data
>>>> (the last element of each row). There are other strange points:
>>>>
>>>> 1. I have two functions to generate data.
>>>> The problem occurs more often with data generated by generate_data2
>>>> than with generate_data1.
>>>> 2. The problem occurs more often when the data size exceeds 50.
>>>> 3. With the same code and the same data, different runs produce
>>>> different outputs: sometimes the output is as expected, sometimes not.
>>>>
>>>> Does anybody know why? Or is my understanding of keyed state wrong?
>>>>
>>>> Thanks.
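(Editorial note: the expectation the question rests on can be checked with a plain-Python simulation of correctly scoped keyed state. Each key gets its own isolated map, so collecting product_ids from the current key's state should always yield exactly one value. `KeyedStateSimulator` and `run` are illustrative names, not PyFlink API.)

```python
from collections import defaultdict


class KeyedStateSimulator:
    """Simulates keyed MapState scoping: one private map per key."""

    def __init__(self):
        self._per_key = defaultdict(dict)  # key -> its own MapState
        self._current_key = None

    def set_current_key(self, key):
        self._current_key = key

    @property
    def data(self):
        # State access is scoped to the current event's key,
        # as the Flink docs quoted above describe.
        return self._per_key[self._current_key]


def run(records):
    """Feed [row_id, product_id, ...] records through the simulator and
    snapshot the unique product_ids visible in state after each one."""
    sim = KeyedStateSimulator()
    snapshots = []
    for row_id, product_id, *others in records:
        sim.set_current_key(product_id)           # key_by(lambda x: x[1])
        sim.data[row_id] = [product_id] + others  # MapState.put
        snapshots.append(sorted({v[0] for v in sim.data.values()}))
    return snapshots
```

With correctly scoped state, every snapshot contains exactly one product_id, which is what the reported output violates.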