Hi all, I'm using PyFlink to develop a module whose main functionality is processing user data according to specific rules. The program involves two datastreams, data and rule. They have different types, so I connect them and use the field 'product_id' as the key for the key_by method. The code below is a demo, not the actual program:
```python
import random

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedCoProcessFunction
from pyflink.datastream.state import MapStateDescriptor


def test(data):
    """Collect the distinct product_ids currently stored in the given MapState."""
    product_ids = set()
    for key, value in data.items():
        product_ids.add(value[0])
    return list(product_ids)


class MyFunction(KeyedCoProcessFunction):

    def open(self, ctx):
        data_desc = MapStateDescriptor('data', Types.STRING(), Types.ROW([Types.INT()]))
        self.data = ctx.get_map_state(data_desc)
        rule_desc = MapStateDescriptor('rule', Types.STRING(), Types.ROW([Types.INT()]))
        self.rules = ctx.get_map_state(rule_desc)

    def process_element1(self, data_value, ctx):
        # User data: store it in state, then evaluate every stored rule against it.
        row_id, others = data_value[0], data_value[1:]
        self.data.put(row_id, others)
        result = []
        for key, value_list in self.rules.items():
            product_id, random_0, random_1 = value_list
            # Do some calculations
            product_ids_of_state_data = test(self.data)
            result.append([random_0, random_1, product_id, product_ids_of_state_data])
        return result

    def process_element2(self, rule_value, ctx):
        # Rule data: just store it in state.
        row_id, others = rule_value[0], rule_value[1:]
        self.rules.put(row_id, others)


def generate_data1(count):
    # product_id alternates deterministically between 1 and 2.
    collection = []
    for i in range(count):
        collection.append(['row_%d' % i, i % 2 + 1, 'a_%d' % i, i * 2])
    return collection


def generate_data2(count):
    # product_id is chosen randomly from {1, 2}.
    collection = []
    for i in range(count):
        collection.append(['row_%d' % i, random.choice([1, 2]), 'a_%d' % i, i * 2])
    return collection


def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    data = env.from_collection(generate_data1(50))
    rules = env.from_collection([
        ['row_0', 1, 'rule1_value0', 'rule1_value1'],
        ['row_1', 2, 'rule2_value0', 'rule2_value1']
    ], type_info=Types.ROW([Types.STRING(), Types.INT(), Types.STRING(), Types.STRING()]))
    # Key both streams by product_id (index 1) so that state is scoped per product.
    results = data.connect(rules).key_by(lambda x: x[1], lambda y: y[1]).process(MyFunction())
    results.print()
    env.execute("test_job")


if __name__ == "__main__":
    main()
```

When processing the first datastream, which contains the user data, I access the registered MapState and collect the distinct product_id values in it. According to the description on the official site:

> Keyed state is maintained in what can be thought of as an embedded key/value store. The state is partitioned and distributed strictly together with the streams that are read by the stateful operators. Hence, access to the key/value state is only possible on *keyed streams*, i.e. after a keyed/partitioned data exchange, and *is restricted to the values associated with the current event's key*.

I therefore assume that each access to the MapState should see only one product_id value, the one matching the current key. But I get the following output after running the script:

> ['rule1_value0', 'rule1_value1', 1, [1]]
> ['rule1_value0', 'rule1_value1', 1, [1]]
> ['rule1_value0', 'rule1_value1', 1, [1]]
> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
> ['rule2_value0', 'rule2_value1', 2, [1, 2]]

It shows that each user record is indeed processed with the matching rules, but the MapState also contains other products' data (the last element of each row). There are other strange points:

1. With data generated by generate_data2, the problem occurs more often than with generate_data1.
2. When the data size is larger than 50, the problem occurs more often.
3. With the same code and the same data, different runs produce different outputs: sometimes the output is as expected, sometimes it is not.

Does anybody know why this happens, or is my understanding of keyed state wrong?
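To double-check my understanding, here is a minimal single-stream sketch (the class name and data here are made up for illustration; this is not my actual job) where keyed MapState behaves the way I expect, i.e. each emitted list contains only the current key's product_id:

```python
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.datastream.state import MapStateDescriptor


class CollectProductIds(KeyedProcessFunction):
    """Stores each record in MapState and emits every product_id seen in state."""

    def open(self, ctx):
        desc = MapStateDescriptor('seen', Types.STRING(), Types.INT())
        self.seen = ctx.get_map_state(desc)

    def process_element(self, value, ctx):
        row_id, product_id = value[0], value[1]
        self.seen.put(row_id, product_id)
        # Keyed state is scoped to the current key, so this list should
        # only ever contain the current product_id.
        yield sorted(set(self.seen.values()))


def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    records = env.from_collection(
        [['row_%d' % i, i % 2 + 1] for i in range(6)],
        type_info=Types.ROW([Types.STRING(), Types.INT()]))
    records.key_by(lambda x: x[1]).process(CollectProductIds()).print()
    env.execute("keyed_state_check")


if __name__ == "__main__":
    main()
```

Thanks.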