Thanks. In addition, I am running the program in local mini-cluster mode; I'm not sure whether that affects the results.
Xingbo Huang <hxbks...@gmail.com> wrote on Mon, Jul 12, 2021 at 9:02 PM:
> Hi,
>
> I think your understanding is correct. The results seem a little weird.
> I'm looking into this and will let you know when there are any findings.
>
> Best,
> Xingbo
>
> 赵飞 <emanonc...@gmail.com> wrote on Mon, Jul 12, 2021 at 4:48 PM:
>
>> Hi all,
>> I'm using PyFlink to develop a module whose main functionality is
>> processing user data based on specific rules. The program involves two
>> data streams: data and rules. They have different types, so I connect them
>> and use the field 'product_id' as the key for the key_by method. The code
>> is as follows (demo code only, not the actual code):
>>
>> import random
>>
>> from pyflink.common.typeinfo import Types
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.datastream.functions import KeyedCoProcessFunction
>> from pyflink.datastream.state import MapStateDescriptor
>> from pyflink.datastream import RuntimeContext
>>
>>
>> def test(data):
>>     product_ids = set()
>>     for key, value in data.items():
>>         product_ids.add(value[0])
>>     return list(product_ids)
>>
>>
>> class MyFunction(KeyedCoProcessFunction):
>>     def open(self, ctx):
>>         data_desc = MapStateDescriptor('data', Types.STRING(),
>>                                        Types.ROW([Types.INT()]))
>>         self.data = ctx.get_map_state(data_desc)
>>
>>         rule_desc = MapStateDescriptor('rule', Types.STRING(),
>>                                        Types.ROW([Types.INT()]))
>>         self.rules = ctx.get_map_state(rule_desc)
>>
>>     def process_element1(self, data_value, ctx):
>>         row_id, others = data_value[0], data_value[1:]
>>         self.data.put(row_id, others)
>>         result = []
>>         for key, value_list in self.rules.items():
>>             product_id, random_0, random_1 = value_list
>>             # Do some calculations
>>             product_ids_of_state_data = test(self.data)
>>             result.append([random_0, random_1, product_id,
>>                            product_ids_of_state_data])
>>         return result
>>
>>     def process_element2(self, rule_value, ctx):
>>         row_id, others = rule_value[0], rule_value[1:]
>>         self.rules.put(row_id, others)
>>
>>
>> def generate_data1(count):
>>     collection = []
>>     for i in range(count):
>>         collection.append(['row_%d' % i, i % 2 + 1, 'a_%d' % i, i * 2])
>>     return collection
>>
>>
>> def generate_data2(count):
>>     collection = []
>>     for i in range(count):
>>         collection.append(['row_%d' % i, random.choice([1, 2]), 'a_%d' % i,
>>                            i * 2])
>>     return collection
>>
>>
>> def main():
>>     env = StreamExecutionEnvironment.get_execution_environment()
>>     env.set_parallelism(1)
>>
>>     data = env.from_collection(generate_data1(50))
>>     rules = env.from_collection([
>>         ['row_0', 1, 'rule1_value0', 'rule1_value1'],
>>         ['row_1', 2, 'rule2_value0', 'rule2_value1']
>>     ], type_info=Types.ROW([Types.STRING(), Types.INT(), Types.STRING(),
>>                             Types.STRING()]))
>>     results = data.connect(rules).key_by(lambda x: x[1], lambda y:
>>                                          y[1]).process(MyFunction())
>>     results.print()
>>
>>     env.execute("test_job")
>>
>>
>> if __name__ == "__main__":
>>     main()
>>
>>
>> When processing the first data stream, which contains the user data, I
>> access the registered MapState and collect the unique product_id values in
>> it. According to the description on the official site:
>>
>>> Keyed state is maintained in what can be thought of as an embedded
>>> key/value store. The state is partitioned and distributed strictly together
>>> with the streams that are read by the stateful operators. Hence, access to
>>> the key/value state is only possible on *keyed streams*, i.e. after a
>>> keyed/partitioned data exchange, and *is restricted to the values
>>> associated with the current event’s key*
>>
>>
>> I assumed that each time I access the MapState, I should get only one
>> product_id value.
>> But I get the following output after running the script:
>>
>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>>>
>>
>> It shows that each piece of user data is processed according to the
>> corresponding rules, but the MapState also contains other products' data
>> (the last element of each row). There are some other strange points:
>> 1. I have two functions to generate data. The problem is more likely to
>>    occur with data generated by generate_data2 than with generate_data1.
>> 2. The problem is more likely to occur when the data size is larger
>>    than 50.
>> 3. With the same code and the same data, different runs produce different
>>    output. Sometimes the output is as expected, sometimes it is not.
>>
>> Does anybody know why? Or is my understanding of keyed state wrong?
>>
>> Thanks.
>>
>>
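To make the expected behaviour from the quoted documentation concrete, here is a minimal sketch of what per-key scoping of MapState would produce for the demo. It is a toy, pure-Python model and not PyFlink's implementation: `KeyScopedMapState`, `state_product_ids` and `run_model` are hypothetical names, and it assumes all rule rows arrive before any data row (the real interleaving of the two connected streams is not guaranteed).

```python
from collections import defaultdict


class KeyScopedMapState:
    """Hypothetical toy model of keyed MapState (NOT PyFlink's implementation):
    every key owns its own dict, and reads/writes only touch the dict of the
    key currently being processed."""

    def __init__(self):
        self._per_key = defaultdict(dict)
        self.current_key = None  # set by the "runtime" before each element

    def put(self, map_key, value):
        self._per_key[self.current_key][map_key] = value

    def items(self):
        # Only entries written while this key was current are visible.
        return list(self._per_key[self.current_key].items())


def state_product_ids(state):
    # Mirrors test() from the demo: collect value[0] of every entry.
    return sorted({value[0] for _, value in state.items()})


def run_model(data_rows, rule_rows):
    """Replays the demo's two process_element methods against the model.
    Assumption: all rule rows arrive before any data row."""
    data_state, rule_state = KeyScopedMapState(), KeyScopedMapState()
    outputs = []
    for row in rule_rows:                  # like process_element2
        rule_state.current_key = row[1]    # key_by(..., lambda y: y[1])
        rule_state.put(row[0], row[1:])
    for row in data_rows:                  # like process_element1
        data_state.current_key = rule_state.current_key = row[1]  # lambda x: x[1]
        data_state.put(row[0], row[1:])
        for _, (product_id, value0, value1) in rule_state.items():
            outputs.append([value0, value1, product_id,
                            state_product_ids(data_state)])
    return outputs


# Same shape as generate_data1(50) and the two rule rows in the quoted demo.
data_rows = [['row_%d' % i, i % 2 + 1, 'a_%d' % i, i * 2] for i in range(50)]
rule_rows = [['row_0', 1, 'rule1_value0', 'rule1_value1'],
             ['row_1', 2, 'rule2_value0', 'rule2_value1']]

outputs = run_model(data_rows, rule_rows)
print(outputs[:2])
# Under per-key scoping, each row's state holds only its own product_id.
print(all(row[3] == [row[2]] for row in outputs))
```

In this model every output row ends with a single-element list equal to its own key, which is the behaviour the original poster expected; rows ending in `[1, 2]` are exactly what per-key scoping rules out.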
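Because the symptom is intermittent (the same code and data sometimes give the expected output), a mechanical check over the printed rows can make it easier to tell a good run from a bad one. The sketch below applies such a check to the seven rows reported in the quoted message; `cross_key_rows` is a hypothetical helper, not part of PyFlink, and it assumes the row layout produced by the demo (`[rule value 0, rule value 1, product_id, product_ids seen in state]`).

```python
# The seven rows reported in the quoted message, in the printed order.
reported = [
    ['rule1_value0', 'rule1_value1', 1, [1]],
    ['rule1_value0', 'rule1_value1', 1, [1]],
    ['rule1_value0', 'rule1_value1', 1, [1]],
    ['rule2_value0', 'rule2_value1', 2, [1, 2]],
    ['rule1_value0', 'rule1_value1', 1, [1, 2]],
    ['rule1_value0', 'rule1_value1', 1, [1, 2]],
    ['rule2_value0', 'rule2_value1', 2, [1, 2]],
]


def cross_key_rows(rows):
    """Return (index, row) pairs whose state contains any product_id other
    than the row's own key, i.e. rows that violate per-key scoping."""
    return [(i, row) for i, row in enumerate(rows) if set(row[3]) != {row[2]}]


violations = cross_key_rows(reported)
print(len(violations))
print([i for i, _ in violations])
```

On the reported output this flags 4 of the 7 rows (indices 3 to 6); running the same check on the full output of several runs would show how often the problem occurs for a given generator and data size.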