Hi again,

It seems the temporary solution I mentioned in my last mail doesn't work reliably. Does every state need to be accessed in `process_element1`, `process_element2` and `on_timer` (if a timer is registered)?
Or is there any other suggested workaround?

Thanks.

Fei Zhao <emanonc...@gmail.com> wrote on Tue, Jul 13, 2021 at 5:21 PM:

> Hi,
>
> Thanks for your explanation! After adding the line
> `self.data.contains('xxx')` to `process_element2`, everything works. I will
> take this as my temporary solution.
>
> Looking forward to the next release.
>
> Best Regards,
> Fei
>
>
> Xingbo Huang <hxbks...@gmail.com> wrote on Tue, Jul 13, 2021 at 4:18 PM:
>
>> Hi,
>>
>> I have created a JIRA issue [1] to fix this bug; the fix will be included
>> in release-1.13.2. The root cause is a wrong mapping of the state key to
>> the state. This wrong mapping occurs when the key is switched but the
>> state is not used. As in your example, the `data` state you declared is
>> not used in `process_element2`.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-23368
>>
>> Best,
>> Xingbo
>>
>> 赵飞 <emanonc...@gmail.com> wrote on Mon, Jul 12, 2021 at 10:00 PM:
>>
>>> Thanks. In addition, I run the program in local mini-cluster mode; I am
>>> not sure whether that affects the results.
>>>
>>> Xingbo Huang <hxbks...@gmail.com> wrote on Mon, Jul 12, 2021 at 9:02 PM:
>>>
>>>> Hi,
>>>>
>>>> I think your understanding is correct. The results seem a little weird.
>>>> I'm looking into this and will let you know when there are any findings.
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> 赵飞 <emanonc...@gmail.com> wrote on Mon, Jul 12, 2021 at 4:48 PM:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm using pyflink to develop a module whose main functionality is
>>>>> processing user data based on specific rules. The program involves two
>>>>> datastreams: data and rule. They have different types, so I connect
>>>>> them and use the field 'product_id' as the key for the key_by method.
>>>>> The code is as follows (just demo code, not the actual one):
>>>>>
>>>>> import random
>>>>>
>>>>> from pyflink.common.typeinfo import Types
>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>> from pyflink.datastream.functions import KeyedCoProcessFunction
>>>>> from pyflink.datastream.state import MapStateDescriptor
>>>>> from pyflink.datastream import RuntimeContext
>>>>>
>>>>>
>>>>> def test(data):
>>>>>     # Collect the distinct product ids stored in the MapState.
>>>>>     product_ids = set()
>>>>>     for key, value in data.items():
>>>>>         product_ids.add(value[0])
>>>>>     return list(product_ids)
>>>>>
>>>>>
>>>>> class MyFunction(KeyedCoProcessFunction):
>>>>>     def open(self, ctx):
>>>>>         data_desc = MapStateDescriptor('data', Types.STRING(),
>>>>>                                        Types.ROW([Types.INT()]))
>>>>>         self.data = ctx.get_map_state(data_desc)
>>>>>
>>>>>         rule_desc = MapStateDescriptor('rule', Types.STRING(),
>>>>>                                        Types.ROW([Types.INT()]))
>>>>>         self.rules = ctx.get_map_state(rule_desc)
>>>>>
>>>>>     def process_element1(self, data_value, ctx):
>>>>>         # First stream: user data.
>>>>>         row_id, others = data_value[0], data_value[1:]
>>>>>         self.data.put(row_id, others)
>>>>>         result = []
>>>>>         for key, value_list in self.rules.items():
>>>>>             product_id, random_0, random_1 = value_list
>>>>>             # Do some calculations
>>>>>             product_ids_of_state_data = test(self.data)
>>>>>             result.append([random_0, random_1, product_id,
>>>>>                            product_ids_of_state_data])
>>>>>         return result
>>>>>
>>>>>     def process_element2(self, rule_value, ctx):
>>>>>         # Second stream: rules. Note that self.data is not touched here.
>>>>>         row_id, others = rule_value[0], rule_value[1:]
>>>>>         self.rules.put(row_id, others)
>>>>>
>>>>>
>>>>> def generate_data1(count):
>>>>>     collection = []
>>>>>     for i in range(count):
>>>>>         collection.append(['row_%d' % i, i % 2 + 1, 'a_%d' % i, i * 2])
>>>>>     return collection
>>>>>
>>>>>
>>>>> def generate_data2(count):
>>>>>     collection = []
>>>>>     for i in range(count):
>>>>>         collection.append(['row_%d' % i, random.choice([1, 2]),
>>>>>                            'a_%d' % i, i * 2])
>>>>>     return collection
>>>>>
>>>>>
>>>>> def main():
>>>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>>>     env.set_parallelism(1)
>>>>>
>>>>>     data = env.from_collection(generate_data1(50))
>>>>>     rules = env.from_collection([
>>>>>         ['row_0', 1, 'rule1_value0', 'rule1_value1'],
>>>>>         ['row_1', 2, 'rule2_value0', 'rule2_value1']
>>>>>     ], type_info=Types.ROW([Types.STRING(), Types.INT(),
>>>>>                             Types.STRING(), Types.STRING()]))
>>>>>     # Key both streams by product_id (the second field).
>>>>>     results = data.connect(rules) \
>>>>>         .key_by(lambda x: x[1], lambda y: y[1]) \
>>>>>         .process(MyFunction())
>>>>>     results.print()
>>>>>
>>>>>     env.execute("test_job")
>>>>>
>>>>>
>>>>> if __name__ == "__main__":
>>>>>     main()
>>>>>
>>>>>
>>>>> When processing the first datastream, which contains user data, I
>>>>> access the registered MapState and collect the unique product_id
>>>>> values in it. According to the description on the official site:
>>>>>
>>>>>> Keyed state is maintained in what can be thought of as an embedded
>>>>>> key/value store. The state is partitioned and distributed strictly
>>>>>> together with the streams that are read by the stateful operators.
>>>>>> Hence, access to the key/value state is only possible on *keyed
>>>>>> streams*, i.e. after a keyed/partitioned data exchange, and *is
>>>>>> restricted to the values associated with the current event’s key*
>>>>>
>>>>> I assume that each access to the MapState should yield only one
>>>>> product_id value. But I get the following output after running the
>>>>> script:
>>>>>
>>>>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>>>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>>>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>>>>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>>>>>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>>>>>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>>>>>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>>>>>
>>>>> It shows that each user data record is indeed processed based on the
>>>>> corresponding rules, but the MapState also contains data from other
>>>>> products (see the last element of each row). There are some other
>>>>> strange points:
>>>>>
>>>>> 1. I have two functions to generate data. The problem is more likely
>>>>>    to occur with data generated by generate_data2 than with
>>>>>    generate_data1.
>>>>> 2. The problem is more likely to occur when the data size is more
>>>>>    than 50.
>>>>> 3. With the same code and the same data, different runs produce
>>>>>    different outputs. Sometimes the output is as expected, sometimes
>>>>>    it is not.
>>>>>
>>>>> Does anybody know why? Or is my understanding of keyed state wrong?
>>>>>
>>>>> Thanks.
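
For reference, the behaviour I expected from the quoted documentation can be sketched in plain Python. This is only a toy model of keyed state, not PyFlink and not how Flink implements it: `ToyKeyedMapState`, `set_current_key`, `product_ids_seen` and `run_demo` are hypothetical names made up for this illustration. It replays the key pattern of `generate_data1` and shows that, when state is scoped to the current key, the MapState only ever holds one product_id.

```python
from collections import defaultdict


class ToyKeyedMapState:
    """Toy stand-in for a keyed MapState: one independent dict per key."""

    def __init__(self):
        self._per_key = defaultdict(dict)
        self._current_key = None

    def set_current_key(self, key):
        # In this model the key is switched before each element is processed.
        self._current_key = key

    def put(self, map_key, value):
        # Writes only affect the map that belongs to the current key.
        self._per_key[self._current_key][map_key] = value

    def items(self):
        # Reads only see the map that belongs to the current key.
        return list(self._per_key[self._current_key].items())


def product_ids_seen(state):
    # Same idea as test() in the demo: distinct product ids in the state.
    return sorted({value[0] for _, value in state.items()})


def run_demo(count=6):
    data_state = ToyKeyedMapState()
    results = []
    for i in range(count):
        product_id = i % 2 + 1  # same key pattern as generate_data1
        data_state.set_current_key(product_id)
        data_state.put('row_%d' % i, (product_id, 'a_%d' % i, i * 2))
        results.append((product_id, product_ids_seen(data_state)))
    return results


if __name__ == "__main__":
    for product_id, seen in run_demo():
        print(product_id, seen)
```

Under this model every row reports exactly its own product_id, which is what I understood the documentation to promise; the `[1, 2]` lists in my output are what deviates from it.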